Skip to content

Commit

Permalink
Make sure we explicitly prep the config every time we try to
Browse files Browse the repository at this point in the history
use it when restarting plugins.
  • Loading branch information
rafrombrc committed Nov 20, 2015
1 parent 0af54c2 commit 5818e56
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ Features
Bug Handling
------------

* Fixed error where restarting plugins were losing specified configuration
(#1756).

* Fixed config error where global `max_pack_idle` setting was the wrong type
and was being ignored (#1778).

Expand Down
58 changes: 37 additions & 21 deletions pipeline/plugin_runners.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,21 @@ type InputRunner interface {

type iRunner struct {
pRunnerBase
input Input
config CommonInputConfig
pConfig *PipelineConfig
inChan chan *PipelinePack
ticker <-chan time.Time
transient bool
syncDecode bool
sendDecodeFailures bool
logDecodeFailures bool
deliver DeliverFunc
delivererOnce sync.Once
delivererLock sync.Mutex
canExit bool
shutdownWanters []WantsDecoderRunnerShutdown
shutdownLock sync.Mutex
input Input
config CommonInputConfig
pConfig *PipelineConfig
inChan chan *PipelinePack
ticker <-chan time.Time
transient bool
syncDecode bool
sendDecodeFailures bool
logDecodeFailures bool
deliver DeliverFunc
delivererOnce sync.Once
delivererLock sync.Mutex
canExit bool
shutdownWanters []WantsDecoderRunnerShutdown
shutdownLock sync.Mutex
}

func (ir *iRunner) Ticker() (ticker <-chan time.Time) {
Expand Down Expand Up @@ -344,7 +344,7 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
break
}

// Otherwise we'll execute the Retry config
// Otherwise we'll execute the Retry config.
recon.CleanupForRestart()
if ir.maker == nil {
ir.pConfig.makersLock.RLock()
Expand All @@ -364,10 +364,16 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
ir.LogMessage(fmt.Sprintf("Restarting (attempt %d/%d)\n",
rh.times, rh.retries))

// If we've not been created elsewhere, call the plugin's Init()
// If we've not been created elsewhere, call the plugin's Init().
if !ir.transient {
if err = ir.plugin.Init(ir.maker.Config()); err != nil {
// We couldn't reInit the plugin, do a mini-retry loop
var config interface{}
if config, err = ir.maker.PrepConfig(); err != nil {
// We couldn't reInit the plugin, do a mini-retry loop.
ir.LogError(err)
goto initLoop
}
if err = ir.plugin.Init(config); err != nil {
// We couldn't reInit the plugin, do a mini-retry loop.
ir.LogError(err)
goto initLoop
}
Expand Down Expand Up @@ -1251,7 +1257,12 @@ func (foRunner *foRunner) Starter(plugin MessageProcessor, h PluginHelper,
break
}
foRunner.LogMessage("now restarting")
if err = foRunner.plugin.Init(foRunner.maker.Config()); err != nil {
var config interface{}
if config, err = foRunner.maker.PrepConfig(); err != nil {
foRunner.LogError(err)
goto initLoop
}
if err = foRunner.plugin.Init(config); err != nil {
foRunner.LogError(err)
goto initLoop
}
Expand Down Expand Up @@ -1497,7 +1508,12 @@ func (foRunner *foRunner) OldStarter(helper PluginHelper, wg *sync.WaitGroup) {
break
}
foRunner.LogMessage("now restarting")
if err = foRunner.plugin.Init(foRunner.maker.Config()); err != nil {
var config interface{}
if config, err = foRunner.maker.PrepConfig(); err != nil {
foRunner.LogError(err)
goto initLoop
}
if err = foRunner.plugin.Init(config); err != nil {
foRunner.LogError(err)
goto initLoop
}
Expand Down

0 comments on commit 5818e56

Please sign in to comment.