-
Notifications
You must be signed in to change notification settings - Fork 63
/
auth.go
68 lines (58 loc) · 1.28 KB
/
auth.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package kafka
import (
"crypto/tls"
"encoding/json"
"time"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)
// SASL algorithm names recognised in Credentials.Algorithm.
const (
// Plain is the algorithm value for which getDialer configures the SASL
// plain mechanism; unmarshalCredentials also uses it as the default.
Plain = "plain"
// SHA256 is the algorithm value for which getDialer configures a SCRAM
// mechanism based on scram.SHA256.
SHA256 = "sha256"
// SHA512 is the algorithm value for which getDialer configures a SCRAM
// mechanism based on scram.SHA512.
SHA512 = "sha512"
)
// Credentials holds the SASL login data decoded from a JSON document by
// unmarshalCredentials and consumed by getDialer.
type Credentials struct {
// Username is the SASL user name (JSON key "username").
Username string `json:"username"`
// Password is the SASL password (JSON key "password").
Password string `json:"password"`
// Algorithm names the SASL mechanism (JSON key "algorithm"); see the
// Plain, SHA256 and SHA512 constants. unmarshalCredentials pre-sets it to
// Plain before decoding.
Algorithm string `json:"algorithm"`
}
// unmarshalCredentials decodes the JSON document in auth into a Credentials
// value.
//
// Algorithm is pre-set to Plain, so it keeps that value when the document has
// no "algorithm" key. The returned pointer is never nil: the struct pointer
// itself (not its address) is handed to json.Unmarshal, so a literal JSON
// `null` leaves the defaults in place instead of resetting the pointer to nil.
//
// err is whatever json.Unmarshal reports. When it is non-nil the returned
// credentials may be only partially filled in and should not be used.
func unmarshalCredentials(auth string) (creds *Credentials, err error) {
	creds = &Credentials{
		Algorithm: Plain,
	}
	// Pass creds, not &creds: decoding into a **Credentials lets a JSON
	// `null` set the inner pointer to nil while still reporting success.
	err = json.Unmarshal([]byte(auth), creds)
	return creds, err
}
// credentialsError describes credentials that getDialer cannot turn into a
// SASL mechanism. It is a plain string type so the error can be built without
// any additional import.
type credentialsError string

// Error implements the error interface.
func (e credentialsError) Error() string { return string(e) }

// getDialer builds a kafka-go Dialer that authenticates with creds.
//
// The dialer uses a 10 second timeout, dual-stack dialing and TLS with a
// minimum version of 1.2. Its SASL mechanism is chosen from creds.Algorithm:
// Plain selects the SASL plain mechanism, SHA256 and SHA512 select SCRAM with
// the matching hash.
//
// It returns nil, after passing the cause to ReportError, when creds is nil,
// when creds.Algorithm is none of the supported values, or when the SCRAM
// mechanism cannot be created.
func getDialer(creds *Credentials) (dialer *kafkago.Dialer) {
	if creds == nil {
		ReportError(credentialsError("credentials are nil"), "authentication failed")
		return nil
	}

	dialer = &kafkago.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS: &tls.Config{
			MinVersion: tls.VersionTLS12,
		},
	}

	var algo scram.Algorithm
	switch creds.Algorithm {
	case Plain:
		dialer.SASLMechanism = plain.Mechanism{
			Username: creds.Username,
			Password: creds.Password,
		}
		return dialer
	case SHA256:
		algo = scram.SHA256
	case SHA512:
		algo = scram.SHA512
	default:
		// Reject unknown names here: handing a nil scram.Algorithm to
		// scram.Mechanism would panic instead of returning an error.
		ReportError(
			credentialsError("unsupported SASL algorithm: "+creds.Algorithm),
			"authentication failed",
		)
		return nil
	}

	mechanism, err := scram.Mechanism(algo, creds.Username, creds.Password)
	if err != nil {
		ReportError(err, "authentication failed")
		return nil
	}
	dialer.SASLMechanism = mechanism
	return dialer
}