Add full version matrix produce/consume test #1040

Merged
merged 11 commits on Feb 13, 2018
Add LZ4 specific version matrix test
horkhe committed Feb 13, 2018
commit 7579f896a21cfdd5b574bcc094183216411dce5c
115 changes: 71 additions & 44 deletions functional_consumer_test.go
@@ -65,59 +65,97 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
safeClose(t, pc)
}

// Makes sure that messages produced by all combinations of supported client
// versions and compression codecs (except LZ4) can be consumed by all
// supported consumer versions. It relies on the KAFKA_VERSION environment
// variable to provide the version of the test Kafka cluster.
//
// Note that the LZ4 codec was introduced in v0.10.0.0 and is therefore
// excluded from this test case. A similar version matrix test case below
// checks only versions from v0.10.0.0 up to KAFKA_VERSION.
func TestVersionMatrix(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

// Produce lots of messages with all possible combinations of supported
// protocol versions and compression codecs, except LZ4.
testVersions := versionRange(V0_8_2_0)
allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100)

// When/Then
consumeMsgs(t, testVersions, producedMessages)
}

// Support for the LZ4 codec was introduced in v0.10.0.0, so the version
// matrix test for LZ4 starts at v0.10.0.0.
func TestVersionMatrixLZ4(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

// Produce lots of messages with all possible combinations of supported
// protocol versions starting with v0.10.0.0 (the first to support LZ4)
// and all supported compression codecs.
testVersions := versionRange(V0_10_0_0)
allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100)

// When/Then
consumeMsgs(t, testVersions, producedMessages)
}

func prodMsg2Str(prodMsg *ProducerMessage) string {
return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
}

func consMsg2Str(consMsg *ConsumerMessage) string {
return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
}

func versionRange(lower KafkaVersion) []KafkaVersion {
// Get the test cluster version from the environment. If there is nothing
// there then assume the highest.
upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
if err != nil {
upper = MaxVersion
}

versions := make([]KafkaVersion, 0, len(SupportedVersions))
for _, v := range SupportedVersions {
if !v.IsAtLeast(lower) {
continue
}
if !upper.IsAtLeast(v) {
return versions
}
versions = append(versions, v)
}
return versions
}
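A quick illustration of versionRange (a sketch, not part of the diff; it assumes a test cluster running Kafka 0.10.2.0):

	// With KAFKA_VERSION=0.10.2.0:
	vs := versionRange(V0_9_0_0)
	// vs now holds every SupportedVersions entry from V0_9_0_0 up to and
	// including V0_10_2_0. An unset or unparsable KAFKA_VERSION falls back
	// to MaxVersion, i.e. nothing is trimmed from the top of the range.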

func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int) []*ProducerMessage {
var wg sync.WaitGroup
var producedMessagesMu sync.Mutex
var producedMessages []*ProducerMessage
for _, prodVer := range clientVersions {
for _, codec := range codecs {
prodCfg := NewConfig()
prodCfg.Version = prodVer
prodCfg.Producer.Return.Successes = true
prodCfg.Producer.Return.Errors = true
prodCfg.Producer.Flush.MaxMessages = flush
prodCfg.Producer.Compression = codec

p, err := NewSyncProducer(kafkaBrokers, prodCfg)
@eapache (Contributor) commented on Feb 12, 2018:
It might be simpler to run this test with an AsyncProducer so you can avoid spinning up a bunch of goroutines and tracking them with a waitgroup in order to get parallelism

@horkhe (Contributor, Author) replied:
The point was to have several producers produce different types of messageSets/batchRecords concurrently. I certainly can use AsyncProducers for that, although then I would need to store them somewhere and collect produced messages from all of their Successes() channels; I am not sure that implementation would be shorter, though. I can implement it this way if you want.

@eapache (Contributor) replied:
Oh ok, I missed the wg.Wait() being fully outside the outer loop and thought the purpose was only for batching. This is fine then.
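For readers following along, here is a minimal sketch of the AsyncProducer variant discussed in this thread; produceMsgsAsync is a hypothetical helper, not part of this PR, and it assumes the kafkaBrokers fixture from this test file:

	func produceMsgsAsync(t *testing.T, cfg *Config, count int) []*ProducerMessage {
		cfg.Producer.Return.Successes = true
		p, err := NewAsyncProducer(kafkaBrokers, cfg)
		if err != nil {
			t.Fatalf("Failed to create producer: %v", err)
		}
		var produced []*ProducerMessage
		done := make(chan struct{})
		go func() {
			// One collector goroutine instead of one goroutine per message.
			for msg := range p.Successes() {
				produced = append(produced, msg)
			}
			close(done)
		}()
		go func() {
			for prodErr := range p.Errors() {
				t.Error(prodErr)
			}
		}()
		for i := 0; i < count; i++ {
			p.Input() <- &ProducerMessage{
				Topic: "test.1",
				Value: StringEncoder(fmt.Sprintf("msg:%d", i)),
			}
		}
		p.AsyncClose() // flushes, then closes Successes() and Errors()
		<-done
		return produced
	}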

@horkhe (Contributor, Author) replied:
You know what, I cannot reproduce "kafka: response did not contain all the expected topic/partition blocks" at all now. It looks like #1041 fixed the root cause of this issue too. So I suggest simplifying the messageSet/recordBatch parsing a bit to make it easier to understand.

Note that I left:

	if len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}

intact. But honestly, I think it would be better to just ignore this case, because what can the library user do about that error? Nothing but retry, and sarama can do that on its own.
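To illustrate that point, a sketch of what handling the error looks like from the caller's side (fetchBatch and consume are made-up stand-ins, not sarama APIs):

	for attempt := 0; attempt < 3; attempt++ {
		msgs, err := fetchBatch() // hypothetical fetch/parse step
		if err == ErrIncompleteResponse {
			continue // retrying is the only sensible reaction
		}
		if err != nil {
			t.Fatal(err)
		}
		consume(msgs) // hypothetical
		break
	}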

if err != nil {
t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, compression, err)
t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
continue
}
defer safeClose(t, p)
for i := 0; i < countPerVerCodec; i++ {
msg := &ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
}
wg.Add(1)
go func() {
@@ -139,20 +177,17 @@ func TestVersionMatrix(t *testing.T) {
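// Sort the produced messages by offset so they can be checked against
// consumed messages in order.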
sort.Slice(producedMessages, func(i, j int) bool {
return producedMessages[i].Offset < producedMessages[j].Offset
})

t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
return producedMessages
}

func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
// Consume all produced messages with all client versions supported by the
// cluster.
consumerVersionLoop:
for _, consVer := range clientVersions {
t.Logf("*** Consuming with client version %s\n", consVer)

// Create a partition consumer that should start from the first produced
// message.
consCfg := NewConfig()
@@ -189,11 +224,3 @@ consumerVersionLoop:
}
}
}
