Skip to content

Commit

Permalink
Merge pull request #1010 from buyology/find-coordinator
Browse files Browse the repository at this point in the history
refactor ConsumerMetadataReq/Resp -> FindCoordinatorReq/Resp
  • Loading branch information
eapache committed Feb 13, 2018
2 parents 44e7121 + d034048 commit 5c0a1f5
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 42 deletions.
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,18 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume
return response, nil
}

func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
response := new(FindCoordinatorResponse)

err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
response := new(OffsetResponse)

Expand Down
11 changes: 6 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
return nil
}

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) {
retry := func(err error) (*ConsumerMetadataResponse, error) {
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
Expand All @@ -748,10 +748,11 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
for broker := client.any(); broker != nil; broker = client.any() {
Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

request := new(ConsumerMetadataRequest)
request.ConsumerGroup = consumerGroup
request := new(FindCoordinatorRequest)
request.CoordinatorKey = consumerGroup
request.CoordinatorType = CoordinatorGroup

response, err := broker.GetConsumerMetadata(request)
response, err := broker.FindCoordinator(request)

if err != nil {
Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
Expand Down
13 changes: 10 additions & 3 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ type ConsumerMetadataRequest struct {
}

func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
return pe.putString(r.ConsumerGroup)
tmp := new(FindCoordinatorRequest)
tmp.CoordinatorKey = r.ConsumerGroup
tmp.CoordinatorType = CoordinatorGroup
return tmp.encode(pe)
}

func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
r.ConsumerGroup, err = pd.getString()
return err
tmp := new(FindCoordinatorRequest)
if err := tmp.decode(pd, version); err != nil {
return err
}
r.ConsumerGroup = tmp.CoordinatorKey
return nil
}

func (r *ConsumerMetadataRequest) key() int16 {
Expand Down
10 changes: 7 additions & 3 deletions consumer_metadata_request_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sarama

import "testing"
import (
"testing"
)

var (
consumerMetadataRequestEmpty = []byte{
Expand All @@ -12,8 +14,10 @@ var (

func TestConsumerMetadataRequest(t *testing.T) {
request := new(ConsumerMetadataRequest)
testRequest(t, "empty string", request, consumerMetadataRequestEmpty)
testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0)

request.ConsumerGroup = "foobar"
testRequest(t, "with string", request, consumerMetadataRequestString)
testEncodable(t, "with string", request, consumerMetadataRequestString)
testVersionDecodable(t, "with string", request, consumerMetadataRequestString, 0)
}
48 changes: 20 additions & 28 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ type ConsumerMetadataResponse struct {
}

func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
}
r.Err = KError(tmp)
tmp := new(FindCoordinatorResponse)

coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
if err := tmp.decode(pd, version); err != nil {
return err
}
if coordinator.addr == ":0" {

r.Err = tmp.Err

r.Coordinator = tmp.Coordinator
if tmp.Coordinator == nil {
return nil
}
r.Coordinator = coordinator

// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
// backwards compatibility
Expand All @@ -47,28 +45,22 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err
}

func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if r.Coordinator != nil {
host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
if err != nil {
return err
}
port, err := strconv.ParseInt(portstr, 10, 32)
if err != nil {
return err
}
pe.putInt32(r.Coordinator.ID())
if err := pe.putString(host); err != nil {
return err
}
pe.putInt32(int32(port))
return nil
if r.Coordinator == nil {
r.Coordinator = new(Broker)
r.Coordinator.id = r.CoordinatorID
r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
}

tmp := &FindCoordinatorResponse{
Version: 0,
Err: r.Err,
Coordinator: r.Coordinator,
}
pe.putInt32(r.CoordinatorID)
if err := pe.putString(r.CoordinatorHost); err != nil {

if err := tmp.encode(pe); err != nil {
return err
}
pe.putInt32(r.CoordinatorPort)

return nil
}

Expand Down
13 changes: 11 additions & 2 deletions consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@ var (
)

func TestConsumerMetadataResponseError(t *testing.T) {
response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testResponse(t, "error", &response, consumerMetadataResponseError)
response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testEncodable(t, "", response, consumerMetadataResponseError)

decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
t.Error("could not decode: ", err)
}

if decodedResp.Err != ErrOffsetsLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
}
}

func TestConsumerMetadataResponseSuccess(t *testing.T) {
Expand Down
61 changes: 61 additions & 0 deletions find_coordinator_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package sarama

type CoordinatorType int8

const (
CoordinatorGroup CoordinatorType = 0
CoordinatorTransaction CoordinatorType = 1
)

type FindCoordinatorRequest struct {
Version int16
CoordinatorKey string
CoordinatorType CoordinatorType
}

func (f *FindCoordinatorRequest) encode(pe packetEncoder) error {
if err := pe.putString(f.CoordinatorKey); err != nil {
return err
}

if f.Version >= 1 {
pe.putInt8(int8(f.CoordinatorType))
}

return nil
}

func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) {
if f.CoordinatorKey, err = pd.getString(); err != nil {
return err
}

if version >= 1 {
f.Version = version
coordinatorType, err := pd.getInt8()
if err != nil {
return err
}

f.CoordinatorType = CoordinatorType(coordinatorType)
}

return nil
}

func (f *FindCoordinatorRequest) key() int16 {
return 10
}

func (f *FindCoordinatorRequest) version() int16 {
return f.Version
}

func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion {
switch f.Version {
case 1:
return V0_11_0_0
default:
return V0_8_2_0
}
}
33 changes: 33 additions & 0 deletions find_coordinator_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sarama

import "testing"

var (
findCoordinatorRequestConsumerGroup = []byte{
0, 5, 'g', 'r', 'o', 'u', 'p',
0,
}

findCoordinatorRequestTransaction = []byte{
0, 13, 't', 'r', 'a', 'n', 's', 'a', 'c', 't', 'i', 'o', 'n', 'i', 'd',
1,
}
)

func TestFindCoordinatorRequest(t *testing.T) {
req := &FindCoordinatorRequest{
Version: 1,
CoordinatorKey: "group",
CoordinatorType: CoordinatorGroup,
}

testRequest(t, "version 1 - group", req, findCoordinatorRequestConsumerGroup)

req = &FindCoordinatorRequest{
Version: 1,
CoordinatorKey: "transactionid",
CoordinatorType: CoordinatorTransaction,
}

testRequest(t, "version 1 - transaction", req, findCoordinatorRequestTransaction)
}
85 changes: 85 additions & 0 deletions find_coordinator_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package sarama

import (
"time"
)

type FindCoordinatorResponse struct {
Version int16
ThrottleTime time.Duration
Err KError
ErrMsg *string
Coordinator *Broker
}

func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
if version >= 1 {
f.Version = version

throttleTime, err := pd.getInt32()
if err != nil {
return err
}
f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
}

tmp, err := pd.getInt16()
if err != nil {
return err
}
f.Err = KError(tmp)

if version >= 1 {
if f.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
}

coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
return err
}
if coordinator.addr == ":0" {
return nil
}
f.Coordinator = coordinator

return nil
}

func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
if f.Version >= 1 {
pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
}

pe.putInt16(int16(f.Err))

if f.Version >= 1 {
if err := pe.putNullableString(f.ErrMsg); err != nil {
return err
}
}

if err := f.Coordinator.encode(pe); err != nil {
return err
}

return nil
}

func (f *FindCoordinatorResponse) key() int16 {
return 10
}

func (f *FindCoordinatorResponse) version() int16 {
return f.Version
}

func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
switch f.Version {
case 1:
return V0_11_0_0
default:
return V0_8_2_0
}
}
Loading

0 comments on commit 5c0a1f5

Please sign in to comment.