
[MINOR] Improve consumer logging on LeaveGroup #5420

Merged: 2 commits, merged Jul 28, 2018
Add GroupCoordinator logging, and address review comments
dhruvilshah3 committed Jul 27, 2018
commit 6dc9c0c01496cf166f1711ce696845f3220af87d
@@ -33,7 +33,7 @@ public CommitFailedException() {
  "rebalanced and assigned the partitions to another member. This means that the time " +
  "between subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
  "which typically implies that the poll loop is spending too much time message processing. " +
- "You can address this either by increasing the session timeout or by reducing the maximum " +
+ "You can address this either by increasing max.poll.interval.ms or by reducing the maximum " +
  "size of batches returned in poll() with max.poll.records.");
Contributor (author) commented:
I don't think this description is completely accurate, since the commit can also fail when the coordinator has kicked us out of the group (for example, on session timeout). I'm not sure we have a great way of distinguishing between these cases based on the local consumer state, though.

Contributor replied:
Yeah, I agree. Given the heartbeat thread mechanism, max.poll.interval.ms is the setting to tune here rather than the session timeout.

}
}
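The updated message steers users toward max.poll.interval.ms and max.poll.records rather than the session timeout. As a rough illustration only (not part of this PR), the sketch below shows a consumer whose slow per-record processing can trigger CommitFailedException and where those two settings would be adjusted; the broker address, group id, topic name, and numeric values are placeholders.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowProcessingConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-processing-group");     // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The two knobs the updated message points at: more time between polls,
        // and smaller batches so each iteration of the loop finishes sooner.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");        // example: 10 minutes
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");               // example: smaller batches

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));          // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(record -> process(record.value()));             // potentially slow work
                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // Thrown once the group has already rebalanced; the new message points at
                    // max.poll.interval.ms, although (as noted in the review) it can also happen
                    // when the coordinator evicted the member, e.g. on session timeout.
                    break;
                }
            }
        }
    }

    private static void process(String value) {
        // stand-in for slow, per-record processing
    }
}
```

Raising max.poll.interval.ms buys the poll loop more time between calls to poll(), while lowering max.poll.records shrinks each batch so a single iteration finishes sooner.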
@@ -1031,7 +1031,12 @@ public void run() {
  } else if (heartbeat.pollTimeoutExpired(now)) {
  // the poll timeout has expired, which means that the foreground thread has stalled
  // in between calls to poll(), so we explicitly leave the group.
- log.info("Poll timeout expired");
+ log.warn("This member will leave the group because consumer poll timeout has expired. This " +
+ "means the time between subsequent calls to poll() was longer than the configured " +
+ "max.poll.interval.ms, which typically implies that the poll loop is spending too " +
+ "much time processing messages. You can address this either by increasing " +
+ "max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +
+ "with max.poll.records.");
  maybeLeaveGroup();
  } else if (!heartbeat.shouldHeartbeat(now)) {
  // poll again after waiting for the retry backoff in case the heartbeat failed or the
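The new warning fires on the consumer's background heartbeat thread when the gap between calls to poll() exceeds max.poll.interval.ms, after which the member proactively leaves the group. One common way to avoid that for slow processing, sketched below under the assumption of a plain KafkaConsumer, is to hand each batch to a worker thread and keep calling poll() with the partitions paused; the topic name, timeouts, and process() helper are illustrative placeholders, not anything from this PR.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausingPollLoop {
    private static volatile boolean running = true;

    // props is assumed to carry the usual bootstrap, group id, and deserializer settings.
    public static void run(Properties props) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            Future<?> inFlight = null;
            while (running) {
                // poll() is called on every iteration, so the heartbeat thread never
                // sees the poll timeout expire and never has to leave the group.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                if (!records.isEmpty()) {
                    // stop fetching more data while the current batch is processed elsewhere
                    consumer.pause(consumer.assignment());
                    inFlight = worker.submit(() -> records.forEach(r -> process(r.value())));
                }
                if (inFlight != null && inFlight.isDone()) {
                    consumer.resume(consumer.assignment());
                    inFlight = null;
                }
            }
        } finally {
            worker.shutdown();
        }
    }

    private static void process(String value) {
        // stand-in for slow, per-record work
    }
}
```

This sketch glosses over offset handling (with auto-commit enabled, offsets for a batch may be committed before the worker finishes it) and the fact that a rebalance clears paused partitions, so it is a starting point rather than a complete pattern.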
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (26 changes: 13 additions & 13 deletions)
@@ -291,7 +291,7 @@ class GroupCoordinator(val brokerId: Int,
  if (group.is(CompletingRebalance) && generationId == group.generationId) {
  if (error != Errors.NONE) {
  resetAndPropagateAssignmentError(group, error)
- maybePrepareRebalance(group)
+ maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
  } else {
  setAndPropagateAssignment(group, assignment)
  group.transitionTo(Stable)
@@ -333,7 +333,7 @@ class GroupCoordinator(val brokerId: Int,
  val member = group.get(memberId)
  removeHeartbeatForLeavingMember(group, member)
  debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
- removeMemberAndUpdateGroup(group, member)
+ removeMemberAndUpdateGroup(group, member, s"removing member $memberId on LeaveGroup")
  responseCallback(Errors.NONE)
  }
  }
@@ -700,7 +700,7 @@ class GroupCoordinator(val brokerId: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  group: GroupMetadata,
- callback: JoinCallback) = {
+ callback: JoinCallback): MemberMetadata = {
  val memberId = clientId + "-" + group.generateMemberIdSuffix
  val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
  sessionTimeoutMs, protocolType, protocols)
@@ -710,7 +710,7 @@
  group.newMemberAdded = true

  group.add(member)
- maybePrepareRebalance(group)
+ maybePrepareRebalance(group, s"Adding new member $memberId")
  member

@@ -720,17 +720,17 @@
  callback: JoinCallback) {
  member.supportedProtocols = protocols
  member.awaitingJoinCallback = callback
- maybePrepareRebalance(group)
+ maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
  }

- private def maybePrepareRebalance(group: GroupMetadata) {
+ private def maybePrepareRebalance(group: GroupMetadata, reason: String) {
  group.inLock {
  if (group.canRebalance)
- prepareRebalance(group)
+ prepareRebalance(group, reason)
  }
  }

- private def prepareRebalance(group: GroupMetadata) {
+ private def prepareRebalance(group: GroupMetadata, reason: String) {
  // if any members are awaiting sync, cancel their request and have them rejoin
  if (group.is(CompletingRebalance))
  resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
@@ -747,18 +747,18 @@

  group.transitionTo(PreparingRebalance)

- info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
- s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
+ info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
+ s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")

  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }

- private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) {
+ private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String) {
  group.remove(member.memberId)
  group.currentState match {
  case Dead | Empty =>
- case Stable | CompletingRebalance => maybePrepareRebalance(group)
+ case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
  case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
  }
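With the reason threaded through prepareRebalance, the coordinator's existing "Preparing to rebalance" log line gains both the current group state and the trigger. Built from the template above, a resulting broker log message would read roughly like the following; the group name, generation, offsets-topic partition, and member id are made-up placeholders:

```
Preparing to rebalance group my-group in state PreparingRebalance with old generation 5 (__consumer_offsets-12) (reason: removing member my-client-1a2b3c on LeaveGroup)
```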
@@ -837,7 +837,7 @@ class GroupCoordinator(val brokerId: Int,
  group.inLock {
  if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
  info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
- removeMemberAndUpdateGroup(group, member)
+ removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
  }
  }
  }
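The heartbeat-expiration path above is governed by the consumer's session timeout rather than max.poll.interval.ms, since heartbeats are sent from the background thread; this is the other eviction case mentioned in the review discussion. As a hedged illustration with assumed values (not from this PR), these are the consumer settings that control it:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class LivenessConfig {
    // Example values only: the coordinator removes a member when no heartbeat arrives
    // within session.timeout.ms; heartbeat.interval.ms should be well below that.
    static Properties livenessProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");   // 10s, example
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // 3s, example
        return props;
    }
}
```

Tuning these affects failure-detection latency for crashed consumers, which is independent of how long the application spends processing between calls to poll().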