forked from mozilla-services/heka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
host_filter.go
89 lines (81 loc) · 2.22 KB
/
host_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Rob Miller ([email protected])
#
# ***** END LICENSE BLOCK *****/
package examples
import (
"errors"
"fmt"
"github.com/mozilla-services/heka/pipeline"
)
type HostFilter struct {
hosts map[string]bool
output string
}
// Extract hosts value from config and store it on the plugin instance.
func (f *HostFilter) Init(config interface{}) error {
var (
hostsConf interface{}
hosts []interface{}
host string
outputConf interface{}
ok bool
)
conf := config.(pipeline.PluginConfig)
if hostsConf, ok = conf["hosts"]; !ok {
return errors.New("No 'hosts' setting specified.")
}
if hosts, ok = hostsConf.([]interface{}); !ok {
return errors.New("'hosts' setting not a sequence.")
}
if outputConf, ok = conf["output"]; !ok {
return errors.New("No 'output' setting specified.")
}
if f.output, ok = outputConf.(string); !ok {
return errors.New("'output' setting not a string value.")
}
f.hosts = make(map[string]bool)
for _, h := range hosts {
if host, ok = h.(string); !ok {
return errors.New("Non-string host value.")
}
f.hosts[host] = true
}
return nil
}
// Fetch correct output and iterate over received messages, checking against
// message hostname and delivering to the output if hostname is in our config.
func (f *HostFilter) Run(runner pipeline.FilterRunner, helper pipeline.PluginHelper) (
err error) {
var (
hostname string
output pipeline.OutputRunner
ok bool
)
if output, ok = helper.Output(f.output); !ok {
return fmt.Errorf("No output: %s", output)
}
for pack := range runner.InChan() {
hostname = pack.Message.GetHostname()
if f.hosts[hostname] {
output.InChan() <- pack
} else {
pack.Recycle(nil)
}
}
return
}
func init() {
pipeline.RegisterPlugin("HostFilter", func() interface{} {
return new(HostFilter)
})
}