Skip to content

Commit

Permalink
KAFKA-6193; Only delete reassign_partitions znode after reassignment …
Browse files Browse the repository at this point in the history
…is complete

- Ensure that `partitionsBeingReassigned` is fully populated before
`removePartitionFromReassignedPartitions` is invoked. This is
necessary to avoid premature deletion of the `reassign_partitions`
znode.
- Modify and add tests to verify the fixes.
- Add documentation.
- Use `info` log message if assignedReplicas == newReplicas and
remove control flow based on exceptions.
- General logging improvements.
- Simplify `initializePartitionAssignment` by relying on logic already
present in `maybeTriggerPartitionReassignment`.

Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>, Guozhang Wang <[email protected]>

Closes apache#4283 from ijuma/kafka-6193-flaky-shouldPerformMultipleReassignmentOperationsOverVariousTopics
  • Loading branch information
ijuma committed Dec 6, 2017
1 parent b690933 commit d543e19
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,8 @@ private void processDisconnection(List<ClientResponse> responses, String nodeId,
break; // Disconnections in other states are logged at debug level in Selector
}
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", request.request,
request.header.correlationId(), nodeId);
log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleDisconnection(request.destination);
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class RequestSendThread(val controllerId: Int,
name: String)
extends ShutdownableThread(name = name) {

logIdent = s"[RequestSendThread controllerId=$controllerId] "

private val socketTimeoutMs = config.controllerSocketTimeoutMs

override def doWork(): Unit = {
Expand Down Expand Up @@ -248,8 +250,9 @@ class RequestSendThread(val controllerId: Int,

val response = clientResponse.responseBody

stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace("Received response " +
s"${response.toString(requestHeader.apiVersion)} for a request sent to broker $brokerNode")
stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace(s"Received response " +
s"${response.toString(requestHeader.apiVersion)} for request $api with correlation id " +
s"${requestHeader.correlationId} sent to broker $brokerNode")

if (callback != null) {
callback(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.collection._
object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
}
class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
eventProcessedListener: ControllerEvent => Unit) {

@volatile private var _state: ControllerState = ControllerState.Idle
Expand All @@ -56,6 +56,8 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
}

class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
logIdent = s"[ControllerEventThread controllerId=$controllerId] "

override def doWork(): Unit = {
queue.take() match {
case KafkaController.ShutdownEventThread => initiateShutdown()
Expand Down
133 changes: 73 additions & 60 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private[controller] val kafkaScheduler = new KafkaScheduler(1)

// visible for testing
private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
_ => updateMetrics())
private[controller] val eventManager = new ControllerEventManager(config.brokerId,
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())

val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
Expand Down Expand Up @@ -234,7 +234,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
partitionStateMachine.startup()

info(s"Ready to serve as the new controller with epoch $epoch")
maybeTriggerPartitionReassignment()
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
onPreferredReplicaElection(pendingPreferredReplicaElections)
Expand Down Expand Up @@ -489,42 +489,62 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
//11. Update the /admin/reassign_partitions path in ZK to remove this partition.
removePartitionFromReassignedPartitions(topicPartition)
info(s"Removed partition $topicPartition from the list of reassigned partitions in zookeeper")
controllerContext.partitionsBeingReassigned.remove(topicPartition)
//12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
}
}

private def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicPartition.topic
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if (assignedReplicas == newReplicas) {
throw new KafkaException(s"Partition $topicPartition to be reassigned is already assigned to replicas " +
s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment")
} else {
info(s"Handling reassignment of partition $topicPartition to new replicas ${newReplicas.mkString(",")}")
// first register ISR change listener
reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
controllerContext.partitionsBeingReassigned.put(topicPartition, reassignedPartitionContext)
// mark topic ineligible for deletion for the partitions being reassigned
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(topicPartition, reassignedPartitionContext)
}
case None => throw new KafkaException(s"Attempt to reassign partition $topicPartition that doesn't exist")
/**
* Trigger partition reassignment for the provided partitions if the assigned replicas are not the same as the
* reassigned replicas (as defined in `ControllerContext.partitionsBeingReassigned`) and if the topic has not been
* deleted.
*
* `partitionsBeingReassigned` must be populated with all partitions being reassigned before this method is invoked
* as explained in the method documentation of `removePartitionFromReassignedPartitions` (which is invoked by this
* method).
*
* @throws IllegalStateException if a partition is not in `partitionsBeingReassigned`
*/
private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {
topicPartitions.foreach { tp =>
if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
error(s"Skipping reassignment of $tp since the topic is currently being deleted")
removePartitionFromReassignedPartitions(tp)
} else {
val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")
}
val newReplicas = reassignedPartitionContext.newReplicas
val topic = tp.topic
controllerContext.partitionReplicaAssignment.get(tp) match {
case Some(assignedReplicas) =>
if (assignedReplicas == newReplicas) {
info(s"Partition $tp to be reassigned is already assigned to replicas " +
s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
removePartitionFromReassignedPartitions(tp)
} else {
try {
info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
// first register ISR change listener
reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
// mark topic ineligible for deletion for the partitions being reassigned
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(tp, reassignedPartitionContext)
} catch {
case e: Throwable =>
error(s"Error completing reassignment of partition $tp", e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(tp)
}
}
case None =>
error(s"Ignoring request to reassign partition $tp that doesn't exist.")
removePartitionFromReassignedPartitions(tp)
}
}
} catch {
case e: Throwable =>
error(s"Error completing reassignment of partition $topicPartition", e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(topicPartition)
}
}

Expand Down Expand Up @@ -624,22 +644,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
val partitionsBeingReassigned = zkClient.getPartitionReassignment
// check if they are already completed or topic was deleted
val reassignedPartitions = partitionsBeingReassigned.filter { case (tp, reassignmentReplicas) =>
controllerContext.partitionReplicaAssignment.get(tp) match {
case None => true // topic deleted
case Some(currentReplicas) => currentReplicas == reassignmentReplicas // reassignment completed
}
}.keys
reassignedPartitions.foreach(removePartitionFromReassignedPartitions)
val partitionsToReassign = partitionsBeingReassigned -- reassignedPartitions
controllerContext.partitionsBeingReassigned ++= partitionsToReassign.map { case (tp, newReplicas) =>
info(s"Partitions being reassigned: $partitionsBeingReassigned")

controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned.iterator.map { case (tp, newReplicas) =>
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
}
info(s"Partitions being reassigned: $partitionsBeingReassigned")
info(s"Partitions already reassigned: $reassignedPartitions")
info(s"Resuming reassignment of partitions: $partitionsToReassign")
}

private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
Expand All @@ -654,12 +664,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
(topicsToBeDeleted, topicsIneligibleForDeletion)
}

private def maybeTriggerPartitionReassignment() {
controllerContext.partitionsBeingReassigned.foreach { case (tp, reassignContext) =>
initiateReassignReplicasForTopicPartition(tp, reassignContext)
}
}

private def startChannelManager() {
controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
stateChangeLogger, threadNamePrefix)
Expand Down Expand Up @@ -795,13 +799,23 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}

/**
* Remove partition from partitions being reassigned in ZooKeeper and ControllerContext. If the partition reassignment
* is complete (i.e. there is no other partition with a reassignment in progress), the reassign_partitions znode
* is deleted.
*
* `ControllerContext.partitionsBeingReassigned` must be populated with all partitions being reassigned before this
* method is invoked to avoid premature deletion of the `reassign_partitions` znode.
*/
private def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
reassignContext.unregisterReassignIsrChangeHandler(zkClient)
}

val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition

info(s"Removing partition $topicPartition from the list of reassigned partitions in zookeeper")

// write the new list to zookeeper
if (updatedPartitionsBeingReassigned.isEmpty) {
info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
Expand Down Expand Up @@ -1281,17 +1295,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// the `path exists` check for free
if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
val partitionReassignment = zkClient.getPartitionReassignment
val partitionsToBeReassigned = partitionReassignment -- controllerContext.partitionsBeingReassigned.keys
partitionsToBeReassigned.foreach { case (tp, newReplicas) =>
if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
error(s"Skipping reassignment of $tp since the topic is currently being deleted")
removePartitionFromReassignedPartitions(tp)
} else {
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
tp)
initiateReassignReplicasForTopicPartition(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
}

// Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
// `maybeTriggerPartitionReassignment` (see method documentation for the reason)
partitionReassignment.foreach { case (tp, newReplicas) =>
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
tp)
controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
}

maybeTriggerPartitionReassignment(partitionReassignment.keySet)
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -672,18 +672,15 @@ class LogManager(logDirs: Seq[File],
else
currentLogs.put(topicPartition, log)

info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
topicPartition.partition,
logDir,
config.originals.asScala.mkString(", ")))
info(s"Created log for partition $topicPartition in $logDir with properties " +
s"{${config.originals.asScala.mkString(", ")}}.")
// Remove the preferred log dir since it has already been satisfied
preferredLogDirs.remove(topicPartition)

log
} catch {
case e: IOException =>
val msg = s"Error while creating log for $topicPartition in dir ${logDir}"
val msg = s"Error while creating log for $topicPartition in dir $logDir"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
Expand Down
Loading

0 comments on commit d543e19

Please sign in to comment.