Delay startup when buffered plugin has full buffer to give time
for back-pressure to be relieved, add `full_buffer_max_retries`
global config setting, exit w/ non-zero exit code on error cases.
rafrombrc committed Nov 18, 2015
1 parent 5a332fb commit bf2fa29
Showing 8 changed files with 98 additions and 37 deletions.
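
In outline, the change adds a startup gate for buffered plugins: if a plugin's
buffer is already at capacity when it starts, Heka waits through a bounded
number of short retry intervals (each capped at 1s) for the queue to drain,
and if the back-pressure never resolves it honors the buffer's full_action,
shutting down with a non-zero exit code when that action is "shutdown". The
sketch below is a simplified, self-contained illustration of that pattern; the
names (waitForDrain, queueSize, fullAction) are stand-ins rather than Heka's
actual identifiers; see the pipeline/plugin_runners.go diff below for the real
implementation.

```go
package main

import (
	"fmt"
	"time"
)

// waitForDrain models the startup gate this commit adds: retry up to
// maxRetries times, sleeping at most 1s per attempt, until the queue
// drops back under capacity. All names here are illustrative stand-ins.
func waitForDrain(queueSize func() uint64, capacity uint64, maxRetries int,
	fullAction string, shutdown func(exitCode int)) {

	delay := 100 * time.Millisecond
	for i := 0; i < maxRetries; i++ {
		if queueSize() < capacity {
			return // back-pressure relieved; continue startup
		}
		time.Sleep(delay)
		delay *= 2
		if delay > time.Second {
			delay = time.Second // cap each wait at 1s, like MaxDelay = "1s"
		}
	}
	// Retries exhausted: honor the buffer's full_action, then let the
	// normal startup sequence continue (the real code returns nil here).
	if fullAction == "shutdown" {
		shutdown(1)
	}
}

func main() {
	queued := uint64(12)
	waitForDrain(
		func() uint64 { queued--; return queued }, // pretend the queue drains
		10, 10, "shutdown",
		func(code int) { fmt.Println("would shut down with exit code", code) },
	)
	fmt.Println("startup continues")
}
```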
4 changes: 3 additions & 1 deletion CHANGES.txt
@@ -20,7 +20,6 @@ Features
* Added `log_flags` to hekad config, to control the prefix for STDOUT and
STDERR logs.


Bug Handling
------------

@@ -62,6 +61,9 @@ Bug Handling
* Fixed panic that was occurring when loading a config file or directory that
exists but which registers no plugins (#1597).

* Delay startup when a buffered plugin's buffer is at capacity, to give the
back-pressure time to resolve (#1738).

* Fixed bug where LogStreamerInput would sometimes loop infinitely reading the
same file over and over when reading gzipped log files.

2 changes: 2 additions & 0 deletions cmd/hekad/config.go
@@ -47,6 +47,7 @@ type HekadConfig struct {
Hostname string
MaxMessageSize uint32 `toml:"max_message_size"`
LogFlags int `toml:"log_flags"`
FullBufferMaxRetries uint32 `toml:"full_buffer_max_retries"`
}

func LoadHekadConfig(configPath string) (config *HekadConfig, err error) {
@@ -71,6 +72,7 @@ func LoadHekadConfig(configPath string) (config *HekadConfig, err error) {
PidFile: "",
Hostname: hostname,
LogFlags: log.LstdFlags,
FullBufferMaxRetries: 10,
}

var configFile map[string]toml.Primitive
1 change: 1 addition & 0 deletions cmd/hekad/main.go
@@ -87,6 +87,7 @@ func setGlobalConfigs(config *HekadConfig) (*pipeline.GlobalConfigStruct, string
globals.ShareDir = config.ShareDir
globals.SampleDenominator = config.SampleDenominator
globals.Hostname = config.Hostname
globals.FullBufferMaxRetries = uint(config.FullBufferMaxRetries)

return globals, cpuProfName, memProfName
}
8 changes: 8 additions & 0 deletions docs/source/config/index.rst
@@ -171,6 +171,14 @@ Config:
and time, the default) or 0 (no prefix). See
`Go documentation <https://golang.org/pkg/log/#pkg-constants>`_ for details.

- full_buffer_max_retries (int):
When Heka shuts down with a plugin's buffer filled to capacity, the next
time Heka starts it will briefly delay startup to give the buffer a chance
to drain and relieve the back-pressure. This setting specifies the maximum
number of retry intervals (each at most 1s long) that Heka will wait for the
buffer size to drop below 90% of capacity before concluding that the issue
is not resolving and either continuing startup or shutting down, depending
on the buffer's full_action setting. Defaults to 10.
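
Below is a minimal sketch of how this setting might appear in hekad.toml,
assuming the standard [hekad] global section; the value 10 matches the default
added to cmd/hekad/config.go in this commit.

```toml
[hekad]
# Maximum number of (at most 1s) retry intervals to wait at startup for a
# full plugin buffer to drain before continuing, or shutting down if the
# buffer's full_action is "shutdown".
full_buffer_max_retries = 10
```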

Example hekad.toml file
=======================

29 changes: 18 additions & 11 deletions pipeline/pipeline_runner.go
@@ -51,12 +51,15 @@ type GlobalConfigStruct struct {
MaxPackIdle time.Duration
stopping bool
stoppingMutex sync.RWMutex
shutdownOnce sync.Once
BaseDir string
ShareDir string
SampleDenominator int
sigChan chan os.Signal
Hostname string
abortChan chan struct{}
FullBufferMaxRetries uint
exitCode int
}

// Creates a GlobalConfigStruct object populated w/ default values.
@@ -87,10 +90,13 @@ func (g *GlobalConfigStruct) SigChan() chan os.Signal {
// This method returns immediately by spawning a goroutine to do the
// work so that the caller won't end up blocking part of the shutdown
// sequence.
func (g *GlobalConfigStruct) ShutDown() {
go func() {
g.sigChan <- syscall.SIGINT
}()
func (g *GlobalConfigStruct) ShutDown(exitCode int) {
g.shutdownOnce.Do(func() {
g.exitCode = exitCode
go func() {
g.sigChan <- syscall.SIGINT
}()
})
}
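
The sync.Once guard above makes shutdown idempotent: only the first ShutDown
call records an exit code and pushes SIGINT onto the signal channel, so later
calls from other failing plugins can neither overwrite the exit code nor
signal twice. A small, self-contained sketch of the same idiom follows (the
shutdownGuard type and its fields are illustrative stand-ins, not Heka's
actual definitions):

```go
package main

import (
	"fmt"
	"sync"
)

// shutdownGuard mimics the shutdownOnce/exitCode fields added to
// GlobalConfigStruct: the first caller wins, later calls are no-ops.
type shutdownGuard struct {
	once     sync.Once
	exitCode int
	sigChan  chan string
}

func (g *shutdownGuard) ShutDown(exitCode int) {
	g.once.Do(func() {
		g.exitCode = exitCode
		// Send asynchronously so a caller that is itself part of the
		// shutdown path never blocks on the signal channel.
		go func() { g.sigChan <- "SIGINT" }()
	})
}

func main() {
	g := &shutdownGuard{sigChan: make(chan string, 1)}
	g.ShutDown(1) // first call sets the exit code
	g.ShutDown(0) // ignored: sync.Once has already fired
	fmt.Println(<-g.sigChan, "exit code:", g.exitCode) // SIGINT exit code: 1
}
```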

func (g *GlobalConfigStruct) IsShuttingDown() (stopping bool) {
@@ -251,9 +257,9 @@ func (p *PipelinePack) EncodeMsgBytes() error {
return err
}

// Main function driving Heka execution. Loads config, initializes
// PipelinePack pools, and starts all the runners. Then it listens for signals
// and drives the shutdown process when that is triggered.
// Main function driving Heka execution. Loads config, initializes PipelinePack
// pools, and starts all the runners. Then it listens for signals and drives
// the shutdown process when that is triggered.
func Run(config *PipelineConfig) {
LogInfo.Println("Starting hekad...")

@@ -268,7 +274,7 @@ func Run(config *PipelineConfig) {
LogError.Printf("Output '%s' failed to start: %s", name, err)
outputsWg.Done()
if !output.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
continue
}
@@ -281,7 +287,7 @@ func Run(config *PipelineConfig) {
LogError.Printf("Filter '%s' failed to start: %s", name, err)
config.filtersWg.Done()
if !filter.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
continue
}
@@ -319,7 +325,7 @@ func Run(config *PipelineConfig) {
LogError.Printf("Input '%s' failed to start: %s", name, err)
config.inputsWg.Done()
if !input.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
continue
}
@@ -398,6 +404,7 @@ func Run(config *PipelineConfig) {
}

LogInfo.Println("Shutdown complete.")
os.Exit(globals.exitCode)
}

func sandboxAbort(config *PipelineConfig) {
@@ -442,6 +449,6 @@ func sandboxAbort(config *PipelineConfig) {
// and abort any sandboxes that are wedged inside process_message or
// timer_event.
config.allReportsStdout()
config.Globals.ShutDown()
config.Globals.ShutDown(1)
close(config.Globals.abortChan)
}
85 changes: 63 additions & 22 deletions pipeline/plugin_runners.go
@@ -199,21 +199,21 @@ type InputRunner interface {

type iRunner struct {
pRunnerBase
input Input
config CommonInputConfig
pConfig *PipelineConfig
inChan chan *PipelinePack
ticker <-chan time.Time
transient bool
syncDecode bool
sendDecodeFailures bool
logDecodeFailures bool
deliver DeliverFunc
delivererOnce sync.Once
delivererLock sync.Mutex
canExit bool
shutdownWanters []WantsDecoderRunnerShutdown
shutdownLock sync.Mutex
input Input
config CommonInputConfig
pConfig *PipelineConfig
inChan chan *PipelinePack
ticker <-chan time.Time
transient bool
syncDecode bool
sendDecodeFailures bool
logDecodeFailures bool
deliver DeliverFunc
delivererOnce sync.Once
delivererLock sync.Mutex
canExit bool
shutdownWanters []WantsDecoderRunnerShutdown
shutdownLock sync.Mutex
}

func (ir *iRunner) Ticker() (ticker <-chan time.Time) {
@@ -314,7 +314,7 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
if err != nil {
ir.LogError(err)
if !ir.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
return
}
@@ -380,7 +380,7 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {

// If we're not a stoppable input, trigger Heka shutdown.
if !ir.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
}

@@ -957,10 +957,43 @@ func (foRunner *foRunner) BackPressured() bool {
return len(foRunner.inChan) >= foRunner.capacity ||
foRunner.matcher.InChanLen() >= foRunner.capacity
}

return foRunner.capacity > 0 && foRunner.bufReader.queueSize.Get() >= uint64(foRunner.capacity)
}

func (foRunner *foRunner) waitForBackPressure() error {
globals := foRunner.pConfig.Globals
retryOptions := getDefaultRetryOptions()
retryOptions.MaxDelay = "1s"
retryOptions.MaxRetries = int(globals.FullBufferMaxRetries)
// NewRetryHelper will only return an error if the duration strings don't
// parse. Ours are hard-coded, so this error shouldn't happen.
retry, err := NewRetryHelper(retryOptions)
if err != nil {
return fmt.Errorf("can't create retry helper: %s", err.Error())
}
for !globals.IsShuttingDown() {
bp := foRunner.BackPressured()
foRunner.LogMessage(fmt.Sprintf("back-pressured: %t", bp))
if !bp {
return nil
}
err = retry.Wait()
if err != nil {
// We've exhausted our max allowed retries, so we honor the
// buffer's 'full_action' setting and trigger a shutdown if
// necessary.
if foRunner.bufReader.config.FullAction == "shutdown" {
globals.ShutDown(1)
foRunner.LogError(errors.New("back-pressure not resolving: triggering shutdown"))
}
// But we always return `nil` so that the regular startup sequence
// can continue.
return nil
}
}
return nil
}

func (foRunner *foRunner) Start(h PluginHelper, wg *sync.WaitGroup) (err error) {
foRunner.h = h
foRunner.pConfig = h.PipelineConfig()
@@ -1034,6 +1067,14 @@ func (foRunner *foRunner) Start(h PluginHelper, wg *sync.WaitGroup) (err error)
} else {
go foRunner.OldStarter(h, wg)
}

if foRunner.useBuffering && foRunner.BackPressured() {
foRunner.LogMessage("Delaying start while trying to relieve back-pressure...")
if err = foRunner.waitForBackPressure(); err != nil {
return err
}
}

return
}

@@ -1145,7 +1186,7 @@ func (foRunner *foRunner) Starter(plugin MessageProcessor, h PluginHelper,
if err != nil {
foRunner.LogError(err)
if !foRunner.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
return
}
@@ -1183,7 +1224,7 @@
// No more retries.
foRunner.lastErr = err
if !foRunner.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
return
}
@@ -1308,7 +1349,7 @@ func (foRunner *foRunner) exit() {
// Also, if this isn't a "stoppable" plugin we shut everything down.
if !foRunner.IsStoppable() {
foRunner.LogMessage("has stopped, shutting down.")
foRunner.pConfig.Globals.ShutDown()
foRunner.pConfig.Globals.ShutDown(1)
return
}

@@ -1424,7 +1465,7 @@ func (foRunner *foRunner) OldStarter(helper PluginHelper, wg *sync.WaitGroup) {
if err != nil {
foRunner.LogError(err)
if !foRunner.IsStoppable() {
globals.ShutDown()
globals.ShutDown(1)
}
return
}
2 changes: 1 addition & 1 deletion pipeline/router.go
@@ -366,7 +366,7 @@ func (mr *MatchRunner) deliver(pack *PipelinePack) error {
if err == QueueIsFull {
switch mr.bufFeeder.Config.FullAction {
case "shutdown":
mr.globals.ShutDown()
mr.globals.ShutDown(1)
case "block":
for {
err = mr.bufFeeder.QueueRecord(pack)
4 changes: 2 additions & 2 deletions sandbox/plugins/sandbox_decoder.go
@@ -169,7 +169,7 @@ func (s *SandboxDecoder) SetDecoderRunner(dr pipeline.DecoderRunner) {
s.sb.Destroy("")
s.sb = nil
}
s.pConfig.Globals.ShutDown()
s.pConfig.Globals.ShutDown(1)
return
}

@@ -306,7 +306,7 @@ func (s *SandboxDecoder) Decode(pack *pipeline.PipelinePack) (packs []*pipeline.
if retval > 0 {
err = fmt.Errorf("FATAL: %s", s.sb.LastError())
s.dRunner.LogError(err)
s.pConfig.Globals.ShutDown()
s.pConfig.Globals.ShutDown(1)
}
if retval < 0 {
atomic.AddInt64(&s.processMessageFailures, 1)
