forked from mozilla-services/heka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
plugin_runners.go
1747 lines (1557 loc) · 46.4 KB
/
plugin_runners.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012-2015
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Rob Miller ([email protected])
# Mike Trinkala ([email protected])
# Ben Bangert ([email protected])
#
# ***** END LICENSE BLOCK *****/
package pipeline
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/mozilla-services/heka/client"
"github.com/mozilla-services/heka/message"
)
var ErrUnknownPluginType = errors.New("Unable to assert this is an Output or Filter")
// Base interface for the Heka plugin runners.
type PluginRunner interface {
// All plugins either can or cannot stop without causing Heka to shutdown.
Stoppable
// Plugin name.
Name() string
// Plugin name mutator.
SetName(name string)
// Underlying plugin object.
Plugin() Plugin
// Plugins should call `LogError` on their runner to log error messages
// rather than doing logging directly.
LogError(err error)
// Plugins should call `LogMessage` on their runner to write to the log
// rather than doing so directly.
LogMessage(msg string)
// Sets the amount of currently 'leaked' packs that have gone through
// this plugin. The new value will overwrite prior ones.
SetLeakCount(count int)
// Returns the current leak count
LeakCount() int
}
// Base struct for the specialized PluginRunners
type pRunnerBase struct {
notStoppable
name string
plugin Plugin
h PluginHelper
leakCount int
maker PluginMaker
}
func (pr *pRunnerBase) Name() string {
return pr.name
}
func (pr *pRunnerBase) SetName(name string) {
pr.name = name
}
func (pr *pRunnerBase) Plugin() Plugin {
return pr.plugin
}
func (pr *pRunnerBase) SetLeakCount(count int) {
pr.leakCount = count
}
func (pr *pRunnerBase) LeakCount() int {
return pr.leakCount
}
// AddDecodeFailureFields adds two fields to the provided message object. The
// first field is a boolean field called `decode_failure`, set to true. The
// second is a string field called `decode_error` which will contain the
// provided error message, truncated to 500 bytes if necessary.
func AddDecodeFailureFields(m *message.Message, errMsg string) error {
field0, err := message.NewField("decode_failure", true, "")
if err != nil {
err = fmt.Errorf("field creation error: %s", err.Error())
return err
}
if len(errMsg) > 500 {
errMsg = errMsg[:500]
}
field1, err := message.NewField("decode_error", errMsg, "")
if err != nil {
err = fmt.Errorf("field creation error: %s", err.Error())
return err
}
m.AddField(field0)
m.AddField(field1)
return nil
}
type DeliverFunc func(pack *PipelinePack)
type Deliverer interface {
Deliver(pack *PipelinePack)
DeliverFunc() DeliverFunc
Done()
}
type deliverer struct {
deliver DeliverFunc
dRunner DecoderRunner
decoder Decoder
pConfig *PipelineConfig
}
func (d *deliverer) Deliver(pack *PipelinePack) {
d.deliver(pack)
}
func (d *deliverer) DeliverFunc() DeliverFunc {
return d.deliver
}
func (d *deliverer) Done() {
if d.dRunner != nil {
d.pConfig.StopDecoderRunner(d.dRunner)
}
if d.decoder != nil {
d.pConfig.allSyncDecodersLock.Lock()
for i, reporting := range d.pConfig.allSyncDecoders {
if d.decoder == reporting.decoder {
d.pConfig.allSyncDecoders = append(d.pConfig.allSyncDecoders[:i], d.pConfig.allSyncDecoders[i+1:]...)
break
}
}
d.pConfig.allSyncDecodersLock.Unlock()
}
}
// Heka PluginRunner for Input plugins.
type InputRunner interface {
PluginRunner
// InChan returns the input channel from which Inputs can get fresh
// PipelinePacks, ready to be populated.
InChan() chan *PipelinePack
// Input returns the associated Input plugin object.
Input() Input
// Ticker returns a ticker channel configured to send ticks at an interval
// specified by the plugin's ticker_interval config value, if provided.
Ticker() (ticker <-chan time.Time)
// Starts Input in a separate goroutine and returns. Should decrement the
// plugin when the Input stops and the goroutine has completed.
Start(h PluginHelper, wg *sync.WaitGroup) (err error)
// Injects PipelinePack into the Heka Router's input channel for delivery
// to all Filter and Output plugins with corresponding message_matchers.
Inject(pack *PipelinePack) error
// If Transient returns true, Heka won't try to keep the Input running,
// nor will it generate reporting data. Life span and reporting for a
// transient InputRunner must be managed by the code that creates the
// runner.
Transient() bool
// SetTransient specifies whether or not this Input should be considered
// transient.
SetTransient(transient bool)
// Creates and returns a new Deliverer, creating a new Decoder or
// DecoderRunner to handle decoding before router injection if necessary.
// It is the caller's responsibility to call `Done()` when the deliverer
// is no longer in use to ensure that any DecoderRunner goroutines get
// cleaned up.
NewDeliverer(token string) Deliverer
// Deliver accepts packs from the Input plugin and performs the
// appropriate one of three possible next actions. Possible actions are 1)
// placing the pack on the Decoder's input channel, if a decoder is
// specified and syncDecode is false; 2) synchronously decoding the pack
// and then placing it on the router's input channel, if a decoder is
// specified and syncDecode is true; or 3) placing the pack directly on
// the router's input channel, if no decoder is specified. Delegates to
// DeliverTo.
Deliver(pack *PipelinePack)
NewSplitterRunner(token string) SplitterRunner
// Tells if synchrounous decode is enabled
SynchronousDecode() bool
}
type iRunner struct {
pRunnerBase
input Input
config CommonInputConfig
pConfig *PipelineConfig
inChan chan *PipelinePack
ticker <-chan time.Time
transient bool
syncDecode bool
sendDecodeFailures bool
logDecodeFailures bool
deliver DeliverFunc
delivererOnce sync.Once
delivererLock sync.Mutex
canExit bool
shutdownWanters []WantsDecoderRunnerShutdown
shutdownLock sync.Mutex
}
func (ir *iRunner) Ticker() (ticker <-chan time.Time) {
return ir.ticker
}
func (ir *iRunner) Transient() bool {
return ir.transient
}
func (ir *iRunner) SetTransient(transient bool) {
ir.transient = transient
}
func (ir *iRunner) IsStoppable() bool {
return ir.canExit
}
// Creates and returns a new (not yet started) InputRunner associated w/ the
// provided Input. If transient is true Heka won't try to manage the input's
// life span at all, it's up to the caller to do so.
func NewInputRunner(name string, input Input, config CommonInputConfig) (ir InputRunner) {
runner := &iRunner{
pRunnerBase: pRunnerBase{
name: name,
plugin: input.(Plugin),
},
input: input,
config: config,
}
if config.SyncDecode != nil {
runner.syncDecode = *config.SyncDecode
}
if config.SendDecodeFailures != nil {
runner.sendDecodeFailures = *config.SendDecodeFailures
}
if config.LogDecodeFailures != nil {
runner.logDecodeFailures = *config.LogDecodeFailures
}
if config.CanExit != nil && *config.CanExit {
runner.canExit = true
}
return runner
}
func (ir *iRunner) Input() Input {
return ir.input
}
func (ir *iRunner) InChan() chan *PipelinePack {
return ir.inChan
}
func (ir *iRunner) Start(h PluginHelper, wg *sync.WaitGroup) (err error) {
ir.h = h
ir.pConfig = h.PipelineConfig()
ir.inChan = ir.pConfig.inputRecycleChan
if ir.config.Ticker != 0 {
tickLength := time.Duration(ir.config.Ticker) * time.Second
ir.ticker = time.Tick(tickLength)
}
if ir.config.Splitter == "" {
ir.config.Splitter = "NullSplitter"
}
ir.pConfig.makersLock.RLock()
splitters := ir.pConfig.makers["Splitter"]
splitterMaker, ok := splitters[ir.config.Splitter]
ir.pConfig.makersLock.RUnlock()
if !ok {
return fmt.Errorf("%s specifies undefined splitter %s", ir.name,
ir.config.Splitter)
}
if _, err = splitterMaker.MakeRunner(ir.name); err != nil {
return fmt.Errorf("%s error creating splitter %s: %s", ir.name,
ir.config.Splitter, err.Error())
}
if ir.config.Decoder != "" {
_, ok := ir.pConfig.Decoder(ir.config.Decoder)
if !ok {
return fmt.Errorf("no registered '%s' decoder", ir.config.Decoder)
}
}
go ir.Starter(h, wg)
return
}
func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
defer wg.Done()
globals := ir.pConfig.Globals
rh, err := NewRetryHelper(ir.config.Retries)
if err != nil {
ir.LogError(err)
if !ir.IsStoppable() {
globals.ShutDown(1)
}
return
}
for !globals.IsShuttingDown() {
// ir.Input().Run() shouldn't return unless error or shutdown.
err := ir.input.Run(ir, h)
registered, ok := ir.pConfig.InputRunners[ir.name]
if !ok || registered != ir || globals.IsShuttingDown() {
// Plugin was removed deliberately from the list of InputRunners or
// has been superseded by another instance, or we're in shutdown.
// In this case, avoid triggering a Heka shutdown ourselves.
ir.Unregister(ir.pConfig)
return
} else if err == nil {
// Plugin exited cleanly.
break
} else {
// Plugin exited by returning an error.
ir.LogError(err)
// If we don't support restart, just stop here.
recon, ok := ir.plugin.(Restarting)
if !ok {
break
}
// Otherwise we'll execute the Retry config.
recon.CleanupForRestart()
if ir.maker == nil {
ir.pConfig.makersLock.RLock()
ir.maker = ir.pConfig.makers["Input"][ir.name]
ir.pConfig.makersLock.RUnlock()
}
initLoop:
if err = rh.Wait(); err != nil {
// We've used up our retry attempts, exit.
ir.LogError(err)
break
}
if globals.IsShuttingDown() {
break
}
ir.LogMessage(fmt.Sprintf("Restarting (attempt %d/%d)\n",
rh.times, rh.retries))
// If we've not been created elsewhere, call the plugin's Init().
if !ir.transient {
var config interface{}
if config, err = ir.maker.PrepConfig(); err != nil {
// We couldn't reInit the plugin, do a mini-retry loop.
ir.LogError(err)
goto initLoop
}
if err = ir.plugin.Init(config); err != nil {
// We couldn't reInit the plugin, do a mini-retry loop.
ir.LogError(err)
goto initLoop
}
}
}
}
ir.Unregister(ir.pConfig)
// If we're not a stoppable input, trigger Heka shutdown.
if !ir.IsStoppable() {
globals.ShutDown(1)
}
}
func (ir *iRunner) Unregister(pConfig *PipelineConfig) error {
// Send shutdown signal to any decoders that need it.
if len(ir.shutdownWanters) > 0 {
ir.shutdownLock.Lock()
for _, wanter := range ir.shutdownWanters {
wanter.Shutdown()
}
ir.shutdownLock.Unlock()
}
return nil
}
func (ir *iRunner) Inject(pack *PipelinePack) error {
if err := pack.EncodeMsgBytes(); err != nil {
err = fmt.Errorf("encoding message: %s", err.Error())
ir.LogError(err)
pack.recycle()
return err
}
return ir.pConfig.router.Inject(pack)
}
func (ir *iRunner) LogError(err error) {
LogError.Printf("Input '%s' error: %s", ir.name, err)
}
func (ir *iRunner) LogMessage(msg string) {
LogInfo.Printf("Input '%s': %s", ir.name, msg)
}
func (ir *iRunner) getDeliverFunc(token string) (DeliverFunc, DecoderRunner, Decoder) {
var deliver DeliverFunc
decoderName := ir.config.Decoder
// If no decoder is specified we just inject into the router.
if decoderName == "" {
deliver = func(pack *PipelinePack) {
ir.Inject(pack)
}
return deliver, nil, nil
}
ir.pConfig.makersLock.RLock()
_, ok := ir.pConfig.DecoderMakers[decoderName]
ir.pConfig.makersLock.RUnlock()
if !ok {
ir.LogError(fmt.Errorf("decoder '%s' not registered", decoderName))
return nil, nil, nil
}
var fullName string
if token == "" {
fullName = fmt.Sprintf("%s-%s", ir.name, decoderName)
} else {
fullName = fmt.Sprintf("%s-%s-%s", ir.name, decoderName, token)
}
// No synchronous decode means create a DecoderRunner and drop packs on
// its inChan.
if !ir.syncDecode {
dr, _ := ir.pConfig.DecoderRunner(decoderName, fullName)
dr.SetFailureHandling(ir.logDecodeFailures, ir.sendDecodeFailures)
inChan := dr.InChan()
deliver = func(pack *PipelinePack) {
inChan <- pack
}
return deliver, dr, nil
}
// Synchronous decode means create a decoder instance and call Decode
// directly.
decoder, _ := ir.pConfig.Decoder(decoderName)
if wanter, ok := decoder.(WantsDecoderRunner); ok {
dr := NewDecoderRunner(fullName, decoder, 0).(*dRunner)
dr.h = ir.h
dr.router = ir.pConfig.router
wanter.SetDecoderRunner(dr)
}
if wanter, ok := decoder.(WantsDecoderRunnerShutdown); ok {
ir.shutdownLock.Lock()
ir.shutdownWanters = append(ir.shutdownWanters, wanter)
ir.shutdownLock.Unlock()
}
ir.pConfig.allSyncDecodersLock.Lock()
ir.pConfig.allSyncDecoders = append(ir.pConfig.allSyncDecoders, ReportingDecoder{
name: fullName,
decoder: decoder,
})
ir.pConfig.allSyncDecodersLock.Unlock()
// See if the decoder sets TrustMsgBytes for us.
_, trustMsgBytes := decoder.(EncodesMsgBytes)
deliver = func(pack *PipelinePack) {
packs, err := decoder.Decode(pack)
if err != nil {
errMsg := err.Error()
e := fmt.Errorf("decoding: %s", errMsg)
if ir.logDecodeFailures {
ir.LogError(e)
}
if !ir.sendDecodeFailures {
pack.recycle()
return
}
if err = AddDecodeFailureFields(pack.Message, errMsg); err != nil {
ir.LogError(err)
}
pack.TrustMsgBytes = false
ir.Inject(pack)
return
}
for _, p := range packs {
if !trustMsgBytes {
p.TrustMsgBytes = false
}
ir.Inject(p)
}
}
return deliver, nil, decoder
}
func (ir *iRunner) NewDeliverer(token string) Deliverer {
deliver, dRunner, decoder := ir.getDeliverFunc(token)
d := &deliverer{
deliver: deliver,
dRunner: dRunner,
decoder: decoder,
pConfig: ir.pConfig,
}
return d
}
func (ir *iRunner) Deliver(pack *PipelinePack) {
if ir.deliver == nil {
// The lock keeps latecomers from hitting the `deliver` call before the
// first `getDeliverFunc` call has returned.
ir.delivererLock.Lock()
ir.delivererOnce.Do(func() {
ir.deliver, _, _ = ir.getDeliverFunc("")
})
ir.delivererLock.Unlock()
}
ir.deliver(pack)
}
func (ir *iRunner) SynchronousDecode() bool {
return ir.syncDecode
}
func (ir *iRunner) NewSplitterRunner(token string) SplitterRunner {
if ir.config.Splitter == "" {
ir.config.Splitter = "NullSplitter"
}
ir.pConfig.makersLock.RLock()
maker := ir.pConfig.makers["Splitter"][ir.config.Splitter]
ir.pConfig.makersLock.RUnlock()
var name string
if token == "" {
name = fmt.Sprintf("%s-%s", ir.name, ir.config.Splitter)
} else {
name = fmt.Sprintf("%s-%s-%s", ir.name, ir.config.Splitter, token)
}
srInterface, _ := maker.MakeRunner(name)
sr := srInterface.(*sRunner)
sr.ir = ir
ir.pConfig.allSplittersLock.Lock()
ir.pConfig.allSplitters = append(ir.pConfig.allSplitters, sr)
ir.pConfig.allSplittersLock.Unlock()
return sr
}
// Heka PluginRunner for Decoder plugins. Decoding is typically a simpler job,
// so these runners handle a bit more than the others.
type DecoderRunner interface {
PluginRunner
// Returns associated Decoder plugin object.
Decoder() Decoder
// Starts the DecoderRunner so it's listening for incoming PipelinePacks.
// Should decrement the wait group after shut down has completed.
Start(h PluginHelper, wg *sync.WaitGroup)
// Returns the channel into which incoming PipelinePacks to be decoded
// should be dropped.
InChan() chan *PipelinePack
// Returns the running Heka router for direct use by decoder plugins.
Router() MessageRouter
// Fetches a new pack from the input supply and returns it to the caller,
// for decoders that generate multiple messages from a single input
// message.
NewPack() *PipelinePack
// SetFailureHandling allows the InputRunner to specify whether or not
// messages that fail decoding should still be tagged and given to the
// router.
SetFailureHandling(printFailure, sendFailure bool)
}
type dRunner struct {
pRunnerBase
decoder Decoder
inChan chan *PipelinePack
router *messageRouter
h PluginHelper
printFailure bool
sendFailure bool
encodes bool
globals *GlobalConfigStruct
}
// Creates and returns a new (but not yet started) DecoderRunner for the
// provided Decoder plugin.
func NewDecoderRunner(name string, decoder Decoder, chanSize int) DecoderRunner {
dr := &dRunner{
pRunnerBase: pRunnerBase{
name: name,
plugin: decoder.(Plugin),
},
decoder: decoder,
inChan: make(chan *PipelinePack, chanSize),
}
_, dr.encodes = decoder.(EncodesMsgBytes)
return dr
}
func (dr *dRunner) Decoder() Decoder {
return dr.decoder
}
func (dr *dRunner) Start(h PluginHelper, wg *sync.WaitGroup) {
dr.h = h
pConfig := h.PipelineConfig()
dr.router = pConfig.router
dr.globals = pConfig.Globals
if wanter, ok := dr.decoder.(WantsDecoderRunner); ok {
wanter.SetDecoderRunner(dr)
}
go dr.start(h, wg)
}
func (dr *dRunner) start(h PluginHelper, wg *sync.WaitGroup) {
var (
pack *PipelinePack
packs []*PipelinePack
err error
)
for pack = range dr.inChan {
if packs, err = dr.decoder.Decode(pack); packs != nil {
for _, p := range packs {
dr.deliver(p)
}
} else {
if err != nil {
if dr.printFailure {
dr.LogError(err)
}
if dr.sendFailure {
if err = AddDecodeFailureFields(pack.Message, err.Error()); err != nil {
dr.LogError(err)
}
pack.TrustMsgBytes = false
dr.deliver(pack)
continue
}
}
pack.recycle()
continue
}
}
if wanter, ok := dr.decoder.(WantsDecoderRunnerShutdown); ok {
wanter.Shutdown()
}
dr.LogMessage("stopped")
wg.Done()
}
func (dr *dRunner) deliver(pack *PipelinePack) {
if !dr.encodes || !pack.TrustMsgBytes {
err := pack.EncodeMsgBytes()
if err != nil {
err = fmt.Errorf("encoding message: %s", err.Error())
dr.LogError(err)
pack.recycle()
return
}
}
dr.router.Inject(pack)
}
func (dr *dRunner) InChan() chan *PipelinePack {
return dr.inChan
}
func (dr *dRunner) Router() MessageRouter {
return dr.router
}
func (dr *dRunner) NewPack() *PipelinePack {
var pack *PipelinePack
select {
case pack = <-dr.h.PipelineConfig().inputRecycleChan:
case <-dr.globals.abortChan:
}
return pack // Might be nil if we're aborting.
}
func (dr *dRunner) LogError(err error) {
LogError.Printf("Decoder '%s' error: %s", dr.name, err)
}
func (dr *dRunner) LogMessage(msg string) {
LogInfo.Printf("Decoder '%s': %s", dr.name, msg)
}
func (dr *dRunner) SetFailureHandling(printFailure, sendFailure bool) {
dr.printFailure = printFailure
dr.sendFailure = sendFailure
}
// Any decoder that needs access to its DecoderRunner can implement this
// interface and it will be provided at DecoderRunner start time.
type WantsDecoderRunner interface {
SetDecoderRunner(dr DecoderRunner)
}
// Any decoder that needs to know when the DecoderRunner is exiting can
// implement this interface and it will be called on DecoderRunner exit.
type WantsDecoderRunnerShutdown interface {
Shutdown()
}
// Heka PluginRunner interface for Filter type plugins.
type FilterRunner interface {
PluginRunner
// Input channel on which the Filter should listen for incoming messages
// to be processed. Closure of the channel signals shutdown to the filter.
InChan() chan *PipelinePack
// Channel that will be closed when the Filter is exiting, should be used
// by Filter plugins in situations that might block to ensure that shutdown
// messages are received.
StopChan() chan bool
// Associated Filter plugin object.
Filter() Filter
// Associated Filter plugin object using the deprecated API.
OldFilter() OldFilter
// Starts the Filter (so it's listening on the input channel for messages
// to be processed) in a separate goroutine and returns. Should decrement
// the wait group when the Filter has stopped and the goroutine has
// completed.
Start(h PluginHelper, wg *sync.WaitGroup) (err error)
// Returns a ticker channel configured to send ticks at an interval
// specified by the plugin's ticker_interval config value, if provided.
Ticker() (ticker <-chan time.Time)
// Hands provided PipelinePack to the Heka Router for delivery to any
// Filter or Output plugins with a corresponding message_matcher. Returns
// false and doesn't perform message injection if the message would be
// caught by the sending Filter's message_matcher.
Inject(pack *PipelinePack) bool
// Parsing engine for this Filter's message_matcher.
MatchRunner() *MatchRunner
// Retains a pack for future delivery to the plugin when a plugin needs to
// shut down and wants to retain the pack for the next time its running
// properly.
RetainPack(pack *PipelinePack)
// Returns whether or not use_buffering was set in the filter's
// configuration
UsesBuffering() bool
// Updates the queue buffer cursor to indicate that a given message has
// been delivered and can be safely removed from the queue.
UpdateCursor(queueCursor string)
// Returns whether or not the filter is currently generating back-pressure,
// either through the input channels being full or through the disk buffer
// up to 90% of the configured max.
BackPressured() bool
}
// Heka PluginRunner for Output plugins.
type OutputRunner interface {
PluginRunner
// Input channel where Output should be listening for incoming messages.
InChan() chan *PipelinePack
// Channel that will be closed when the Output is exiting, should be used
// by Output plugins in situations that might block to ensure that shutdown
// messages are received.
StopChan() chan bool
// Associated Output plugin instance.
Output() Output
// Associated Output plugin instance using the deprecated API.
OldOutput() OldOutput
// Starts the Output plugin listening on the input channel in a separate
// goroutine and returns. Wait group should be released when the Output
// plugin shuts down cleanly and the goroutine has completed.
Start(h PluginHelper, wg *sync.WaitGroup) (err error)
// Returns a ticker channel configured to send ticks at an interval
// specified by the plugin's ticker_interval config value, if provided.
Ticker() (ticker <-chan time.Time)
// Retains a pack for future delivery to the plugin when a plugin needs
// to shut down and wants to retain the pack for the next time its
// running properly
RetainPack(pack *PipelinePack)
// Parsing engine for this Output's message_matcher.
MatchRunner() *MatchRunner
// Returns an instance of the Encoder specified by the output's config, or
// nil if none was specified. Multiple calls will return the same
// instance.
Encoder() Encoder
// Uses the output's Encoder to encode the message attached to the
// provided PipelinePack. Will prepend a Heka stream framing header if
// use_framing was set to true in the output configuration.
Encode(pack *PipelinePack) (output []byte, err error)
// Returns whether or not use_framing was set to true in the output's
// configuration, i.e. whether or not Heka stream framing will be applied
// to the results of calls to the Encode method.
UsesFraming() bool
// Allows an output to specify whether or not it's using framing.
SetUseFraming(useFraming bool)
// Returns whether or not use_buffering was set in the output's
// configuration
UsesBuffering() bool
// Updates the queue buffer cursor to indicate that a given message has
// been delivered and can be safely removed from the queue.
UpdateCursor(queueCursor string)
// Returns whether or not the output is currently generating back-pressure,
// either through the input channels being full or through the disk buffer
// up to 90% of the configured max.
BackPressured() bool
}
type foRunnerKind int
const (
foUnknown foRunnerKind = iota
foFilter
foOutput
)
func (kind foRunnerKind) String() string {
switch kind {
case foFilter:
return "filter"
case foOutput:
return "output"
}
return "unknown"
}
// This one struct provides the implementation of both FilterRunner and
// OutputRunner interfaces.
type foRunner struct {
processMessageCount int64
dropMessageCount int64
capacity int
pRunnerBase
pluginType string
config CommonFOConfig
matcher *MatchRunner
ticker <-chan time.Time
inChan chan *PipelinePack
backChan chan *PipelinePack
h PluginHelper
retainPack *PipelinePack
leakCount int
encoder Encoder // output only
useFraming bool // output only
canExit bool
useBuffering bool
kind foRunnerKind
pConfig *PipelineConfig
lastErr error
bufReader *BufferReader
stopChan chan bool
}
const pluginPoolSize = 2
// Creates and returns foRunner pointer for use as either a FilterRunner or an
// OutputRunner.
func NewFORunner(name string, plugin Plugin, config CommonFOConfig,
pluginType string, chanSize int) (*foRunner, error) {
runner := &foRunner{
pRunnerBase: pRunnerBase{
name: name,
plugin: plugin,
},
pluginType: pluginType,
config: config,
}
if config.Matcher == "" {
return nil, fmt.Errorf("'%s' missing message matcher", name)
}
if config.Ticker != 0 {
canTick := false
// Check to make sure we know what to do with the ticker interval.
if _, ok := plugin.(OldFilter); ok {
canTick = true
} else if _, ok := plugin.(OldOutput); ok {
canTick = true
} else if _, ok := plugin.(TickerPlugin); ok {
canTick = true
}
if !canTick {
return nil, fmt.Errorf("'%s' can't support a ticker_interval setting", name)
}
}
if config.UseBuffering != nil && *config.UseBuffering {
runner.useBuffering = true
if config.Buffering.FullAction == "" {
config.Buffering.FullAction = "shutdown"
}
switch config.Buffering.FullAction {
case "shutdown", "drop", "block":
default:
msg := "buffer full_action must be 'shutdown', 'drop', or 'block', got '%s'"
return nil, fmt.Errorf(msg, config.Buffering.FullAction)
}
runner.capacity = int(config.Buffering.MaxBufferSize) * 90 / 100
}
var matchChan chan *PipelinePack
if runner.useBuffering {
runner.inChan = make(chan *PipelinePack, pluginPoolSize)
runner.backChan = make(chan *PipelinePack, pluginPoolSize)
for i := 0; i < pluginPoolSize; i++ {
pack := NewPipelinePack(runner.backChan)
pack.BufferedPack = true
pack.DelivErrChan = make(chan error, 1)
runner.backChan <- pack
}
} else {
runner.inChan = make(chan *PipelinePack, chanSize)
matchChan = runner.inChan
runner.capacity = chanSize
}
// matchChan is nil if buffering is used, this is intentional.
matcher, err := NewMatchRunner(config.Matcher, config.Signer, runner, chanSize,
matchChan)
if err != nil {
return nil, fmt.Errorf("Can't create message matcher for '%s': %s", name, err)
}
runner.matcher = matcher
if config.CanExit != nil && *config.CanExit {
runner.canExit = true
}
if config.UseFraming != nil && *config.UseFraming {
runner.useFraming = true
}
if _, ok := plugin.(OldFilter); ok {
runner.kind = foFilter
} else if _, ok := plugin.(OldOutput); ok {
runner.kind = foOutput
} else if _, ok := plugin.(Filter); ok {
runner.kind = foFilter
} else if _, ok := plugin.(Output); ok {
runner.kind = foOutput
} else {
err := fmt.Errorf("FORunner plugin must be filter or output, %s is neither",
name)
return nil, err
}
return runner, nil
}
func (foRunner *foRunner) BackPressured() bool {
if !foRunner.useBuffering {
// reading a channel length is generally fast ~1ns
// we need to check the entire chain back to the router
return len(foRunner.inChan) >= foRunner.capacity ||
foRunner.matcher.InChanLen() >= foRunner.capacity
}
return foRunner.capacity > 0 && foRunner.bufReader.queueSize.Get() >= uint64(foRunner.capacity)
}
func (foRunner *foRunner) waitForBackPressure() error {
globals := foRunner.pConfig.Globals
retryOptions := getDefaultRetryOptions()
retryOptions.MaxDelay = "1s"
retryOptions.MaxRetries = int(globals.FullBufferMaxRetries)
// NewRetryHelper will only return an error if the duration strings don't
// parse. Ours are hard-coded, so this error shouldn't happen.
retry, err := NewRetryHelper(retryOptions)
if err != nil {
return fmt.Errorf("can't create retry helper: %s", err.Error())
}
for !globals.IsShuttingDown() {
bp := foRunner.BackPressured()
if !bp {
return nil
}
err = retry.Wait()
if err != nil {
// We've exhausted our max allowed retries, so we honor the
// buffer's 'full_action' setting and trigger a shutdown if
// necessary.
if foRunner.bufReader.config.FullAction == "shutdown" {
globals.ShutDown(1)
foRunner.LogError(errors.New("back-pressure not resolving: triggering shutdown"))
}
// But we always return `nil` so that regular start up sequence can
// continue.
return nil
}
}
return nil
}