Skip to content

Commit

Permalink
Merge pull request mozilla-services#1779 from mozilla-services/pack_i…
Browse files Browse the repository at this point in the history
…dle_timeout_fix

Fix global `max_pack_idle` setting
  • Loading branch information
Mike Trinkala committed Nov 3, 2015
2 parents 0561fe1 + 4cf48d0 commit 5a332fb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 46 deletions.
28 changes: 3 additions & 25 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,28 +1,3 @@
0.11.0 (2015-??-??)
===================

Backwards Incompatibilities
---------------------------

* StatAccumInput Input: percent_threshold param type convert to slice

Bug Handling
------------

* Updated Sarama dependency from pre-1.0 release fork to fork (with only test
code changes) of Sarama 1.5.0 release.

Features
--------

* Allow TcpOutput to re-establish the connection after a configurable number of
successfully delivered messages.

* Added `git_clone_to_path` to the cmake build to allow git repos to be cloned
into alternate locations; useful for relocating forks of Go packages into
their original import paths.


0.10.0 (2015-??-??)
===================

Expand All @@ -49,6 +24,9 @@ Features
Bug Handling
------------

* Fixed config error where global `max_pack_idle` setting was the wrong type
and was being ignored (#1778).

* Fixed race condition in InputRunner's default deliverer initialization.

* Set hostname correctly in the Graylog decoder (#1663).
Expand Down
41 changes: 20 additions & 21 deletions cmd/hekad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,37 @@ package main

import (
"fmt"
"github.com/bbangert/toml"
"github.com/mozilla-services/heka/pipeline"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"time"

"github.com/bbangert/toml"
"github.com/mozilla-services/heka/pipeline"
)

type HekadConfig struct {
Maxprocs int `toml:"maxprocs"`
PoolSize int `toml:"poolsize"`
ChanSize int `toml:"plugin_chansize"`
CpuProfName string `toml:"cpuprof"`
MemProfName string `toml:"memprof"`
MaxMsgLoops uint `toml:"max_message_loops"`
MaxMsgProcessInject uint `toml:"max_process_inject"`
MaxMsgProcessDuration uint64 `toml:"max_process_duration"`
MaxMsgTimerInject uint `toml:"max_timer_inject"`
MaxPackIdle time.Duration `toml:"max_pack_idle"`
BaseDir string `toml:"base_dir"`
ShareDir string `toml:"share_dir"`
SampleDenominator int `toml:"sample_denominator"`
PidFile string `toml:"pid_file"`
Maxprocs int `toml:"maxprocs"`
PoolSize int `toml:"poolsize"`
ChanSize int `toml:"plugin_chansize"`
CpuProfName string `toml:"cpuprof"`
MemProfName string `toml:"memprof"`
MaxMsgLoops uint `toml:"max_message_loops"`
MaxMsgProcessInject uint `toml:"max_process_inject"`
MaxMsgProcessDuration uint64 `toml:"max_process_duration"`
MaxMsgTimerInject uint `toml:"max_timer_inject"`
MaxPackIdle string `toml:"max_pack_idle"`
BaseDir string `toml:"base_dir"`
ShareDir string `toml:"share_dir"`
SampleDenominator int `toml:"sample_denominator"`
PidFile string `toml:"pid_file"`
Hostname string
MaxMessageSize uint32 `toml:"max_message_size"`
LogFlags int `toml:"log_flags"`
MaxMessageSize uint32 `toml:"max_message_size"`
LogFlags int `toml:"log_flags"`
}

func LoadHekadConfig(configPath string) (config *HekadConfig, err error) {
idle, _ := time.ParseDuration("2m")
hostname, err := os.Hostname()
if err != nil {
return
Expand All @@ -65,7 +64,7 @@ func LoadHekadConfig(configPath string) (config *HekadConfig, err error) {
MaxMsgProcessInject: 1,
MaxMsgProcessDuration: 100000,
MaxMsgTimerInject: 10,
MaxPackIdle: idle,
MaxPackIdle: "2m",
BaseDir: filepath.FromSlash("/var/cache/hekad"),
ShareDir: filepath.FromSlash("/usr/share/heka"),
SampleDenominator: 1000,
Expand Down
8 changes: 8 additions & 0 deletions cmd/hekad/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"strconv"
"strings"
"syscall"
"time"

"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/pipeline"
Expand Down Expand Up @@ -67,6 +68,7 @@ func setGlobalConfigs(config *HekadConfig) (*pipeline.GlobalConfigStruct, string
maxMsgProcessInject := config.MaxMsgProcessInject
maxMsgProcessDuration := config.MaxMsgProcessDuration
maxMsgTimerInject := config.MaxMsgTimerInject
maxPackIdle, _ := time.ParseDuration(config.MaxPackIdle)

runtime.GOMAXPROCS(maxprocs)

Expand All @@ -80,6 +82,7 @@ func setGlobalConfigs(config *HekadConfig) (*pipeline.GlobalConfigStruct, string
globals.MaxMsgProcessInject = maxMsgProcessInject
globals.MaxMsgProcessDuration = maxMsgProcessDuration
globals.MaxMsgTimerInject = maxMsgTimerInject
globals.MaxPackIdle = maxPackIdle
globals.BaseDir = config.BaseDir
globals.ShareDir = config.ShareDir
globals.SampleDenominator = config.SampleDenominator
Expand Down Expand Up @@ -114,6 +117,11 @@ func main() {
if config.SampleDenominator <= 0 {
pipeline.LogError.Fatalln("'sample_denominator' value must be greater than 0.")
}

if _, err = time.ParseDuration(config.MaxPackIdle); err != nil {
pipeline.LogError.Fatalf("Can't parse `max_pack_idle` time duration: %s\n", config.MaxPackIdle)
}

globals, cpuProfName, memProfName := setGlobalConfigs(config)

if err = os.MkdirAll(globals.BaseDir, 0755); err != nil {
Expand Down

0 comments on commit 5a332fb

Please sign in to comment.