Skip to content

Commit

Permalink
KAFKA-13072: Make RemoveMembersFromConsumerGroupHandler unmap for COO…
Browse files Browse the repository at this point in the history
…RDINATOR_NOT_AVAILABLE error (apache#11035)

This patch improve the error handling in `RemoveMembersFromConsumerGroupHandler` and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
showuon authored Jul 15, 2021
1 parent f7cf4a4 commit 921a342
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -68,8 +69,18 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<MemberIden
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}

private void validateKeys(
Set<CoordinatorKey> groupIds
) {
if (!groupIds.equals(Collections.singleton(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Collections.singleton(groupId) + ")");
}
}

@Override
public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);
return new LeaveGroupRequest.Builder(groupId.idValue, members);
}

Expand All @@ -79,54 +90,63 @@ public ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleResponse(
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
validateKeys(groupIds);
final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();

final Errors error = Errors.forCode(response.data().errorCode());
final Errors error = response.topLevelError();
if (error != Errors.NONE) {
handleError(groupId, error, failed, unmapped);
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

handleGroupError(groupId, error, failed, groupsToUnmap);

return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
} else {
final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
for (MemberResponse memberResponse : response.memberResponses()) {
memberErrors.put(new MemberIdentity()
.setMemberId(memberResponse.memberId())
.setGroupInstanceId(memberResponse.groupInstanceId()),
Errors.forCode(memberResponse.errorCode()));

}
completed.put(groupId, memberErrors);

return new ApiResult<>(
Collections.singletonMap(groupId, memberErrors),
Collections.emptyMap(),
Collections.emptyList()
);
}
return new ApiResult<>(completed, failed, unmapped);
}

private void handleError(
private void handleGroupError(
CoordinatorKey groupId,
Errors error, Map<CoordinatorKey,
Throwable> failed,
List<CoordinatorKey> unmapped
Errors error,
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId,
error.exception());
log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;

case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`LeaveGroup` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("LeaveGroup request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`LeaveGroup` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;

default:
log.error("Received unexpected error for group {} in `LeaveGroup` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `LeaveGroup` response"));
break;
log.error("`LeaveGroup` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3589,7 +3589,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception {
Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));

final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup(
"groupId", new RemoveMembersFromConsumerGroupOptions(membersToRemove));
GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove));

TestUtils.assertFutureError(result.all(), TimeoutException.class);
}
Expand Down Expand Up @@ -3656,10 +3656,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));

env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())));
Expand All @@ -3668,6 +3664,8 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
* We need to return two responses here, one for NOT_COORDINATOR call when calling remove member
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
Expand All @@ -3676,6 +3674,13 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

env.kafkaClient().prepareResponse(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));

env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

MemberResponse memberResponse = new MemberResponse()
.setGroupInstanceId("instance-1")
.setErrorCode(Errors.NONE.code());
Expand Down Expand Up @@ -3734,13 +3739,10 @@ public void testRemoveMembersFromGroup() throws Exception {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

// Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

// Retriable errors should be retried
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,72 @@ public void testBuildRequest() {
@Test
public void testSuccessfulHandleResponse() {
Map<MemberIdentity, Errors> responseData = Collections.singletonMap(m1, Errors.NONE);
assertCompleted(handleWithError(Errors.NONE), responseData);
assertCompleted(handleWithGroupError(Errors.NONE), responseData);
}

@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}

@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(UnknownServerException.class, handleWithError(Errors.UNKNOWN_SERVER_ERROR));
assertFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(UnknownServerException.class, handleWithGroupError(Errors.UNKNOWN_SERVER_ERROR));
}

@Test
public void testFailedHandleResponseInMemberLevel() {
assertMemberFailed(Errors.FENCED_INSTANCE_ID, handleWithMemberError(Errors.FENCED_INSTANCE_ID));
assertMemberFailed(Errors.UNKNOWN_MEMBER_ID, handleWithMemberError(Errors.UNKNOWN_MEMBER_ID));
}

private LeaveGroupResponse buildResponse(Errors error) {
LeaveGroupResponse response = new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(error.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(error.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
new LeaveGroupResponseData()
.setErrorCode(error.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(Errors.NONE.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithError(
private LeaveGroupResponse buildResponseWithMemberError(Errors error) {
LeaveGroupResponse response = new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(error.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithGroupError(
Errors error
) {
RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
LeaveGroupResponse response = buildResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithMemberError(
Errors error
) {
RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
LeaveGroupResponse response = buildResponseWithMemberError(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
}

private void assertUnmapped(
AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> result
) {
Expand Down Expand Up @@ -140,4 +166,16 @@ private void assertFailed(
assertEquals(singleton(key), result.failedKeys.keySet());
assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
}

private void assertMemberFailed(
Errors expectedError,
AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> result
) {
Map<MemberIdentity, Errors> expectedResponseData = Collections.singletonMap(m1, expectedError);
CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
assertEquals(expectedResponseData, result.completedKeys.get(key));
}
}

0 comments on commit 921a342

Please sign in to comment.