Skip to content

Commit

Permalink
Merge branch 'versions/0.10' into dev
Browse files Browse the repository at this point in the history
Conflicts:
	docs/source/config/inputs/logstreamer.rst
	plugins/logstreamer/logstreamer_input.go
	sandbox/lua/modules/ts_line_protocol.lua
  • Loading branch information
rafrombrc committed Mar 9, 2016
2 parents 08b95b0 + 9b15492 commit fe4697e
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 102 deletions.
25 changes: 25 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,31 @@ Features
0.10.1 (2016-??-??)
===================

Features
--------

* Added `check_data_interval` setting to LogstreamerInput.

* Added `separator` and `maximum_depth` settings to JSON decoder (#1809).

Bug Handling
------------

* Fixed conditional which was causing InfluxDB line protocol encoder to not
correctly honor the `source_value_field` config setting.

* Don't use `os.Exit` in the pipeline's main `Run` function or else any
  wrapping deferred functions (such as those that output the cpu and mem
  profiles) won't get called.

* Stop zero-length records with no error from ending the splitter if there's
  more data to read (#1561).

* Fixed gzipped file seeking in logstreamer that could cause an OOM exception.

* Fixed LogstreamerInput file rotation missing newly created files with new
names by always triggering LogstreamSet rescan before checking location in
stream (#1452).

0.10.0 (2015-12-30)
===================
Expand Down
2 changes: 1 addition & 1 deletion cmake/externals.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if(INCLUDE_SANDBOX)
externalproject_add(
${SANDBOX_PACKAGE}
GIT_REPOSITORY https://github.com/mozilla-services/lua_sandbox.git
GIT_TAG 4bf74cc480c2306fcffe1e423186e115cc87222d
GIT_TAG 5e22edfcc4bed35305dc0d358e681509dfb9944b
CMAKE_ARGS ${SANDBOX_ARGS}
INSTALL_DIR ${PROJECT_PATH}
)
Expand Down
57 changes: 44 additions & 13 deletions cmd/hekad/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ func setGlobalConfigs(config *HekadConfig) (*pipeline.GlobalConfigStruct, string
}

func main() {
exitCode := 0
// `os.Exit` will skip any registered deferred functions, so to support
// exit codes we put it in the first registered deferred function (i.e. the
// last one to run); we can then set exitCode and call `return` to exit
// with an error code.
defer func() {
os.Exit(exitCode)
}()

configPath := flag.String("config", filepath.FromSlash("/etc/hekad.toml"),
"Config file or directory. If directory is specified then all files "+
"in the directory will be loaded.")
Expand All @@ -106,62 +115,80 @@ func main() {

if *version {
fmt.Println(VERSION)
os.Exit(0)
return
}

config, err = LoadHekadConfig(*configPath)
if err != nil {
pipeline.LogError.Fatal("Error reading config: ", err)
pipeline.LogError.Println("Error reading config: ", err)
exitCode = 1
return
}
pipeline.LogInfo.SetFlags(config.LogFlags)
pipeline.LogError.SetFlags(config.LogFlags)
if config.SampleDenominator <= 0 {
pipeline.LogError.Fatalln("'sample_denominator' value must be greater than 0.")
pipeline.LogError.Println("'sample_denominator' value must be greater than 0.")
exitCode = 1
return
}

if _, err = time.ParseDuration(config.MaxPackIdle); err != nil {
pipeline.LogError.Fatalf("Can't parse `max_pack_idle` time duration: %s\n", config.MaxPackIdle)
pipeline.LogError.Printf("Can't parse `max_pack_idle` time duration: %s\n",
config.MaxPackIdle)
exitCode = 1
return
}

globals, cpuProfName, memProfName := setGlobalConfigs(config)

if err = os.MkdirAll(globals.BaseDir, 0755); err != nil {
pipeline.LogError.Fatalf("Error creating 'base_dir' %s: %s", config.BaseDir, err)
pipeline.LogError.Printf("Error creating 'base_dir' %s: %s", config.BaseDir, err)
exitCode = 1
return
}

if config.MaxMessageSize > 1024 {
message.SetMaxMessageSize(config.MaxMessageSize)
} else if config.MaxMessageSize > 0 {
pipeline.LogError.Fatalln("Error: 'max_message_size' setting must be greater than 1024.")
pipeline.LogError.Println("Error: 'max_message_size' setting must be greater than 1024.")
exitCode = 1
return
}
if config.PidFile != "" {
contents, err := ioutil.ReadFile(config.PidFile)
if err == nil {
pid, err := strconv.Atoi(strings.TrimSpace(string(contents)))
if err != nil {
pipeline.LogError.Fatalf("Error reading proccess id from pidfile '%s': %s",
pipeline.LogError.Printf("Error reading proccess id from pidfile '%s': %s",
config.PidFile, err)
exitCode = 1
return
}

process, err := os.FindProcess(pid)

// on Windows, err != nil if the process cannot be found
if runtime.GOOS == "windows" {
if err == nil {
pipeline.LogError.Fatalf("Process %d is already running.", pid)
pipeline.LogError.Printf("Process %d is already running.", pid)
exitCode = 1
return
}
} else if process != nil {
// err is always nil on POSIX, so we have to send the process
// a signal to check whether it exists
if err = process.Signal(syscall.Signal(0)); err == nil {
pipeline.LogError.Fatalf("Process %d is already running.", pid)
pipeline.LogError.Printf("Process %d is already running.", pid)
exitCode = 1
return
}
}
}
if err = ioutil.WriteFile(config.PidFile, []byte(strconv.Itoa(os.Getpid())),
0644); err != nil {

pipeline.LogError.Fatalf("Unable to write pidfile '%s': %s", config.PidFile, err)
pipeline.LogError.Printf("Unable to write pidfile '%s': %s", config.PidFile, err)
exitCode = 1
}
pipeline.LogInfo.Printf("Wrote pid to pidfile '%s'", config.PidFile)
defer func() {
Expand All @@ -174,7 +201,9 @@ func main() {
if cpuProfName != "" {
profFile, err := os.Create(cpuProfName)
if err != nil {
pipeline.LogError.Fatalln(err)
pipeline.LogError.Println(err)
exitCode = 1
return
}

pprof.StartCPUProfile(profFile)
Expand All @@ -198,9 +227,11 @@ func main() {
// Set up and load the pipeline configuration and start the daemon.
pipeconf := pipeline.NewPipelineConfig(globals)
if err = loadFullConfig(pipeconf, configPath); err != nil {
pipeline.LogError.Fatal("Error reading config: ", err)
pipeline.LogError.Println("Error reading config: ", err)
exitCode = 1
return
}
pipeline.Run(pipeconf)
exitCode = pipeline.Run(pipeconf)
}

func loadFullConfig(pipeconf *pipeline.PipelineConfig, configPath *string) (err error) {
Expand Down
12 changes: 6 additions & 6 deletions docs/source/config/inputs/http.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,12 @@ Config:
.. versionadded:: 0.5

The HTTP method to use for the request. Defaults to "GET".
- headers (subsection):
.. versionadded:: 0.5

Subsection defining headers for the request. By default the User-Agent
header is set to "Heka"
- body (string):
.. versionadded:: 0.5

The request body (e.g. for an HTTP POST request). No default body is
specified.
- username (string):
- user (string):
.. versionadded:: 0.5

The username for HTTP Basic Authentication. No default username is
Expand All @@ -79,6 +74,11 @@ Config:

Severity level of errors, unreachable connections, and non-200 responses
of successful HTTP requests. Defaults to 1 (alert).
- headers (subsection):
.. versionadded:: 0.5

Subsection defining headers for the request. By default the User-Agent
header is set to "Heka"

Example:

Expand Down
7 changes: 7 additions & 0 deletions docs/source/config/inputs/logstreamer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ Config:
Defaults to "TokenSplitter", which will split the log stream into one
Heka message per line.

.. versionadded:: 0.10

- check_data_interval (string)
A time duration string. This interval is how often streams will be checked
for new data. Defaults to "250ms". If the plugin processes many logstreams,
you may increase this value to reduce the CPU load.

.. versionadded:: 0.11

- initial_tail (bool, optional, default: false):
Expand Down
10 changes: 8 additions & 2 deletions logstreamer/filehandling.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,20 @@ type Logstream struct {
saveBuffer []byte
// Records whether the prior read hit an EOF
priorEOF bool
// LogstreamSet to which this stream belongs, needed so we can trigger
// file rescanning when necessary.
set *LogstreamSet
}

func NewLogstream(logfiles Logfiles, position *LogstreamLocation) *Logstream {
func NewLogstream(logfiles Logfiles, position *LogstreamLocation,
set *LogstreamSet) *Logstream {

return &Logstream{
lfMutex: new(sync.RWMutex),
logfiles: logfiles,
position: position,
saveBuffer: make([]byte, 0, 200),
set: set,
}
}

Expand Down Expand Up @@ -474,7 +480,7 @@ func (ls *LogstreamSet) ScanForLogstreams() (result []string, errors *MultipleEr
errors.AddMessage(err.Error())
position.Reset()
}
logstream = NewLogstream(nil, position)
logstream = NewLogstream(nil, position, ls)
}

// Add an error if there's multiple logfiles but no priority for sorting
Expand Down
34 changes: 22 additions & 12 deletions logstreamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ func (l *Logstream) NewerFileAvailable() (file string, ok bool) {

if ok {
// 1. NO - Try and find our location

// First make sure our file list is current. Ignore errors, since we
// can't do anything about them here anyway; scanning errors should be
// reported during the next ticker interval rescan.
l.set.ScanForLogstreams()

fd, _, err := l.LocatePriorLocation(false)

if err != nil && IsFileError(err) {
Expand All @@ -244,9 +250,9 @@ func (l *Logstream) NewerFileAvailable() (file string, ok bool) {
fd.Close()
}

// Unable to locate prior position in our file-stream, are there
// any logfiles?
if err != nil {
// Unable to locate prior position in our file-stream, are there
// any logfiles?
l.lfMutex.RLock()
defer l.lfMutex.RUnlock()
if len(l.logfiles) > 0 {
Expand Down Expand Up @@ -310,10 +316,10 @@ func (l *Logstream) FileHashMismatch() bool {
return false
}

// Locate and return a file handle seeked to the appropriate location. An error will be
// returned if the prior location cannot be located.
// If the logfile this location for has changed names, the position will be updated to
// reflect the move.
// Locate and return a file handle seeked to the appropriate location. An
// error will be returned if the prior location cannot be located. If the
// logfile this location is for has changed names, the position will be
// updated to reflect the move.
func (l *Logstream) LocatePriorLocation(checkFilename bool) (fd *os.File, reader io.Reader,
err error) {

Expand All @@ -336,12 +342,13 @@ func (l *Logstream) LocatePriorLocation(checkFilename bool) (fd *os.File, reader
}
}

// Unable to locate the file, or the position wasn't where we thought it should be.
// Start systematically searching all the files for this location to see if it was
// shuffled around.
// TODO: Would be more efficient to start searching backwards from where we are
// in the logstream at the moment.
for _, logfile := range l.logfiles {
// Unable to locate the file, or the position wasn't where we thought it
// should be. Start systematically searching all the files for this
// location to see if it was shuffled around. We start at the end and work
// backwards under the assumption that we're probably much closer to the
// end of the stream than the beginning.
for i := len(l.logfiles) - 1; i >= 0; i-- {
logfile := l.logfiles[i]
// Check that the file is large enough for our seek position
info, err = os.Stat(logfile.FileName)
if err != nil {
Expand All @@ -364,6 +371,9 @@ func (l *Logstream) LocatePriorLocation(checkFilename bool) (fd *os.File, reader
return
}
err = nil // Reset our error to nil
if fd != nil {
fd.Close()
}
}
// Set our default error since we were unable to locate the position
err = errors.New("Unable to locate position in the stream")
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (p *PipelinePack) EncodeMsgBytes() error {
// Main function driving Heka execution. Loads config, initializes PipelinePack
// pools, and starts all the runners. Then it listens for signals and drives
// the shutdown process when that is triggered.
func Run(config *PipelineConfig) {
func Run(config *PipelineConfig) (exitCode int) {
LogInfo.Println("Starting hekad...")

var outputsWg sync.WaitGroup
Expand Down Expand Up @@ -404,7 +404,7 @@ func Run(config *PipelineConfig) {
}

LogInfo.Println("Shutdown complete.")
os.Exit(globals.exitCode)
return globals.exitCode
}

func sandboxAbort(config *PipelineConfig) {
Expand Down
Loading

0 comments on commit fe4697e

Please sign in to comment.