diff --git a/CHANGES.txt b/CHANGES.txt index ca7e316df..87f9077a4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -60,6 +60,8 @@ Features * Added iowait percentage output field in filter procstat (#1888). +* Added support for 'replace_dot_with' flag in ES encoders (#1947). + 0.10.1 (2016-??-??) =================== diff --git a/docs/source/config/encoders/esjson.rst b/docs/source/config/encoders/esjson.rst index 12198ed8f..8f38f9ab0 100644 --- a/docs/source/config/encoders/esjson.rst +++ b/docs/source/config/encoders/esjson.rst @@ -64,6 +64,8 @@ Config: the JSON output. Defaults to including all of the messages dynamic fields. If ``dynamic_fields`` is non-empty, then the ``fields`` list *must* contain "DynamicFields" or an error will be raised. +- replace_dots_with (string): + This specifies a string to use as a replacement in JSON output field names. Example @@ -73,6 +75,8 @@ Example index = "%{Type}-%{%Y.%m.%d}" es_index_from_timestamp = true type_name = "%{Type}" + replace_dots_with = "_" + [ESJsonEncoder.field_mappings] Timestamp = "@timestamp" Severity = "level" diff --git a/docs/source/config/encoders/eslogstashv0.rst b/docs/source/config/encoders/eslogstashv0.rst index 803d019a2..b1199d2e1 100644 --- a/docs/source/config/encoders/eslogstashv0.rst +++ b/docs/source/config/encoders/eslogstashv0.rst @@ -70,6 +70,8 @@ Config: the JSON output. Defaults to including all of the messages dynamic fields. If ``dynamic_fields`` is non-empty, then the ``fields`` list *must* contain "DynamicFields" or an error will be raised. +- replace_dots_with (string): + This specifies a string to use as a replacement in JSON output field names. Example @@ -78,6 +80,7 @@ Example [ESLogstashV0Encoder] es_index_from_timestamp = true type_name = "%{Type}" + replace_dots_with = "_" [ElasticSearchOutput] message_matcher = "Type == 'nginx.access'" diff --git a/plugins/elasticsearch/encoders.go b/plugins/elasticsearch/encoders.go index 07d3771f2..1ee1162cb 100644 --- a/plugins/elasticsearch/encoders.go +++ b/plugins/elasticsearch/encoders.go @@ -11,6 +11,7 @@ # Tanguy Leroux (tlrx.dev@gmail.com) # Rob Miller (rmiller@mozilla.com) # Xavier Lange (xavier.lange@viasat.com) +# John Staford (john@solinea.com) # # ***** END LICENSE BLOCK *****/ @@ -88,12 +89,17 @@ func writeQuotedString(b *bytes.Buffer, str string) { } -func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool) { +func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool, replaceDotsWith string) { if !first { b.WriteString(`,`) } - writeQuotedString(b, f.GetName()) + if replaceDotsWith != "." { + writeQuotedString(b, strings.Replace(f.GetName(), ".", replaceDotsWith, -1)) + } else { + writeQuotedString(b, f.GetName()) + } + b.WriteString(`:`) switch f.GetValueType() { @@ -238,6 +244,7 @@ type ESJsonEncoder struct { fieldMappings *ESFieldMappings dynamicFields []string usesDynamicFields bool + replaceDotsWith string } // Heka fields to ElasticSearch mapping @@ -275,6 +282,8 @@ type ESJsonEncoderConfig struct { // Dynamic fields to be included. Non-empty value raises an error if // 'DynamicFields' is not in Fields []string property. DynamicFields []string `toml:"dynamic_fields"` + // Replace dot (".") characters in JSON field names with a substitute string. + ReplaceDotsWith string `toml:"replace_dots_with"` } func (e *ESJsonEncoder) ConfigStruct() interface{} { @@ -295,6 +304,7 @@ func (e *ESJsonEncoder) ConfigStruct() interface{} { Pid: "Pid", Hostname: "Hostname", }, + ReplaceDotsWith: ".", } config.Fields = fieldChoices[:] @@ -307,6 +317,7 @@ func (e *ESJsonEncoder) Init(config interface{}) (err error) { e.fields = conf.Fields e.timestampFormat = conf.Timestamp e.rawBytesFields = conf.RawBytesFields + e.replaceDotsWith = conf.ReplaceDotsWith e.coord = &ElasticSearchCoordinates{ Index: conf.Index, Type: conf.TypeName, @@ -392,7 +403,7 @@ func (e *ESJsonEncoder) Encode(pack *PipelinePack) (output []byte, err error) { } } } - writeField(first, &buf, field, raw) + writeField(first, &buf, field, raw, e.replaceDotsWith) } } default: @@ -416,6 +427,7 @@ type ESLogstashV0Encoder struct { coord *ElasticSearchCoordinates dynamicFields []string useMessageType bool + replaceDotsWith string } type ESLogstashV0EncoderConfig struct { @@ -440,6 +452,8 @@ type ESLogstashV0EncoderConfig struct { // Dynamic fields to be included. Non-empty value raises an error if // 'DynamicFields' is not in Fields []string property. DynamicFields []string `toml:"dynamic_fields"` + // Replace dot (".") characters in JSON field names with a substitute string. + ReplaceDotsWith string `toml:"replace_dots_with"` } func (e *ESLogstashV0Encoder) ConfigStruct() interface{} { @@ -451,6 +465,7 @@ func (e *ESLogstashV0Encoder) ConfigStruct() interface{} { UseMessageType: false, ESIndexFromTimestamp: false, Id: "", + ReplaceDotsWith: ".", } config.Fields = fieldChoices[:] @@ -464,6 +479,7 @@ func (e *ESLogstashV0Encoder) Init(config interface{}) (err error) { e.fields = conf.Fields e.timestampFormat = conf.Timestamp e.useMessageType = conf.UseMessageType + e.replaceDotsWith = conf.ReplaceDotsWith e.coord = &ElasticSearchCoordinates{ Index: conf.Index, Type: conf.TypeName, @@ -563,7 +579,7 @@ func (e *ESLogstashV0Encoder) Encode(pack *PipelinePack) (output []byte, err err } } } - writeField(firstfield, &buf, field, raw) + writeField(firstfield, &buf, field, raw, e.replaceDotsWith) firstfield = false } } diff --git a/plugins/elasticsearch/encoders_test.go b/plugins/elasticsearch/encoders_test.go index 431fb9910..ea54ce894 100644 --- a/plugins/elasticsearch/encoders_test.go +++ b/plugins/elasticsearch/encoders_test.go @@ -9,6 +9,7 @@ # # Contributor(s): # Tanguy Leroux (tlrx.dev@gmail.com) +# John Stanford (john@solinea.com) # # ***** END LICENSE BLOCK *****/ @@ -74,6 +75,8 @@ func getTestMessageWithFunnyFields() *message.Message { field12.AddValue("jkl;") field12.AddValue("push") field12.AddValue("pull") + field13 := message.NewFieldInit("test.dotted.field.name.string", message.Field_STRING, "") + field13.AddValue("{\"asdf\":123}") msg := &message.Message{} msg.SetType("TEST") @@ -102,6 +105,7 @@ func getTestMessageWithFunnyFields() *message.Message { msg.AddField(field10) msg.AddField(field11) msg.AddField(field12) + msg.AddField(field13) return msg } @@ -164,7 +168,9 @@ func ESEncodersSpec(c gs.Context) { "test_raw_field_bytes", "test_raw_field_string_array", "test_raw_field_bytes_array", + "test.dotted.field.name.string", } + config.ReplaceDotsWith = "_" c.Specify("Should properly encode a message", func() { err := encoder.Init(config) @@ -237,6 +243,7 @@ func ESEncodersSpec(c gs.Context) { c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0) c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) + c.Expect(decoded["@fields"].(map[string]interface{})["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0) }) c.Specify("encodes w/ a different timestamp format", func() { @@ -292,7 +299,7 @@ func ESEncodersSpec(c gs.Context) { c.Expect(len(decoded), gs.Equals, 10) fieldsValInterface := decoded["@fields"] fieldsVal := fieldsValInterface.(map[string]interface{}) - c.Expect(len(fieldsVal), gs.Equals, 13) + c.Expect(len(fieldsVal), gs.Equals, 14) }) }) }) @@ -305,7 +312,9 @@ func ESEncodersSpec(c gs.Context) { "test_raw_field_bytes", "test_raw_field_string_array", "test_raw_field_bytes_array", + "test.dotted.field.name.string", } + config.ReplaceDotsWith = "_" c.Specify("Should properly encode a message", func() { err := encoder.Init(config) @@ -377,6 +386,7 @@ func ESEncodersSpec(c gs.Context) { c.Expect(decoded["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0) c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) + c.Expect(decoded["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0) }) c.Specify("Produces valid JSON when DynamicFields is first configured field and message has no fields", func() { @@ -498,7 +508,7 @@ func ESEncodersSpec(c gs.Context) { decoded := make(map[string]interface{}) err = json.Unmarshal([]byte(lines[1]), &decoded) c.Assume(err, gs.IsNil) - c.Expect(len(decoded), gs.Equals, 22) // 9 base fields and 13 dynamic fields. + c.Expect(len(decoded), gs.Equals, 23) // 9 base fields and 14 dynamic fields. }) c.Specify("when dynamic_fields is specified", func() {