Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor ConsumerMetadataReq/Resp -> FindCoordinatorReq/Resp #1010

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,18 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume
return response, nil
}

func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
response := new(FindCoordinatorResponse)

err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
response := new(OffsetResponse)

Expand Down
11 changes: 6 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
return nil
}

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) {
retry := func(err error) (*ConsumerMetadataResponse, error) {
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
Expand All @@ -748,10 +748,11 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
for broker := client.any(); broker != nil; broker = client.any() {
Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

request := new(ConsumerMetadataRequest)
request.ConsumerGroup = consumerGroup
request := new(FindCoordinatorRequest)
request.CoordinatorKey = consumerGroup
request.CoordinatorType = CoordinatorGroup

response, err := broker.GetConsumerMetadata(request)
response, err := broker.FindCoordinator(request)

if err != nil {
Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
Expand Down
13 changes: 10 additions & 3 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ type ConsumerMetadataRequest struct {
}

func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
return pe.putString(r.ConsumerGroup)
tmp := new(FindCoordinatorRequest)
tmp.CoordinatorKey = r.ConsumerGroup
tmp.CoordinatorType = CoordinatorGroup
return tmp.encode(pe)
}

func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
r.ConsumerGroup, err = pd.getString()
return err
tmp := new(FindCoordinatorRequest)
if err := tmp.decode(pd, version); err != nil {
return err
}
r.ConsumerGroup = tmp.CoordinatorKey
return nil
}

func (r *ConsumerMetadataRequest) key() int16 {
Expand Down
10 changes: 7 additions & 3 deletions consumer_metadata_request_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sarama

import "testing"
import (
"testing"
)

var (
consumerMetadataRequestEmpty = []byte{
Expand All @@ -12,8 +14,10 @@ var (

func TestConsumerMetadataRequest(t *testing.T) {
request := new(ConsumerMetadataRequest)
testRequest(t, "empty string", request, consumerMetadataRequestEmpty)
testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0)

request.ConsumerGroup = "foobar"
testRequest(t, "with string", request, consumerMetadataRequestString)
testEncodable(t, "with string", request, consumerMetadataRequestString)
testVersionDecodable(t, "with string", request, consumerMetadataRequestString, 0)
}
48 changes: 20 additions & 28 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ type ConsumerMetadataResponse struct {
}

func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
}
r.Err = KError(tmp)
tmp := new(FindCoordinatorResponse)

coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
if err := tmp.decode(pd, version); err != nil {
return err
}
if coordinator.addr == ":0" {

r.Err = tmp.Err

r.Coordinator = tmp.Coordinator
if tmp.Coordinator == nil {
return nil
}
r.Coordinator = coordinator

// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
// backwards compatibility
Expand All @@ -47,28 +45,22 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err
}

func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if r.Coordinator != nil {
host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
if err != nil {
return err
}
port, err := strconv.ParseInt(portstr, 10, 32)
if err != nil {
return err
}
pe.putInt32(r.Coordinator.ID())
if err := pe.putString(host); err != nil {
return err
}
pe.putInt32(int32(port))
return nil
if r.Coordinator == nil {
r.Coordinator = new(Broker)
r.Coordinator.id = r.CoordinatorID
r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
}

tmp := &FindCoordinatorResponse{
Version: 0,
Err: r.Err,
Coordinator: r.Coordinator,
}
pe.putInt32(r.CoordinatorID)
if err := pe.putString(r.CoordinatorHost); err != nil {

if err := tmp.encode(pe); err != nil {
return err
}
pe.putInt32(r.CoordinatorPort)

return nil
}

Expand Down
13 changes: 11 additions & 2 deletions consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@ var (
)

func TestConsumerMetadataResponseError(t *testing.T) {
response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testResponse(t, "error", &response, consumerMetadataResponseError)
response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testEncodable(t, "", response, consumerMetadataResponseError)

decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
t.Error("could not decode: ", err)
}

if decodedResp.Err != ErrOffsetsLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
}
}

func TestConsumerMetadataResponseSuccess(t *testing.T) {
Expand Down
61 changes: 61 additions & 0 deletions find_coordinator_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package sarama

type CoordinatorType int8

const (
CoordinatorGroup CoordinatorType = 0
CoordinatorTransaction CoordinatorType = 1
)

type FindCoordinatorRequest struct {
Version int16
CoordinatorKey string
CoordinatorType CoordinatorType
}

func (f *FindCoordinatorRequest) encode(pe packetEncoder) error {
if err := pe.putString(f.CoordinatorKey); err != nil {
return err
}

if f.Version >= 1 {
pe.putInt8(int8(f.CoordinatorType))
}

return nil
}

func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) {
if f.CoordinatorKey, err = pd.getString(); err != nil {
return err
}

if version >= 1 {
f.Version = version
coordinatorType, err := pd.getInt8()
if err != nil {
return err
}

f.CoordinatorType = CoordinatorType(coordinatorType)
}

return nil
}

func (f *FindCoordinatorRequest) key() int16 {
return 10
}

func (f *FindCoordinatorRequest) version() int16 {
return f.Version
}

func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion {
switch f.Version {
case 1:
return V0_11_0_0
default:
return V0_8_2_0
}
}
33 changes: 33 additions & 0 deletions find_coordinator_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sarama

import "testing"

var (
findCoordinatorRequestConsumerGroup = []byte{
0, 5, 'g', 'r', 'o', 'u', 'p',
0,
}

findCoordinatorRequestTransaction = []byte{
0, 13, 't', 'r', 'a', 'n', 's', 'a', 'c', 't', 'i', 'o', 'n', 'i', 'd',
1,
}
)

func TestFindCoordinatorRequest(t *testing.T) {
req := &FindCoordinatorRequest{
Version: 1,
CoordinatorKey: "group",
CoordinatorType: CoordinatorGroup,
}

testRequest(t, "version 1 - group", req, findCoordinatorRequestConsumerGroup)

req = &FindCoordinatorRequest{
Version: 1,
CoordinatorKey: "transactionid",
CoordinatorType: CoordinatorTransaction,
}

testRequest(t, "version 1 - transaction", req, findCoordinatorRequestTransaction)
}
85 changes: 85 additions & 0 deletions find_coordinator_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package sarama

import (
"time"
)

type FindCoordinatorResponse struct {
Version int16
ThrottleTime time.Duration
Err KError
ErrMsg *string
Coordinator *Broker
}

func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
if version >= 1 {
f.Version = version

throttleTime, err := pd.getInt32()
if err != nil {
return err
}
f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
}

tmp, err := pd.getInt16()
if err != nil {
return err
}
f.Err = KError(tmp)

if version >= 1 {
if f.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
}

coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
return err
}
if coordinator.addr == ":0" {
return nil
}
f.Coordinator = coordinator

return nil
}

func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
if f.Version >= 1 {
pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
}

pe.putInt16(int16(f.Err))

if f.Version >= 1 {
if err := pe.putNullableString(f.ErrMsg); err != nil {
return err
}
}

if err := f.Coordinator.encode(pe); err != nil {
return err
}

return nil
}

func (f *FindCoordinatorResponse) key() int16 {
return 10
}

func (f *FindCoordinatorResponse) version() int16 {
return f.Version
}

func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
switch f.Version {
case 1:
return V0_11_0_0
default:
return V0_8_2_0
}
}
Loading