-
Notifications
You must be signed in to change notification settings - Fork 1
/
redis.go
42 lines (35 loc) · 841 Bytes
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"context"
"github.com/childe/gohangout/codec"
"github.com/go-redis/redis/v8"
)
// RedisInput is an input that takes events from a Redis list: each event is
// one list element popped with BLPOP and run through the configured decoder.
type RedisInput struct {
	key     string        // name of the Redis list that ReadOneEvent pops from
	decoder codec.Decoder // converts a popped payload into an event map
	client  *redis.Client // Redis client created in New
}
// New builds a RedisInput from a plugin configuration map.
//
// Recognized keys:
//   - "address"  (string, required): passed to redis.Options.Addr.
//   - "key"      (string, required): the Redis list ReadOneEvent pops from.
//   - "password" (string, optional): defaults to "" when absent.
//   - "db"       (int, optional): defaults to 0 when absent.
//   - "codec"    (string, optional): decoder type name, defaults to "json".
//
// The return type is interface{}, so a bad configuration cannot be reported
// as an error value. New panics with a message naming the offending key when
// a required key is missing or any value has the wrong type.
func New(config map[interface{}]interface{}) interface{} {
	address, ok := config["address"].(string)
	if !ok {
		panic(`redis input: "address" is required and must be a string`)
	}
	key, ok := config["key"].(string)
	if !ok {
		panic(`redis input: "key" is required and must be a string`)
	}

	// Optional settings: an absent (or nil) entry keeps the default instead
	// of panicking on a failed type assertion.
	password := ""
	if v, present := config["password"]; present && v != nil {
		if password, ok = v.(string); !ok {
			panic(`redis input: "password" must be a string`)
		}
	}
	db := 0
	if v, present := config["db"]; present && v != nil {
		if db, ok = v.(int); !ok {
			panic(`redis input: "db" must be an integer`)
		}
	}
	codertype := "json"
	if v, present := config["codec"]; present {
		if codertype, ok = v.(string); !ok {
			panic(`redis input: "codec" must be a string`)
		}
	}

	client := redis.NewClient(&redis.Options{
		Addr:     address,
		Password: password,
		DB:       db,
	})

	return &RedisInput{
		key:     key,
		decoder: codec.NewDecoder(codertype),
		client:  client,
	}
}
// ReadOneEvent pops one element from the head of the configured Redis list
// with BLPOP and returns it decoded as an event. The timeout passed to BLPOP
// is 0, so the call waits until an element is available.
//
// It returns nil when the pop fails (for example a connection error) or the
// reply is not the expected two-element [list name, value] pair, instead of
// panicking on an out-of-range index. The method has no error return, so the
// underlying error is not surfaced here.
// NOTE(review): assumes the caller treats a nil event as "nothing read" and
// simply calls again — confirm against the input loop that drives this plugin.
func (p *RedisInput) ReadOneEvent() map[string]interface{} {
	reply, err := p.client.BLPop(context.TODO(), 0, p.key).Result()
	if err != nil || len(reply) < 2 {
		return nil
	}
	// reply[0] is the list name; reply[1] is the popped payload.
	return p.decoder.Decode([]byte(reply[1]))
}
// Shutdown is a no-op: it releases nothing and, in particular, leaves the
// Redis client open.
func (p *RedisInput) Shutdown() {}