-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15842: Correct handling of KafkaConsumer.committed for new consumer #14859
Conversation
Merge commits from trunk
Closing and reopening PR to retry failed build. |
https://issues.apache.org/jira/browse/KAFKA-15932 opened for flaky test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AndrewJSchofield!
Thanks for the fix and the associated clean up. One minor nit-ty change requested.
@@ -48,7 +48,7 @@ | |||
<suppress id="dontUseSystemExit" | |||
files="Exit.java"/> | |||
<suppress checks="ClassFanOutComplexity" | |||
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|PrototypeAsyncConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this clean up!
@@ -663,6 +662,10 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition | |||
time.timer(timeout)); | |||
committedOffsets.forEach(this::updateLastSeenEpochIfNewer); | |||
return committedOffsets; | |||
} catch (TimeoutException e) { | |||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " + | |||
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please replace the literal string default.api.timeout.ms
with the appropriate constant from ConsumerConfig
🥺
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I agree with @kirktrue , let's merge this PR and fix this on the side in a follow-up PR.
return new HashMap<>(); | ||
return Collections.emptyMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this make a difference, or just a stylistic/pet peeve/optimiz/sation thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it strictly identical to the previous implementation.
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😭
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AndrewJSchofield !
LGTM!
…umer (apache#14859) This PR fixes some details of the interface to KafkaConsumer.committed which were different between the existing consumer and the new consumer. Adds a unit test that validates the behaviour is the same for both consumer implementations. Reviewers: Kirk True <[email protected]>, Bruno Cadonna <[email protected]>
…umer (apache#14859) This PR fixes some details of the interface to KafkaConsumer.committed which were different between the existing consumer and the new consumer. Adds a unit test that validates the behaviour is the same for both consumer implementations. Reviewers: Kirk True <[email protected]>, Bruno Cadonna <[email protected]>
…umer (apache#14859) This PR fixes some details of the interface to KafkaConsumer.committed which were different between the existing consumer and the new consumer. Adds a unit test that validates the behaviour is the same for both consumer implementations. Reviewers: Kirk True <[email protected]>, Bruno Cadonna <[email protected]>
…umer (apache#14859) This PR fixes some details of the interface to KafkaConsumer.committed which were different between the existing consumer and the new consumer. Adds a unit test that validates the behaviour is the same for both consumer implementations. Reviewers: Kirk True <[email protected]>, Bruno Cadonna <[email protected]>
This PR fixes some details of the interface to KafkaConsumer.committed which were different between the existing consumer and the new consumer.
Adds a unit test that validates the behaviour is the same for both consumer implementations.
Committer Checklist (excluded from commit message)