Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

优化 性能 #626

Merged
merged 3 commits into from
Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions reader/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func TestGetTags(t *testing.T) {
assert.Equal(t, exp, tags)
}

func TestSetMapValueWithPrefix(t *testing.T) {
func TestSetMapValueExistWithPrefix(t *testing.T) {
data1 := map[string]interface{}{
"a": "b",
}
err1 := SetMapValueWithPrefix(data1, "newVal", "prefix", false, "a")
err1 := SetMapValueExistWithPrefix(data1, "newVal", "prefix", "a")
assert.NoError(t, err1)
exp1 := map[string]interface{}{
"a": "b",
Expand All @@ -154,7 +154,7 @@ func TestSetMapValueWithPrefix(t *testing.T) {
"age": 45,
},
}
err2 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "name"}...)
err2 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "name"}...)
assert.NoError(t, err2)
exp2 := map[string]interface{}{
"a": map[string]interface{}{
Expand All @@ -165,10 +165,10 @@ func TestSetMapValueWithPrefix(t *testing.T) {
}
assert.Equal(t, exp2, data2)

err3 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"xy", "name"}...)
assert.Error(t, err3)
err3 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"xy", "name"}...)
assert.NoError(t, err3)

err4 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "hello"}...)
err4 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "hello"}...)
assert.NoError(t, err4)
exp4 := map[string]interface{}{
"a": map[string]interface{}{
Expand All @@ -177,14 +177,10 @@ func TestSetMapValueWithPrefix(t *testing.T) {
"prefix_name": "newVal",
"hello": "newVal",
},
"xy": map[string]interface{}{
"name": "newVal",
},
}
assert.Equal(t, exp4, data2)

data5 := map[string]interface{}{}
err5 := SetMapValueWithPrefix(data5, "newVal", "prefix", true, "a")
assert.NoError(t, err5)
exp5 := map[string]interface{}{
"prefix_a": "newVal",
}
assert.Equal(t, exp5, data5)
}
47 changes: 32 additions & 15 deletions sender/fault_tolerant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,20 @@ var _ SkipDeepCopySender = &FtSender{}

// FtSender fault tolerance sender wrapper
type FtSender struct {
stopped int32
exitChan chan struct{}
innerSender Sender
logQueue queue.BackendQueue
BackupQueue queue.BackendQueue
writeLimit int // 写入速度限制,单位MB
strategy string
procs int //发送并发数
runnerName string
opt *FtOption
stats StatsInfo
statsMutex *sync.RWMutex
jsontool jsoniter.API
stopped int32
exitChan chan struct{}
innerSender Sender
logQueue queue.BackendQueue
BackupQueue queue.BackendQueue
writeLimit int // 写入速度限制,单位MB
strategy string
procs int //发送并发数
runnerName string
opt *FtOption
stats StatsInfo
statsMutex *sync.RWMutex
jsontool jsoniter.API
pandoraKeyCache map[string]KeyInfo
}

type FtOption struct {
Expand All @@ -59,21 +60,23 @@ type FtOption struct {
memoryChannel bool
memoryChannelSize int
longDataDiscard bool
innerSenderType string
}

type datasContext struct {
Datas []Data `json:"datas"`
}

// NewFtSender Fault tolerant sender constructor
func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error) {
func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error) {
memoryChannel, _ := conf.GetBoolOr(KeyFtMemoryChannel, false)
memoryChannelSize, _ := conf.GetIntOr(KeyFtMemoryChannelSize, 100)
logPath, _ := conf.GetStringOr(KeyFtSaveLogPath, ftSaveLogPath)
syncEvery, _ := conf.GetIntOr(KeyFtSyncEvery, DefaultFtSyncEvery)
writeLimit, _ := conf.GetIntOr(KeyFtWriteLimit, defaultWriteLimit)
strategy, _ := conf.GetStringOr(KeyFtStrategy, KeyFtStrategyBackupOnly)
longDataDiscard, _ := conf.GetBoolOr(KeyFtLongDataDiscard, false)
senderType, _ := conf.GetStringOr(KeySenderType, "") //此处不会没有SenderType,在调用NewFtSender时已经检查
switch strategy {
case KeyFtStrategyAlwaysSave, KeyFtStrategyBackupOnly, KeyFtStrategyConcurrent:
default:
Expand All @@ -91,9 +94,10 @@ func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtS
memoryChannel: memoryChannel,
memoryChannelSize: memoryChannelSize,
longDataDiscard: longDataDiscard,
innerSenderType: senderType,
}

return newFtSender(ftSender, runnerName, opt)
return newFtSender(innerSender, runnerName, opt)
}

func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSender, error) {
Expand Down Expand Up @@ -123,6 +127,10 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
statsMutex: new(sync.RWMutex),
jsontool: jsoniter.Config{EscapeHTML: true, UseNumber: true}.Froze(),
}

if opt.innerSenderType == TypePandora {
ftSender.pandoraKeyCache = make(map[string]KeyInfo)
}
go ftSender.asyncSendLogFromDiskQueue()
return &ftSender, nil
}
Expand All @@ -132,6 +140,15 @@ func (ft *FtSender) Name() string {
}

func (ft *FtSender) Send(datas []Data) error {

switch ft.opt.innerSenderType {
case TypePandora:
for i, v := range datas {
datas[i] = DeepConvertKeyWithCache(v, ft.pandoraKeyCache)
}
default:
}

se := &StatsError{Ft: true}
if ft.strategy == KeyFtStrategyBackupOnly {
// 尝试直接发送数据,当数据失败的时候会加入到本地重试队列。外部不需要重试
Expand Down
99 changes: 99 additions & 0 deletions sender/fault_tolerant/fault_tolerant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,102 @@ func TestSkipDeepCopySender(t *testing.T) {
assert.True(t, fs.SkipDeepCopy())
}
}

func TestPandoraExtraInfo(t *testing.T) {
pandoraServer, pt := mock_pandora.NewMockPandoraWithPrefix("/v2")
conf1 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "true",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}

innerSender, err := pandora.NewSender(conf1)
if err != nil {
t.Fatal(err)
}
s, err := sender.NewFtSender(innerSender, conf1, fttestdir)
defer os.RemoveAll(fttestdir)
if err != nil {
t.Fatal(err)
}
d := Data{}
d["x1"] = "123.2"
d["hostname"] = "123.2"
d["hostname0"] = "123.2"
d["hostname1"] = "123.2"
d["hostname2"] = "123.2"
d["osinfo"] = "123.2"
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp := pandoraServer.Body
assert.Equal(t, true, strings.Contains(resp, "core"))
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "osinfo=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname0=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname2=123.2"))

conf2 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "false",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}
innerSender, err = pandora.NewSender(conf2)
if err != nil {
t.Fatal(err)
}

s, err = sender.NewFtSender(innerSender, conf1, fttestdir)
d = Data{
"*x1": "123.2",
"x2.dot": "123.2",
"@timestamp": "2018-07-18T10:17:36.549054846+08:00",
}
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp = pandoraServer.Body
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "x2_dot=123.2"))
assert.Equal(t, true, strings.Contains(resp, "timestamp=2018-07-18T10:17:36.549054846+08:00"))
}
3 changes: 0 additions & 3 deletions sender/pandora/pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,9 +850,6 @@ func (s *Sender) Send(datas []Data) (se error) {
case SendTypeRaw:
return s.rawSend(datas)
default:
for i, v := range datas {
datas[i] = DeepConvertKey(v)
}
return s.schemaFreeSend(datas)
}
return nil
Expand Down
91 changes: 0 additions & 91 deletions sender/pandora/pandora_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,94 +951,3 @@ func TestPandoraSenderTime(t *testing.T) {
resp = pandora.Body
assert.Equal(t, resp, "x1=123.2")
}

func TestPandoraExtraInfo(t *testing.T) {
pandora, pt := mockPandora.NewMockPandoraWithPrefix("/v2")
conf1 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "true",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}
s, err := NewSender(conf1)
if err != nil {
t.Fatal(err)
}
d := Data{}
d["x1"] = "123.2"
d["hostname"] = "123.2"
d["hostname0"] = "123.2"
d["hostname1"] = "123.2"
d["hostname2"] = "123.2"
d["osinfo"] = "123.2"
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp := pandora.Body
assert.Equal(t, true, strings.Contains(resp, "core"))
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "osinfo=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname0=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname2=123.2"))

conf2 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "false",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}
s, err = NewSender(conf2)
if err != nil {
t.Fatal(err)
}
d = Data{
"*x1": "123.2",
"x2.dot": "123.2",
"@timestamp": "2018-07-18T10:17:36.549054846+08:00",
}
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp = pandora.Body
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "x2_dot=123.2"))
assert.Equal(t, true, strings.Contains(resp, "timestamp=2018-07-18T10:17:36.549054846+08:00"))
}
4 changes: 2 additions & 2 deletions sender/rest_senders_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,8 @@ var ModeKeyOptions = map[string][]Option{
KeyName: KeyPandoraAutoConvertDate,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{"true", "false"},
Default: "true",
ChooseOptions: []interface{}{"false", "true"},
Default: "false",
DefaultNoUse: false,
Description: "自动转换时间类型(pandora_auto_convert_date)",
Advance: true,
Expand Down
4 changes: 3 additions & 1 deletion sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ func (r *Registry) NewSender(conf conf.MapConf, ftSaveLogPath string) (sender Se
return
}
faultTolerant, _ := conf.GetBoolOr(KeyFaultTolerant, true)
if faultTolerant {

//如果是 PandoraSender,目前的依赖必须启用 ftsender,依赖Ftsender做key转换检查
if faultTolerant || sendType == TypePandora {
sender, err = NewFtSender(sender, conf, ftSaveLogPath)
if err != nil {
return
Expand Down
Loading