
Add full version matrix produce/consume test #1040

Merged · 11 commits · Feb 13, 2018
Conversation

horkhe (Contributor) commented Feb 9, 2018

This PR adds a test that checks that messages produced by clients of all supported versions can be consumed by clients of all supported versions, also throwing all possible compression codecs into the mix. The test uses the value of the KAFKA_VERSION environment variable as the upper version bound. First it produces with clients of every version up to that upper limit, and for each version it produces messages with the different compression algorithms. Then it reads all produced messages with clients of all supported versions.

The test revealed several issues:

  1. kafka: response did not contain all the expected topic/partition blocks:
    EDIT: I fixed that in this PR by only reporting ErrIncompleteResponse when there are no complete messages retrieved;
  2. lz4.Read: invalid header checksum: got 26 expected 130: basically lz4 compression does not work at all, so I commented it out in the test for now;
    EDIT: My mistake: since lz4 was introduced in v0.10.0.0, older clients obviously cannot read it. I moved LZ4 testing to another test case that features a version matrix starting with v0.10.0.0;
  3. Consumed unexpected offset: version=<any version>, index=3219, want={offset: 3220, value: msg:0.11.0.0:none:35}, got={offset: 3221, value: msg:0.11.0.0:none:35}: messages produced with client version 0.11.0.0 and higher are fetched with an incorrect offset when consumed by any client version. So my earlier PR that suggested offsetSkew for 0.11.0.0-1.0.0 is incomplete. The fix most likely belongs in the producer, because the errors are returned by clients of all supported versions.
    EDIT: Hurray, the LastOffsetDelta calculation fix #1041 fixed this issue, thank you @pkedy.

WARNING: For some reason Travis-CI thinks the build succeeds for all targets even though tests fail.
EDIT: I fixed that. Now a test failure makes the entire build fail, so you can see that builds for version 0.11.0.0 and higher fail.
EDIT2: Builds for version 0.11.0.0 and higher do not fail anymore, thanks to #1041.
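The version/codec matrix described above can be sketched in plain Go. All names below are illustrative, not sarama's actual API; upperIdx stands in for the cutoff derived from the KAFKA_VERSION environment variable.

```go
package main

import "fmt"

// matrixCases builds the (producer version, compression codec) pairs the test
// would iterate over. versions must be sorted oldest-to-newest; upperIdx is
// the index of the highest version to include.
func matrixCases(versions, codecs []string, upperIdx int) []string {
	var cases []string
	for _, v := range versions[:upperIdx+1] {
		for _, c := range codecs {
			cases = append(cases, v+"/"+c)
		}
	}
	return cases
}

func main() {
	versions := []string{"0.8.2.0", "0.9.0.0", "0.10.0.0", "0.11.0.0", "1.0.0.0"}
	codecs := []string{"none", "gzip", "snappy"} // lz4 needs >= 0.10.0.0, so it lives in a separate matrix
	// Pretend KAFKA_VERSION=0.10.0.0, i.e. index 2 in the sorted list.
	cases := matrixCases(versions, codecs, 2)
	fmt.Println(len(cases)) // 3 versions x 3 codecs = 9 produce cases
}
```

The consume side would then loop over every supported consumer version and expect to read every one of these produced cases back.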

@horkhe force-pushed the maxim/test branch 2 times, most recently from 7b5451c to 2819841 on February 9, 2018 12:44
@horkhe force-pushed the maxim/test branch 3 times, most recently from 3ebfca4 to 4f80234 on February 12, 2018 07:05
horkhe (Contributor, Author) commented Feb 12, 2018

@eapache please take a look.

consumer.go Outdated
@@ -514,7 +514,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 		}
 	}

-	if incomplete || len(messages) == 0 {
+	if incomplete && len(messages) == 0 {
Contributor:
I don't understand this change; incomplete can only be set if messages contains at least one complete message already, so this condition becomes impossible to satisfy? I assume this is the kafka: response did not contain all the expected topic/partition blocks fix you mention; I'd appreciate more details on that.

I'm looking at this method again, and I'm wondering if the incomplete boolean is even necessary anymore. I'm not sure what it was trying to accomplish originally (guarding against decreasing offsets maybe?), but a simple check on len(messages) == 0 seems to be sufficient on its own? I'm not sure. That might even mean we don't need prelude anymore either, and can just unconditionally skip messages with offset < child.offset.
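The simplification floated here can be sketched as pure Go, using a stand-in message type rather than sarama's real ConsumerMessage: with no incomplete bookkeeping, the loop just skips anything below the offset the consumer asked for (a broker may legitimately return earlier messages when a compressed set straddles the requested offset).

```go
package main

import "fmt"

// consumerMessage is a stand-in for sarama's ConsumerMessage; only the offset matters here.
type consumerMessage struct{ Offset int64 }

// parseMessages drops the incomplete/prelude bookkeeping and unconditionally
// skips any message whose offset precedes the one the consumer requested.
func parseMessages(raw []consumerMessage, childOffset int64) []consumerMessage {
	var messages []consumerMessage
	for _, m := range raw {
		if m.Offset < childOffset {
			continue // stale message from an earlier part of the compressed set
		}
		messages = append(messages, m)
	}
	return messages
}

func main() {
	raw := []consumerMessage{{3}, {4}, {5}, {6}}
	fmt.Println(len(parseMessages(raw, 5))) // only offsets 5 and 6 survive
}
```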

horkhe (Contributor, Author) — Feb 12, 2018:
You're right, the error went away because this condition is now impossible to satisfy. My reasoning was that an incomplete record should be reported as an error only if there are no complete messages in the batch/set. But now, looking at incomplete, I understand that it actually does not mean a partial trailing message as I thought.

I do not think that even len(messages) == 0 should be treated as an error. If the Kafka broker decides to return an empty block, so what? It just means that there are no new messages and sarama should just make another long-polling fetch request.

Contributor:

Ah, so the case where there are legitimately no messages in the block is already handled in parseResponse. If we get here and there are no messages we could extract, then I think it's legitimate to conclude an error of some sort since the server presumably sent us only messages we didn't ask for? I don't know if it's OK for the server to respond with a prelude but no actual messages...

horkhe (Contributor, Author):

I think the reason I saw the kafka: response did not contain all the expected topic/partition blocks error while running this test was exactly that the server was replying with sets/batches containing a prelude only. I will double-check that tomorrow when I am back in the office.

prodCfg.Producer.Flush.MaxMessages = flush
prodCfg.Producer.Compression = codec

p, err := NewSyncProducer(kafkaBrokers, prodCfg)
eapache (Contributor) — Feb 12, 2018:
It might be simpler to run this test with an AsyncProducer, so you could avoid spinning up a bunch of goroutines and tracking them with a waitgroup in order to get parallelism.

horkhe (Contributor, Author):

The point was to have several producers produce different types of messageSets/recordBatches concurrently. I certainly can use AsyncProducers for that, although then I would need to store them somewhere and collect produced messages from all of their Successes() channels; I am not sure this implementation would be shorter, though. I can implement it this way if you want.
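The concurrent-producer pattern under discussion can be sketched with a WaitGroup over hypothetical (version, codec) cases. produceAll and perCase are invented names; the real test would create a SyncProducer inside each goroutine and send real messages.

```go
package main

import (
	"fmt"
	"sync"
)

// produceAll runs one goroutine per (version, codec) case, mimicking the
// test's pattern of concurrent producers tracked by a WaitGroup; perCase
// stands in for the number of messages each producer session sends.
func produceAll(cases [][2]string, perCase int) int {
	counts := make(chan int, len(cases))
	var wg sync.WaitGroup
	for _, c := range cases {
		wg.Add(1)
		go func(version, codec string) {
			defer wg.Done()
			// A real test would create a SyncProducer configured for this
			// version and codec and call SendMessage in a loop; here we only
			// model the produced-message count.
			counts <- perCase
		}(c[0], c[1])
	}
	wg.Wait() // sits outside the loop, so all producers run concurrently
	close(counts)
	total := 0
	for n := range counts {
		total += n
	}
	return total
}

func main() {
	cases := [][2]string{{"0.9.0.0", "none"}, {"0.9.0.0", "gzip"}, {"0.10.0.0", "snappy"}}
	fmt.Println(produceAll(cases, 10)) // 3 cases x 10 messages = 30
}
```

The wg.Wait() outside the outer loop is the detail eapache initially missed below: it is what makes all the producer sessions run in parallel rather than in batches.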

Contributor:

Oh ok, I missed the wg.Wait() being fully outside the outer loop and thought the purpose was only for batching. This is fine then.

horkhe (Contributor, Author):

You know what, I cannot reproduce kafka: response did not contain all the expected topic/partition blocks at all now. It looks like #1041 fixed the root cause of this issue too. So I suggest simplifying the messageSet/recordBatch parsing a bit to make it easier to understand.

Note that I left:

	if len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}

intact. But honestly, I think it would be better to just ignore this case, because what do you expect the library user to do about that error? Nothing but retry, and sarama can do that on its own.

V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)

SupportedVersions = []KafkaVersion{
Contributor:
Curious why you decided to expose these values publicly? I don't see a use for them outside the tests you wrote.

horkhe (Contributor, Author):

Sarama exposes the protocol implementation, which is used in my project mailgun/kafka-pixy, but some things are missing, and this is one of them. I realize that this is a selfish reason, so if you are against it, I will roll it back.

Contributor:
No, that's also fine, I just couldn't think of a use case for it but you provided one.

pkedy (Contributor) commented Feb 12, 2018

@horkhe - Do your producer tests set Headers in the ProducerMessages? Even after the fix for #1041, if we set Headers, the following error occurs for at least half of the messages.

kafka server: Message was too large, server rejected it to avoid allocation error.

@eapache - There are other closed issues from the past that have the same error message but this might be a unique case just for when Headers are set. Worth opening a new issue?

eapache (Contributor) commented Feb 12, 2018

Ya, please open a new issue. The implementation of Headers is fairly new, and I don't recall any past issues of "Message was too large" that didn't just end up being messages actually too large.

pkedy (Contributor) commented Feb 12, 2018

@eapache Now I'm not sure this is unique to Headers, because I was able to get the same error without setting them. Referring back to the older issues on this (e.g. #959), I tried setting config.Producer.MaxMessageBytes = 104857500 (100 less than the default value of sarama.MaxRequestSize) and it seems to be stable. Under load (in our test app we are only producing ~125,000 messages at a time), why would the defaults produce this error?

horkhe (Contributor, Author) commented Feb 12, 2018

@pkedy nope, I do not set headers. Later we can add a test that sets headers, although that would be for a much smaller subset of versions.

horkhe (Contributor, Author) commented Feb 13, 2018

@eapache After rebasing onto the latest master, the gzip compression encode/decode test started to fail. The test was broken by #1044, but a deeper problem is that CI reports the build as successful even when tests fail. That is fixed in this PR, but be careful merging PRs before the fix is in master. I fixed the gzip test, by the way.

eapache (Contributor) commented Feb 13, 2018

Thank you for all of your work on this!

> CI tells that build is successful even if tests fail. In this PR that is fixed

That's really weird, I swear it used to fail properly and I haven't changed the config recently. Anyway, thank you for fixing this, I'm glad the only thing I missed due to this was a minor test failure.

eapache merged commit 44e7121 into IBM:master on Feb 13, 2018
horkhe (Contributor, Author) commented Feb 13, 2018

Test errors stopped failing builds with #960, because in bash, when commands are separated with ;, the exit status of the whole line is the status of the last command, so the test exit status was shadowed by the command that ran after it.
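The shell behavior described above can be demonstrated directly, using false as a stand-in for a failing test run:

```shell
#!/bin/sh
# The failing pattern: when commands are separated with ';', the exit status
# of the whole line is that of the LAST command, so a failing test run is
# silently swallowed by whatever runs after it.
sh -c 'false ; true'
echo "separated with ';': exit status $?"    # prints 0 -- the failure is lost

# One fix: chain with '&&' so the first failure short-circuits and propagates.
sh -c 'false && true' || echo "chained with '&&': exit status $?"    # prints 1
```

Travis-CI marks a job failed only when the script line exits non-zero, which is why the shadowed status made every build look green.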

@horkhe deleted the maxim/test branch on February 13, 2018 14:37