KAFKA-6868: Fix buffer underflow and expose group state in the consumer groups API #4980

Merged (7 commits) on May 21, 2018

Contributor

@cmccabe cmccabe commented May 9, 2018

  • The consumer groups API should expose group state. This information is needed by administrative tools and scripts that access consumer groups.

  • The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer.

  • Remove nulls from the API and make all collections immutable.

  • DescribeConsumerGroupsResult#all should return a result as expected, rather than Void

  • Add Builder classes for MemberDescription and ConsumerGroupDescription, so that we can easily add new fields later without having a large number of constructors.

  • Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as
    "The group id The group id does not exist was not found" and similar.

  • Add integration test

/**
* The consumer group partition assignor.
*/
public String state() {
Contributor

Would it make sense to use an enum instead of having all the is* methods?

Contributor

@hachikuji hachikuji May 9, 2018

I was thinking about this too. I think the reason we left it as a string in the protocol is so that we could change existing states or add new states without requiring a version bump. By extension, the AdminClient could just expose the string and be left agnostic to specific states. In retrospect, however, we should have just used an enum in the protocol because we needed to check explicitly for particular states inside tools and now in AdminClient. Given that, I'm inclined to use an enum here and file a jira to change the protocol to use an enum as well.

Contributor Author

I agree that an enum is convenient in some ways. However, it makes the compatibility story a little more complex because if we add a new state, older clients would have to translate it to GroupState.UNKNOWN, or similar.

For example, let's say we add a new state for incremental rebalancing. In that case, we could have rebalancing() return true for that state, and just add a new string. With an enum, we'd have to add a new enum value and then map that to UNKNOWN in older clients.

Curious what you think the best approach is here?

Contributor

@ijuma ijuma May 11, 2018

The older client would still return false for rebalancing() in your example since it would not be aware of the new string. To be clear, even if we have an enum, we can still have methods that return true for multiple states.

Having said that, it is annoying not to be able to display the string if the type is unknown. The ideal scenario is hard to do in Java. Something like:

sealed trait State
case object Foo extends State
case object Bar extends State
case class Unknown(value: String) extends State
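
A rough Java approximation of the Scala sketch above, for comparison. The names StateSketch, FOO and BAR are hypothetical, and this is not the class that was eventually added in this PR; it only shows one way to keep the raw string available for states the client does not recognize.

```java
import java.util.Objects;

// Hypothetical Java stand-in for the sealed trait above: two known states
// plus an "unknown" variant that keeps the raw string sent by the broker.
final class StateSketch {
    static final StateSketch FOO = new StateSketch("Foo", true);
    static final StateSketch BAR = new StateSketch("Bar", true);

    private final String value;
    private final boolean known;

    private StateSketch(String value, boolean known) {
        this.value = value;
        this.known = known;
    }

    // Map a wire string to a known state, or wrap it as an unknown state.
    static StateSketch of(String value) {
        Objects.requireNonNull(value, "value");
        if (value.equals(FOO.value)) return FOO;
        if (value.equals(BAR.value)) return BAR;
        return new StateSketch(value, false);
    }

    boolean isKnown() {
        return known;
    }

    // The original string stays displayable, even for unknown states.
    @Override
    public String toString() {
        return value;
    }
}
```

The cost compared with a plain enum is that callers cannot switch over the values, which is part of why this is awkward in Java.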

Contributor Author

I've created a new Java class org.apache.kafka.common.ConsumerGroupState to represent this.

@cmccabe cmccabe changed the title MINOR: The consumer groups API should expose group state. MINOR: Fix buffer underflow and expose group state in the consumer groups API May 10, 2018
@cmccabe cmccabe changed the title MINOR: Fix buffer underflow and expose group state in the consumer groups API KAFKA-6868: Fix buffer underflow and expose group state in the consumer groups API May 11, 2018
}
try {
consumerThread.start
// Test that we can list the new group.
Contributor

Can you extract the assertions with comments to separate methods? For example:
verifyNewConsumerGroupCanBeListed
verifyNewConsumerGroupCanBeDescribed
verifyFakeConsumerGroupIsListedAsDead
verifyConsumerGroupOffsetsAreListed

or something

Contributor Author

I don't see a lot of value in doing this unless we want to reuse the code, which we're not doing here.

Contributor

I usually find it easier to read short, focused test methods, but I see your point. You can ignore this comment then.

Contributor

I tend to agree with @asasvari for what it's worth. It is easier to understand and debug small test cases. It is not just about code reuse.

@@ -2406,21 +2406,26 @@ void handleResponse(AbstractResponse abstractResponse) {
final List<MemberDescription> consumers = new ArrayList<>(members.size());
Contributor

nit: memberDescriptions might be a better name

@@ -2406,21 +2406,26 @@ void handleResponse(AbstractResponse abstractResponse) {
final List<MemberDescription> consumers = new ArrayList<>(members.size());

for (DescribeGroupsResponse.GroupMember groupMember : members) {
final PartitionAssignor.Assignment assignment =
if (groupMember.memberAssignment().remaining() > 0) {
Contributor

extract groupMember.memberAssignment() and protocolType.isEmpty() to final local variables to avoid code (& method call) duplication

Contributor Author

Those are trivial accessors which are invoked twice at most, so duplicating the call is not a worry

@@ -2406,21 +2406,26 @@ void handleResponse(AbstractResponse abstractResponse) {
final List<MemberDescription> consumers = new ArrayList<>(members.size());

for (DescribeGroupsResponse.GroupMember groupMember : members) {
Contributor

extracting this to a private method (with a signature like List getMemberDescriptions(List<DescribeGroupsResponse.GroupMember> members)) might be better for readability

Contributor Author

I don't think this is really a worry here, since this is a short block of code.

@Override
public Map<String, ConsumerGroupDescription> apply(Void v) {
try {
HashMap<String, ConsumerGroupDescription> map = new HashMap<>();
Contributor

base type could be Map<String, ConsumerGroupDescription>, and variable name could be something like consumerGroupDescriptionByGroupId

Contributor Author

"consumerGroupDescriptionByGroupId" is too long for a name, but "descriptions" might work here.

private final String partitionAssignor;
private final String state;
Contributor

ConsumerGroupCommand should be updated to use the new KafkaAdminClient (as part of KAFKA-6884). Existing implementation expects "coordinator" and "assignmentStrategy" too in the return value of describeConsumerGroup(), see

GroupState(group, consumerGroupSummary.coordinator, consumerGroupSummary.assignmentStrategy,

Can you please add these fields too?

Contributor Author

That is a good point. "coordinator" should be returned here. "partitionAssignor" is already available.

@cmccabe cmccabe force-pushed the fix_consumer_groups_api branch 2 times, most recently from 7281a41 to ed46098 Compare May 14, 2018 16:29
* The consumer group state.
*/
public enum ConsumerGroupState {
UNKNOWN("Unknown"),
Contributor

While working with @asasvari I realized there is a similar enumeration in GroupMetadata.scala in the form of case objects of trait GroupState. However, in that file there is one additional value: Empty. Is there a particular reason it is missing from here?

Contributor Author

Good catch. Empty should be included here.
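
A minimal sketch of what an enum along these lines could look like with Empty included. GroupStateSketch, parse and isRebalancing are hypothetical names, and the state strings other than "Unknown" and "Empty" are assumed to mirror the broker-side GroupState case objects; the real org.apache.kafka.common.ConsumerGroupState may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a group-state enum; not the class from this PR.
enum GroupStateSketch {
    UNKNOWN("Unknown"),
    PREPARING_REBALANCE("PreparingRebalance"),
    COMPLETING_REBALANCE("CompletingRebalance"),
    STABLE("Stable"),
    DEAD("Dead"),
    EMPTY("Empty");

    // Lookup table from the wire string to the enum constant.
    private static final Map<String, GroupStateSketch> BY_NAME = new HashMap<>();

    static {
        for (GroupStateSketch state : values()) {
            BY_NAME.put(state.name, state);
        }
    }

    private final String name;

    GroupStateSketch(String name) {
        this.name = name;
    }

    // Strings this client does not know about fall back to UNKNOWN.
    static GroupStateSketch parse(String name) {
        GroupStateSketch state = BY_NAME.get(name);
        return state == null ? UNKNOWN : state;
    }

    // A predicate can still cover several states, as noted in the enum discussion.
    boolean isRebalancing() {
        return this == PREPARING_REBALANCE || this == COMPLETING_REBALANCE;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

With this shape, an older client that receives a state added later reports UNKNOWN and cannot display the original string, which is the trade-off raised in the enum discussion.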


for (DescribeGroupsResponse.GroupMember groupMember : members) {
final PartitionAssignor.Assignment assignment =
if (groupMember.memberAssignment().remaining() > 0) {
Member

@lindong28 lindong28 May 17, 2018

@cmccabe Do you know when the member assignment buffer would be empty? The patch excludes a member from the group description if the member assignment is empty. In that case, would it be more reasonable to change the server side so that the member is not included in the DescribeGroupsResponse in the first place?

Contributor Author

I agree that it's odd that the server sends empty assignments, and probably should be changed. However, the 1.2 AdminClient may be used with a 1.1 or older broker, so we still have to handle the old (arguably broken) behavior. I think we can do a little better here and keep the rest of the member description even if the member assignment is empty.

Member

I see. If a 1.1 broker sends an empty assignment for a given member in a legitimate DescribeGroupsResponse, is the other information in the MemberDescription (e.g. memberId, host) still useful? I am wondering if we should still include the member in the group description instead of discarding it entirely.

Another question: if it is not reasonable to return an empty assignment, can we also fix this behavior in the server? If it is not straightforward, we can do it in a separate patch.

Contributor

We will return an empty assignment while the group is rebalancing. Rebalances can sometimes take a little while to complete, and we have found it useful to still be able to show the members when the group is in this state.

Member

@hachikuji Ah I see. Thanks for the explanation. The logic seems more reasonable now after we include the member in the ConsumerGroupDescription even if the member's assignment is empty.
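
The guard discussed in this thread can be sketched as follows. AssignmentDecoderSketch and its wire layout (an int count followed by int partition ids) are invented for illustration and are not Kafka's ConsumerProtocol format; the point is only that an empty buffer is treated as "no assignment yet" instead of being decoded.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical decoder; the layout (int count, then int partition ids) is an
// assumption made for this example, not the real assignment format.
final class AssignmentDecoderSketch {
    static List<Integer> decodePartitions(ByteBuffer assignment) {
        // An empty buffer means the member has no assignment yet, for example
        // while the group is rebalancing. Without this guard, getInt() below
        // would throw BufferUnderflowException.
        if (assignment.remaining() == 0) {
            return Collections.emptyList();
        }
        // Read through a duplicate so the caller's position is left untouched.
        ByteBuffer view = assignment.duplicate();
        int count = view.getInt();
        List<Integer> partitions = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            partitions.add(view.getInt());
        }
        return Collections.unmodifiableList(partitions);
    }
}
```

The member itself would still be reported, just with an empty, immutable assignment, which matches the behavior settled on above.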

Contributor

@hachikuji hachikuji left a comment

Thanks, left a few comments/questions.

*/
public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) {
this.groupId = groupId;
public static class Builder {
Contributor

Curious if we use builders for any of the other admin objects? Does it need to be public?

Contributor Author

> Curious if we use builders for any of the other admin objects?

I would argue the *Options classes (CreateTopicsOptions, etc.) are very similar to builders. For example, you can use CreateTopicsOptions like this: new CreateTopicsOptions().timeoutMs(10000).validateOnly(true). This is useful to let the options evolve over time.

I think ConsumerGroupDescription will almost certainly get new fields over time, so some of the same concerns apply here (although it's less urgent if the constructor is not public)

> Does it need to be public?

I guess we can make it package-private for now. Code that needs this for unit tests could use MockAdminClient.

Contributor

Yeah, builders are nice for the reasons you mention. However, since it wasn't discussed in KIP-222, I think keeping it package private for now might be better.

Contributor Author

OK
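
For readers unfamiliar with the pattern under discussion, here is a minimal sketch of a package-private builder. GroupDescriptionSketch and its fields are hypothetical and much smaller than the real ConsumerGroupDescription; the sketch only shows why adding a field later does not require another constructor overload.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, trimmed-down description class; field names are illustrative.
final class GroupDescriptionSketch {
    private final String groupId;
    private final String state;
    private final List<String> memberIds;

    private GroupDescriptionSketch(Builder builder) {
        this.groupId = builder.groupId;
        this.state = builder.state;
        // Defensive copy wrapped as unmodifiable, so the result is immutable.
        this.memberIds = Collections.unmodifiableList(new ArrayList<>(builder.memberIds));
    }

    String groupId() { return groupId; }
    String state() { return state; }
    List<String> memberIds() { return memberIds; }

    // Package-private builder: a new field only needs a new setter with a
    // default value, so existing call sites keep compiling.
    static final class Builder {
        private final String groupId;
        private String state = "";
        private List<String> memberIds = Collections.emptyList();

        Builder(String groupId) { this.groupId = groupId; }

        Builder state(String state) { this.state = state; return this; }

        Builder memberIds(List<String> memberIds) { this.memberIds = memberIds; return this; }

        GroupDescriptionSketch build() { return new GroupDescriptionSketch(this); }
    }
}
```

A call such as new GroupDescriptionSketch.Builder("g1").state("Stable").build() reads much like the chained *Options setters mentioned above.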

@Override
public Map<String, ConsumerGroupDescription> apply(Void v) {
try {
Map<String, ConsumerGroupDescription> descriptions = new HashMap<>();
Contributor

nit: guess we could pass the size to HashMap

@@ -2347,11 +2348,11 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri

@Override
void handleResponse(AbstractResponse abstractResponse) {
- final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
Contributor

Not related to this patch, but we don't seem to be checking the error code here? Maybe we can implement a simple behavior for now and fail the future for this group if the error is not NONE?

Contributor Author

That's a good point. I will add some basic error handling here. Another thing we missed in the original patch... 😞

Contributor

If there is an error like COORDINATOR_NOT_AVAILABLE, we should not send a describeConsumerGroups request. I ran into this issue when I was working on KAFKA-6884: testResetOffsetsNotExistingGroup in ResetConsumerGroupOffsetTest failed with a TimeoutException. I believe the reason for this is that LeaderNotAvailableException is a retriable exception.
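
The "simple behavior" suggested in this thread could look roughly like the sketch below. Everything here is hypothetical: CoordinatorLookupSketch, its ErrorCode enum, and the use of CompletableFuture stand in for the AdminClient's own response, error and future types, which are not reproduced.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the coordinator lookup step; not AdminClient code.
final class CoordinatorLookupSketch {
    enum ErrorCode { NONE, COORDINATOR_NOT_AVAILABLE }

    // Returns true only if the follow-up describe request should be sent.
    static boolean handleFindCoordinator(ErrorCode error, CompletableFuture<String> groupFuture) {
        if (error != ErrorCode.NONE) {
            // Fail the per-group future right away instead of sending a
            // describe request that cannot succeed.
            groupFuture.completeExceptionally(
                new IllegalStateException("FindCoordinator failed: " + error));
            return false;
        }
        return true;
    }
}
```

Whether a retriable error should instead trigger a retry of the lookup is a separate design question that this sketch does not address.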

ConsumerProtocol.deserializeAssignment(
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));

ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
Contributor

Why do we read and wrap? Could we just duplicate the buffer?

Contributor Author

Good point. Let's just duplicate it.
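
A small sketch of why duplicating is enough here, using only java.nio.ByteBuffer. BufferDuplicateSketch and peekInt are hypothetical names: a duplicate shares the underlying bytes without copying them, but has its own position and limit, so reading from it does not consume the original buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing ByteBuffer.duplicate() semantics.
final class BufferDuplicateSketch {
    // Reads the first int through a duplicate, leaving the source position unchanged.
    static int peekInt(ByteBuffer source) {
        // The duplicate shares content with the source but has an independent
        // position and limit, so no byte array is copied.
        ByteBuffer view = source.duplicate();
        return view.getInt();
    }
}
```

Reading the bytes into a new array and wrapping them gives the same isolation, at the cost of an extra copy.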

import java.util.List;

/**
* A description of the assignments of a specific group member.
*/
public class MemberAssignment {
- private final List<TopicPartition> topicPartitions;
+ private final Collection<TopicPartition> topicPartitions;
Contributor

Curious why you made this change. Lists are a bit more convenient for users.

Contributor Author

To me, returning a List implies that ordering is important, which it's not in this case. Using Collection gives us the freedom to change it in the future. Most things are just as convenient on either one, right? You can iterate using the standard for loop, iterators, etc.

Contributor

One issue with Collection is that equals is not defined. So, one should be careful about returning it in public APIs. It's OK to receive it as a parameter though.

Contributor Author

Good point. I think Set makes sense here, since ordering doesn't matter
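
The equals concern can be seen with the JDK's own wrappers. In this sketch (CollectionEqualsSketch is a hypothetical name), Collections.unmodifiableCollection falls back to Object.equals, so two wrappers over equal contents are not equal, while sets compare by elements regardless of order.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper contrasting Collection and Set equality.
final class CollectionEqualsSketch {
    static boolean equalAsCollections(List<String> a, List<String> b) {
        Collection<String> left = Collections.unmodifiableCollection(a);
        Collection<String> right = Collections.unmodifiableCollection(b);
        // The wrapper does not forward equals, so this is an identity comparison.
        return left.equals(right);
    }

    static boolean equalAsSets(List<String> a, List<String> b) {
        Set<String> left = Collections.unmodifiableSet(new HashSet<>(a));
        Set<String> right = Collections.unmodifiableSet(new HashSet<>(b));
        // Set.equals compares membership, independent of iteration order.
        return left.equals(right);
    }
}
```

This matters for a value class such as MemberAssignment if it defines its own equals in terms of the field it returns.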


public String groupId() {
return groupId;
public GroupIdNotFoundException(String message) {
Contributor

Hmm.. This is technically a public api as of 1.1, though I don't think it was raised from any API in that release. Perhaps it is justifiable since the expectation was never correctly implemented anyway.

Contributor Author

That's a fair point. The API could never work, though (and still never can). ApiExceptions need to be deserializable from Error enums without additional information. Additional information is a nice-to-have, but may or may not be provided. If anyone tried to use this in 1.1, it wouldn't work.

Contributor

Yeah, I guess I'm ok with it since the API couldn't work as intended.
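
The doubled message quoted in the PR description can be reproduced with a sketch like the one below. Both classes are hypothetical and only assume that the error-code mapping constructs the exception from a complete message string, as described in this thread.

```java
// Hypothetical classes illustrating the double-wrapped exception message.
class BrokenGroupIdNotFoundSketch extends RuntimeException {
    // Bug: the parameter is treated as a group id, but the caller passes a
    // complete error message, so the text gets wrapped a second time.
    BrokenGroupIdNotFoundSketch(String groupId) {
        super("The group id " + groupId + " was not found");
    }
}

class FixedGroupIdNotFoundSketch extends RuntimeException {
    // Fix: accept the message as-is, so the exception can be built from an
    // error code without any additional information.
    FixedGroupIdNotFoundSketch(String message) {
        super(message);
    }
}
```

Passing "The group id does not exist" to the broken variant yields the garbled text from the description; the fixed variant keeps it unchanged.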

ConsumerProtocol.deserializeAssignment(
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));

Set<TopicPartition> partitions = Collections.<TopicPartition>emptySet();
Member

nits: it seems that <TopicPartition> is not needed?

this.topicPartitions = topicPartitions;
MemberAssignment(Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions == null ? Collections.<TopicPartition>emptySet() :
Collections.unmodifiableSet(new TreeSet<TopicPartition>(topicPartitions));
Member

nits: TopicPartition in <> is probably not needed.

Also, do we need any specific feature from TreeSet<> here? If we don't need order of elements then should we use HashSet to make the code a bit more consistent and efficient?

Contributor Author

@cmccabe cmccabe May 18, 2018

I used TreeSet here to give a consistent order of traversal. But actually, HashSet should be fine.

Member

Cool. Sounds good.

*/
public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) {
this.groupId = groupId;
ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup,
Member

nits: It may be OK. But just want to double check the recommended code style. If there are many parameters in the constructor, should we consistently put each parameter in a new line similar to e.g. LogManager?

Contributor Author

OK.

Contributor

@hachikuji hachikuji left a comment

LGTM once you have a chance to address Dong's feedback.

@hachikuji
Contributor

Also note the failing test case.

@cmccabe
Contributor Author

cmccabe commented May 18, 2018

Yeah, the failing test case was because of the TreeSet (no comparison operator defined for TopicPartition). It seems reasonable to just use a HashSet here instead.
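
The failure mode described here can be reproduced without Kafka. In this sketch, Tp is a hypothetical stand-in for a key type that defines equals and hashCode but does not implement Comparable: copying it into a TreeSet fails with ClassCastException, while a HashSet copy works.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical example; Tp is not the real TopicPartition class.
final class SetChoiceSketch {
    // Key type with equals/hashCode but no natural ordering.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) return false;
            Tp other = (Tp) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // HashSet only needs equals/hashCode, so this copy always works.
    static Set<Tp> immutableCopy(Set<Tp> in) {
        return Collections.unmodifiableSet(new HashSet<>(in));
    }

    // TreeSet needs a Comparator or Comparable elements; for a non-empty
    // input of Tp it throws ClassCastException when the first element is added.
    static boolean treeSetRejects(Set<Tp> in) {
        try {
            new TreeSet<>(in);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

A TreeSet would still be an option if a Comparator were supplied, at the cost of defining an ordering that the API does not otherwise need.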

@hachikuji hachikuji merged commit 16ad358 into apache:trunk May 21, 2018
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…er groups API (apache#4980)

* The consumer groups API should expose group state and coordinator information.  This information is needed by administrative tools and scripts that access consumer groups.

* The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer.

* Remove nulls from the API and make all collections immutable.

* DescribeConsumerGroupsResult#all should return a result as expected, rather than Void

* Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as "The group id The group id does not exist was not found" and similar.

Reviewers: Attila Sasvari, Andras Beni, Dong Lin, Jason Gustafson
@cmccabe cmccabe deleted the fix_consumer_groups_api branch May 20, 2019 18:55

6 participants