KAFKA-13059: Make DeleteConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error (apache#11019)

This patch improves the error handling in `DeleteConsumerGroupOffsetsHandler`: a `COORDINATOR_NOT_AVAILABLE` error is now unmapped, triggering a new `FindCoordinator` request to be sent out before the operation is retried.
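For context, here is a minimal usage sketch of the Admin API call whose retry path this patch changes. The broker address, group id, and topic name are assumed for illustration only:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

public class DeleteGroupOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; replace with a real bootstrap server.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Delete the committed offset of one partition for an (assumed) group.
            DeleteConsumerGroupOffsetsResult result = admin.deleteConsumerGroupOffsets(
                "my-group", Collections.singleton(new TopicPartition("my-topic", 0)));
            // With this patch, a COORDINATOR_NOT_AVAILABLE error triggers a new
            // FindCoordinator request before the retry, rather than retrying
            // against the same coordinator.
            result.all().get();
        }
    }
}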

Reviewers: David Jacot <[email protected]>
showuon authored Jul 15, 2021
1 parent f413435 commit 46c91f4
Showing 3 changed files with 143 additions and 55 deletions.
@@ -19,7 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -72,8 +72,19 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>>
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}

private void validateKeys(
Set<CoordinatorKey> groupIds
) {
if (!groupIds.equals(Collections.singleton(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Collections.singleton(groupId) + ")");
}
}

@Override
public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);

final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
new OffsetDeleteRequestTopic()
@@ -97,54 +108,67 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();
validateKeys(groupIds);

final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
final Errors error = Errors.forCode(response.data().errorCode());

if (error != Errors.NONE) {
handleError(groupId, error, failed, unmapped);
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

handleGroupError(groupId, error, failed, groupsToUnmap);

return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
} else {
final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data().topics().forEach(topic ->
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
response.data().topics().forEach(topic ->
topic.partitions().forEach(partition -> {
Errors partitionError = Errors.forCode(partition.errorCode());
if (!handleError(groupId, partitionError, failed, unmapped)) {
partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
}

partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
})
);
if (!partitions.isEmpty())
completed.put(groupId, partitions);

return new ApiResult<>(
Collections.singletonMap(groupId, partitionResults),
Collections.emptyMap(),
Collections.emptyList()
);
}
return new ApiResult<>(completed, failed, unmapped);
}

private boolean handleError(
private void handleGroupError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
List<CoordinatorKey> unmapped
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case GROUP_ID_NOT_FOUND:
case INVALID_GROUP_ID:
log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
error.exception());
case NON_EMPTY_GROUP:
log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId.idValue, error);
failed.put(groupId, error.exception());
return true;
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`OffsetDelete` request for group id {} failed because the coordinator" +
" is still in the process of loading state. Will retry.", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
return true;
case NOT_COORDINATOR:
log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
return true;
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`OffsetDelete` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry.", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
return false;
log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
}
}

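To make the handler's result buckets concrete, here is a minimal, self-contained sketch of the kind of retry loop that consumes an ApiResult. All names here are hypothetical (Kafka's real logic lives in AdminApiDriver); it only illustrates why moving COORDINATOR_NOT_AVAILABLE into the unmapped bucket forces a new FindCoordinator request before the retry:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a driver consuming the three ApiResult buckets.
class DriverSketch<K, V> {
    final Map<K, V> results = new HashMap<>();          // completed keys: futures can be completed
    final Map<K, Throwable> failures = new HashMap<>(); // failed keys: futures complete exceptionally
    final Set<K> needLookup = new HashSet<>();          // unmapped keys: rediscover the coordinator

    void onResponse(Map<K, V> completed, Map<K, Throwable> failed, Iterable<K> unmapped) {
        results.putAll(completed);
        failures.putAll(failed);
        for (K key : unmapped) {
            // Coordinator unavailable or moved (COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR):
            // forget the mapping so the next attempt issues FindCoordinator first.
            needLookup.add(key);
        }
        // Keys in none of the buckets (e.g. COORDINATOR_LOAD_IN_PROGRESS) stay mapped to the
        // same coordinator and are simply retried after a backoff.
    }

    public static void main(String[] args) {
        DriverSketch<String, String> driver = new DriverSketch<>();
        driver.onResponse(Collections.emptyMap(), Collections.emptyMap(),
            Collections.singletonList("group-1")); // as if COORDINATOR_NOT_AVAILABLE was returned
        System.out.println("Keys needing coordinator lookup: " + driver.needLookup);
    }
}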
@@ -3290,11 +3290,11 @@ public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

final DeleteConsumerGroupOffsetsResult result = env.adminClient()
.deleteConsumerGroupOffsets("groupId", Stream.of(tp1).collect(Collectors.toSet()));
.deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet()));

TestUtils.assertFutureError(result.all(), TimeoutException.class);
}
@@ -3322,7 +3322,8 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception {
mockClient.prepareResponse(body -> {
firstAttemptTime.set(time.milliseconds());
return true;
}, prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
}, prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));


mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

@@ -3401,23 +3402,28 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));

/*
 * We need to return two responses here: one for the NOT_COORDINATOR error, which is returned when
 * the delete consumer group offsets API is called on a coordinator that has moved. That causes the
 * whole operation to be retried, so we must respond with another FindCoordinatorResponse.
 *
 * The same applies to the COORDINATOR_NOT_AVAILABLE error response that follows.
 */
env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse("foo", 0, Errors.NONE));

@@ -24,6 +24,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -33,6 +34,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
@@ -67,48 +69,88 @@ public void testBuildRequest() {
@Test
public void testSuccessfulHandleResponse() {
Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
assertCompleted(handleWithError(Errors.NONE), responseData);
assertCompleted(handleWithGroupError(Errors.NONE), responseData);
}

@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}

@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
public void testFailedHandleResponseWithGroupError() {
assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP));
}

private OffsetDeleteResponse buildResponse(Errors error) {
@Test
public void testFailedHandleResponseWithPartitionError() {
assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
}

private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
OffsetDeleteResponse response = new OffsetDeleteResponse(
new OffsetDeleteResponseData()
.setErrorCode(error.code()));
if (error == Errors.NONE) {
response.data()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName(t0p0.topic())
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(t0p0.partition())
.setErrorCode(error.code())
).iterator()))
).iterator()));
}
return response;
}

private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
OffsetDeleteResponse response = new OffsetDeleteResponse(
new OffsetDeleteResponseData()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName("t0")
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(error.code())
).iterator()))
).iterator())));
new OffsetDeleteResponseData()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
new OffsetDeleteResponseTopic()
.setName(t0p0.topic())
.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
new OffsetDeleteResponsePartition()
.setPartitionIndex(t0p0.partition())
.setErrorCode(error.code())
).iterator()))
).iterator()))
);
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithError(
private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithGroupError(
Errors error
) {
DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
OffsetDeleteResponse response = buildResponse(error);
OffsetDeleteResponse response = buildGroupErrorResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithPartitionError(
Errors error
) {
DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
OffsetDeleteResponse response = buildPartitionErrorResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

@@ -139,7 +181,7 @@ private void assertCompleted(
assertEquals(expected, result.completedKeys.get(key));
}

private void assertFailed(
private void assertGroupFailed(
Class<? extends Throwable> expectedExceptionType,
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
) {
@@ -149,4 +191,20 @@ private void assertFailed(
assertEquals(singleton(key), result.failedKeys.keySet());
assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
}

private void assertPartitionFailed(
Map<TopicPartition, Errors> expectedResult,
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
) {
CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
assertEquals(singleton(key), result.completedKeys.keySet());

// verify the completed value is the expected result
Collection<Map<TopicPartition, Errors>> completeCollection = result.completedKeys.values();
assertEquals(1, completeCollection.size());
assertEquals(expectedResult, result.completedKeys.get(key));

assertEquals(emptyList(), result.unmappedKeys);
assertEquals(emptySet(), result.failedKeys.keySet());
}
}
