-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6868: Fix buffer underflow and expose group state in the consumer groups API #4980
Conversation
/** | ||
* The consumer group partition assignor. | ||
*/ | ||
public String state() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it makes sense to use an enum instead of having all the is* methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about this too. I think the reason we left it as a string in the protocol is so that we could change existing states or add new states without requiring a version bump. By extension, the AdminClient could just expose the string and be left agnostic to specific states. In retrospect, however, we should have just used an enum in the protocol because we needed to check explicitly for particular states inside tools and now in AdminClient. Given that, I'm inclined to use an enum here and file a jira to change the protocol to use an enum as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that an enum is convenient in some ways. However, it makes the compatibility story a little more complex because if we add a new state, older clients would have to translate it to GroupState.UNKNOWN, or similar.
For example, let's say we add a new state for incremental rebalancing. In that case, we could have rebalancing() return true for that state, and just add a new string. With an enum, we'd have to add a new enum value and then map that to UNKNOWN in older clients.
Curious what you think the best approach is here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The older client would still return false for rebalancing()
in your example since it would not be aware of the new string. To be clear, even if we have an enum, we can still have methods that return true for multiple states.
Having said that, it is annoying not to be able to display the string if the type is unknown. The ideal scenario is hard to do in Java. Something like:
sealed trait State
case object Foo extends State
case object Bar extends State
case class Unknown(value: String) extends State
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've created a new Java class org.apache.kafka.common.ConsumerGroupState
to represent this.
70cf70a
to
bb34a8b
Compare
} | ||
try { | ||
consumerThread.start | ||
// Test that we can list the new group. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you extract the assertions with comments to separate methods? For example:
verifyNewConsumerGroupCanBeListed
verifyNewConsumerGroupCanBeDescribed
verifyFakeConsumerGroupIsListedAsDead
verifyConsumerGroupOffsetsAreListed
or something
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a lot of value in doing this unless we want to reuse the code, which we're not doing here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually find it easier to read short, focused test methods, but I see your point. You can ignore this comment then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with @asasvari for what it's worth. It is easier to understand and debug small test cases. It is not just about code reuse.
@@ -2406,21 +2406,26 @@ void handleResponse(AbstractResponse abstractResponse) { | |||
final List<MemberDescription> consumers = new ArrayList<>(members.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: memberDescriptions might be a better name
@@ -2406,21 +2406,26 @@ void handleResponse(AbstractResponse abstractResponse) { | |||
final List<MemberDescription> consumers = new ArrayList<>(members.size()); | |||
|
|||
for (DescribeGroupsResponse.GroupMember groupMember : members) { | |||
final PartitionAssignor.Assignment assignment = | |||
if (groupMember.memberAssignment().remaining() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract groupMember.memberAssignment() and protocolType.isEmpty() to final local variables to avoid code (& method call) duplication
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those are trivial accessors which are invoked twice at most, so duplicating the call is not a worry
@@ -2406,21 +2406,26 @@ void handleResponse(AbstractResponse abstractResponse) { | |||
final List<MemberDescription> consumers = new ArrayList<>(members.size()); | |||
|
|||
for (DescribeGroupsResponse.GroupMember groupMember : members) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extracting this to a private method (with a signature like List getMemberDescriptions(List<DescribeGroupsResponse.GroupMember> members) might be better for readability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is really a worry here, since this is a short block of code.
@Override | ||
public Map<String, ConsumerGroupDescription> apply(Void v) { | ||
try { | ||
HashMap<String, ConsumerGroupDescription> map = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base type could be Map<String, ConsumerGroupDescription>, and variable name could be something like consumerGroupDescriptionByGroupId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"consumerGroupDescriptionByGroupId" is too long for a name, but "descriptions" might work here.
private final String partitionAssignor; | ||
private final String state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConsumerGroupCommand should be updated to use the new KafkaAdminClient (as part of KAFKA-6884). Existing implementation expects "coordinator" and "assignmentStrategy" too in the return value of describeConsumerGroup(), see
GroupState(group, consumerGroupSummary.coordinator, consumerGroupSummary.assignmentStrategy, |
Can you please add these fields too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good point. "coordinator" should be returned here. "partitionAssignor" is already available.
7281a41
to
ed46098
Compare
* The consumer group state. | ||
*/ | ||
public enum ConsumerGroupState { | ||
UNKNOWN("Unknown"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While working with @asasvari I realized there is a similar enumeration in GroupMetadata.scala in the form of case objects of trait GroupState. However, in that file there is one additional value: Empty. Is there a particular reason it is missing from here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Empty should be included here.
ed46098
to
108bb54
Compare
|
||
for (DescribeGroupsResponse.GroupMember groupMember : members) { | ||
final PartitionAssignor.Assignment assignment = | ||
if (groupMember.memberAssignment().remaining() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cmccabe Do you know when would member assignment buffer be empty? The patch excludes a member from the group description if the member assignment if empty. In that case, would it be more reasonable to make change in the server side such that the member is not included in the DescribeGroupsResponse in the first place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that it's odd that the server sends empty assignments, and probably should be changed. However, the 1.2 AdminClient may be used with a 1.1 or older broker, so we still have to handle the old (arguably broken) behavior. I think we can do a little better here and keep the rest of the member description even if the member assignment is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. If 1.1 broker will send empty assignment in a for a given member in a legitimate DescribeGroupsResponse, is other information of the MemberDescription
(e.g. memberId, host) still useful? I am wondering if we should still include the member in the group description instead of discarding it entirely.
Another question is that, if it is not reasonable to return an empty assignment, can we also fix it behavior in the server? If it is not straightforward then we can do it a separate patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will return an empty assignment while the group is rebalancing. Rebalances can sometimes take a little while to complete, and we have found it useful to still be able to show the members when the group is in this state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Ah I see. Thanks for the explanation. The logic seems more reasonable now after we include the member in the ConsumerGroupDescription even if the member's assignment is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left a few comments/questions.
*/ | ||
public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) { | ||
this.groupId = groupId; | ||
public static class Builder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious if we use builders for any of the other admin object? Does it need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious if we use builders for any of the other admin object?
I would argue the *Options classes (CreateTopicsOptions, etc.) are very similar to builders. For example, you can use CreateTopicsOptions like this: new CreateTopicsOptions().timeoutMs(10000).validateOnly(true)
This is useful to let the options evolve over time.
I think ConsumerGroupDescription
will almost certainly get new fields over time, so some of the same concerns apply here (although it's less urgent if the constructor is not public)
Does it need to be public?
I guess we can make it package-private for now. Code that needs this for unit tests could use MockAdminClient
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, builders are nice for the reasons you mention. However, since it wasn't discussed in KIP-222, I think keeping it package private for now might be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
@Override | ||
public Map<String, ConsumerGroupDescription> apply(Void v) { | ||
try { | ||
Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: guess we could pass the size to HashMap
@@ -2347,11 +2348,11 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri | |||
|
|||
@Override | |||
void handleResponse(AbstractResponse abstractResponse) { | |||
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; | |||
final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this patch, but we don't seem to be checking the error code here? Maybe we can implement a simple behavior for now and fail the future for this group if the error is not NONE?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. I will add some basic error handling here. Another thing we missed in the original patch... 😞
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is an error like COORDINATOR_NOT_AVAILABLE, we should not send a describeConsumerGroups request. I ran into this issue when I was working on KAFKA-6884: testResetOffsetsNotExistingGroup
in ResetConsumerGroupOffsetTest
failed with a TimeoutException
. I believe the reason for this is that LeaderNotAvailableException
is a retriable exception.
ConsumerProtocol.deserializeAssignment( | ||
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); | ||
|
||
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we read and wrap? Could we just duplicate the buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Let's just duplicate it.
import java.util.List; | ||
|
||
/** | ||
* A description of the assignments of a specific group member. | ||
*/ | ||
public class MemberAssignment { | ||
private final List<TopicPartition> topicPartitions; | ||
private final Collection<TopicPartition> topicPartitions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why you made this change. Lists are a bit more convenient for users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, returning a List implies that ordering is important, which it's not in this case. Using Collection gives us the freedom to change it in the future. Most things are just as convenient on either one, right? You can iterate using the standard for loop, iterators, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One issue with Collection
as that equals
is not defined. So, one should be careful about returning it in public APIs. It's OK to receive it as a parameter though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I think Set makes sense here, since ordering doesn't matter
|
||
public String groupId() { | ||
return groupId; | ||
public GroupIdNotFoundException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. This is technically a public api as of 1.1, though I don't think it was raised from any API in that release. Perhaps it is justifiable since the expectation was never correctly implemented anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a fair point. The API could never work, though (and still never can). ApiExceptions need to be deserializable from Error enums without additional information. Additional information is a nice-to-have, but may or may not be provided. If anyone tried to use this in 1.1, it wouldn't work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I guess I'm ok with it since the API couldn't work as intended.
} | ||
try { | ||
consumerThread.start | ||
// Test that we can list the new group. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with @asasvari for what it's worth. It is easier to understand and debug small test cases. It is not just about code reuse.
* The consumer groups API should expose group state. This information is needed by administrative tools and scripts that access consume groups. * The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer. * Remove nulls from the API and make all collections immutable. * DescribeConsumerGroupsResult#all should return a result as expected, rather than Void * Add Builder classes for MemberDescription and ConsumerGroupDescription, so that we can easily add new fields later without having a large number of constructors. * Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as "The group id The group id does not exist was not found" and similar.
f077f0c
to
25035e6
Compare
ConsumerProtocol.deserializeAssignment( | ||
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); | ||
|
||
Set<TopicPartition> partitions = Collections.<TopicPartition>emptySet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nits: it seems that <TopicPartition>
is not needed?
this.topicPartitions = topicPartitions; | ||
MemberAssignment(Set<TopicPartition> topicPartitions) { | ||
this.topicPartitions = topicPartitions == null ? Collections.<TopicPartition>emptySet() : | ||
Collections.unmodifiableSet(new TreeSet<TopicPartition>(topicPartitions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nits: TopicPartition
in <>
is probably not needed.
Also, do we need any specific feature from TreeSet<> here? If we don't need order of elements then should we use HashSet
to make the code a bit more consistent and efficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used TreeSet here to give a consistent order of traversal. But actually, HashSet should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool. Sounds good.
*/ | ||
public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) { | ||
this.groupId = groupId; | ||
ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nits: It may be OK. But just want to double check the recommended code style. If there are many parameters in the constructor, should we consistently put each parameter in a new line similar to e.g. LogManager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM once you have a chance to address Dong's feedback.
Also note the failing test case. |
Yeah, the failing test case was because of the TreeSet (no comparison operator defined for TopicPartition). It seems reasonable to just use a HashSet here instead. |
…er groups API (apache#4980) * The consumer groups API should expose group state and coordinator information. This information is needed by administrative tools and scripts that access consume groups. * The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer. * Remove nulls from the API and make all collections immutable. * DescribeConsumerGroupsResult#all should return a result as expected, rather than Void * Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as "The group id The group id does not exist was not found" and similar. Reviewers: Attila Sasvari <[email protected]>, Andras Beni <[email protected]>, Dong Lin <[email protected]>, Jason Gustafson <[email protected]>
The consumer groups API should expose group state. This information is needed by administrative tools and scripts that access consume groups.
The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer.
Remove nulls from the API and make all collections immutable.
DescribeConsumerGroupsResult#all should return a result as expected, rather than Void
Add Builder classes for MemberDescription and ConsumerGroupDescription, so that we can easily add new fields later without having a large number of constructors.
Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as
"The group id The group id does not exist was not found" and similar.
Add integration test