Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

added support for 'replace_dot_with' flag in ES encoders #1947

Merged
merged 3 commits into from
Jun 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ Features

* Added iowait percentage output field in filter procstat (#1888).

* Added support for 'replace_dot_with' flag in ES encoders (#1947).

0.10.1 (2016-??-??)
===================

Expand Down
4 changes: 4 additions & 0 deletions docs/source/config/encoders/esjson.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Config:
the JSON output. Defaults to including all of the messages dynamic
fields. If ``dynamic_fields`` is non-empty, then the ``fields`` list *must*
contain "DynamicFields" or an error will be raised.
- replace_dots_with (string):
This specifies a string to use as a replacement in JSON output field names.

Example

Expand All @@ -73,6 +75,8 @@ Example
index = "%{Type}-%{%Y.%m.%d}"
es_index_from_timestamp = true
type_name = "%{Type}"
replace_dots_with = "_"

[ESJsonEncoder.field_mappings]
Timestamp = "@timestamp"
Severity = "level"
Expand Down
3 changes: 3 additions & 0 deletions docs/source/config/encoders/eslogstashv0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ Config:
the JSON output. Defaults to including all of the messages dynamic
fields. If ``dynamic_fields`` is non-empty, then the ``fields`` list *must*
contain "DynamicFields" or an error will be raised.
- replace_dots_with (string):
This specifies a string to use as a replacement in JSON output field names.

Example

Expand All @@ -78,6 +80,7 @@ Example
[ESLogstashV0Encoder]
es_index_from_timestamp = true
type_name = "%{Type}"
replace_dots_with = "_"

[ElasticSearchOutput]
message_matcher = "Type == 'nginx.access'"
Expand Down
24 changes: 20 additions & 4 deletions plugins/elasticsearch/encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Tanguy Leroux ([email protected])
# Rob Miller ([email protected])
# Xavier Lange ([email protected])
# John Staford ([email protected])
#
# ***** END LICENSE BLOCK *****/

Expand Down Expand Up @@ -88,12 +89,17 @@ func writeQuotedString(b *bytes.Buffer, str string) {

}

func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool) {
func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool, replaceDotsWith string) {
if !first {
b.WriteString(`,`)
}

writeQuotedString(b, f.GetName())
if replaceDotsWith != "." {
writeQuotedString(b, strings.Replace(f.GetName(), ".", replaceDotsWith, -1))
} else {
writeQuotedString(b, f.GetName())
}

b.WriteString(`:`)

switch f.GetValueType() {
Expand Down Expand Up @@ -238,6 +244,7 @@ type ESJsonEncoder struct {
fieldMappings *ESFieldMappings
dynamicFields []string
usesDynamicFields bool
replaceDotsWith string
}

// Heka fields to ElasticSearch mapping
Expand Down Expand Up @@ -275,6 +282,8 @@ type ESJsonEncoderConfig struct {
// Dynamic fields to be included. Non-empty value raises an error if
// 'DynamicFields' is not in Fields []string property.
DynamicFields []string `toml:"dynamic_fields"`
// Replace dot (".") characters in JSON field names with a substitute string.
ReplaceDotsWith string `toml:"replace_dots_with"`
}

func (e *ESJsonEncoder) ConfigStruct() interface{} {
Expand All @@ -295,6 +304,7 @@ func (e *ESJsonEncoder) ConfigStruct() interface{} {
Pid: "Pid",
Hostname: "Hostname",
},
ReplaceDotsWith: ".",
}

config.Fields = fieldChoices[:]
Expand All @@ -307,6 +317,7 @@ func (e *ESJsonEncoder) Init(config interface{}) (err error) {
e.fields = conf.Fields
e.timestampFormat = conf.Timestamp
e.rawBytesFields = conf.RawBytesFields
e.replaceDotsWith = conf.ReplaceDotsWith
e.coord = &ElasticSearchCoordinates{
Index: conf.Index,
Type: conf.TypeName,
Expand Down Expand Up @@ -392,7 +403,7 @@ func (e *ESJsonEncoder) Encode(pack *PipelinePack) (output []byte, err error) {
}
}
}
writeField(first, &buf, field, raw)
writeField(first, &buf, field, raw, e.replaceDotsWith)
}
}
default:
Expand All @@ -416,6 +427,7 @@ type ESLogstashV0Encoder struct {
coord *ElasticSearchCoordinates
dynamicFields []string
useMessageType bool
replaceDotsWith string
}

type ESLogstashV0EncoderConfig struct {
Expand All @@ -440,6 +452,8 @@ type ESLogstashV0EncoderConfig struct {
// Dynamic fields to be included. Non-empty value raises an error if
// 'DynamicFields' is not in Fields []string property.
DynamicFields []string `toml:"dynamic_fields"`
// Replace dot (".") characters in JSON field names with a substitute string.
ReplaceDotsWith string `toml:"replace_dots_with"`
}

func (e *ESLogstashV0Encoder) ConfigStruct() interface{} {
Expand All @@ -451,6 +465,7 @@ func (e *ESLogstashV0Encoder) ConfigStruct() interface{} {
UseMessageType: false,
ESIndexFromTimestamp: false,
Id: "",
ReplaceDotsWith: ".",
}

config.Fields = fieldChoices[:]
Expand All @@ -464,6 +479,7 @@ func (e *ESLogstashV0Encoder) Init(config interface{}) (err error) {
e.fields = conf.Fields
e.timestampFormat = conf.Timestamp
e.useMessageType = conf.UseMessageType
e.replaceDotsWith = conf.ReplaceDotsWith
e.coord = &ElasticSearchCoordinates{
Index: conf.Index,
Type: conf.TypeName,
Expand Down Expand Up @@ -563,7 +579,7 @@ func (e *ESLogstashV0Encoder) Encode(pack *PipelinePack) (output []byte, err err
}
}
}
writeField(firstfield, &buf, field, raw)
writeField(firstfield, &buf, field, raw, e.replaceDotsWith)
firstfield = false
}
}
Expand Down
14 changes: 12 additions & 2 deletions plugins/elasticsearch/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#
# Contributor(s):
# Tanguy Leroux ([email protected])
# John Stanford ([email protected])
#
# ***** END LICENSE BLOCK *****/

Expand Down Expand Up @@ -74,6 +75,8 @@ func getTestMessageWithFunnyFields() *message.Message {
field12.AddValue("jkl;")
field12.AddValue("push")
field12.AddValue("pull")
field13 := message.NewFieldInit("test.dotted.field.name.string", message.Field_STRING, "")
field13.AddValue("{\"asdf\":123}")

msg := &message.Message{}
msg.SetType("TEST")
Expand Down Expand Up @@ -102,6 +105,7 @@ func getTestMessageWithFunnyFields() *message.Message {
msg.AddField(field10)
msg.AddField(field11)
msg.AddField(field12)
msg.AddField(field13)

return msg
}
Expand Down Expand Up @@ -164,7 +168,9 @@ func ESEncodersSpec(c gs.Context) {
"test_raw_field_bytes",
"test_raw_field_string_array",
"test_raw_field_bytes_array",
"test.dotted.field.name.string",
}
config.ReplaceDotsWith = "_"

c.Specify("Should properly encode a message", func() {
err := encoder.Init(config)
Expand Down Expand Up @@ -237,6 +243,7 @@ func ESEncodersSpec(c gs.Context) {
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0)
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["@fields"].(map[string]interface{})["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0)
})

c.Specify("encodes w/ a different timestamp format", func() {
Expand Down Expand Up @@ -292,7 +299,7 @@ func ESEncodersSpec(c gs.Context) {
c.Expect(len(decoded), gs.Equals, 10)
fieldsValInterface := decoded["@fields"]
fieldsVal := fieldsValInterface.(map[string]interface{})
c.Expect(len(fieldsVal), gs.Equals, 13)
c.Expect(len(fieldsVal), gs.Equals, 14)
})
})
})
Expand All @@ -305,7 +312,9 @@ func ESEncodersSpec(c gs.Context) {
"test_raw_field_bytes",
"test_raw_field_string_array",
"test_raw_field_bytes_array",
"test.dotted.field.name.string",
}
config.ReplaceDotsWith = "_"

c.Specify("Should properly encode a message", func() {
err := encoder.Init(config)
Expand Down Expand Up @@ -377,6 +386,7 @@ func ESEncodersSpec(c gs.Context) {
c.Expect(decoded["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0)
c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0)
})

c.Specify("Produces valid JSON when DynamicFields is first configured field and message has no fields", func() {
Expand Down Expand Up @@ -498,7 +508,7 @@ func ESEncodersSpec(c gs.Context) {
decoded := make(map[string]interface{})
err = json.Unmarshal([]byte(lines[1]), &decoded)
c.Assume(err, gs.IsNil)
c.Expect(len(decoded), gs.Equals, 22) // 9 base fields and 13 dynamic fields.
c.Expect(len(decoded), gs.Equals, 23) // 9 base fields and 14 dynamic fields.
})

c.Specify("when dynamic_fields is specified", func() {
Expand Down