Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller #4668

Merged
merged 3 commits into from
Mar 30, 2018
Merged

KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller #4668

merged 3 commits into from
Mar 30, 2018

Conversation

gitlw
Copy link
Contributor

@gitlw gitlw commented Mar 8, 2018

This patch tries to speed up the inefficient functions identified in Kafka-6630 by grouping partitions in the ControllerContext.partitionReplicaAssignment variable by topics. Hence trying to find all replicas for a topic won't need to go through all the replicas in the cluster.

Passed all tests using "gradle testAll"

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ijuma ijuma requested a review from junrao March 9, 2018 23:48
@gitlw
Copy link
Contributor Author

gitlw commented Mar 16, 2018

@onurkaraman Can you please take a look at this patch? Thanks.

@gitlw
Copy link
Contributor Author

gitlw commented Mar 23, 2018

@onurkaraman @junrao Can you please take a look at this patch? Thanks!

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gitlw Thanks for the patch. Sorry for the delay. A few comments below.

.getOrElse(topicPartition.partition, Seq.empty)
}

def clearPartitionReplicaAssignment() = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent, for methods with no return value, we want to specify Unit =. Also, should we make this method more general to clear other fields such as allTopics, partitionsBeingReassigned and replicasOnOfflineDirs, instead of letting the caller do that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the return type Unit and converted the clearing method to clear other topic states.

.put(topicPartition.partition, newReplicas)
}

def partitionReplicaAssignmentForTopic(topic : String) : mutable.Map[TopicPartition, Seq[Int]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better not to return a mutable map.

}
}

def removePartitionReplicaAssignmentForTopic(topic : String) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this is the same method as removeTopic? The latter seems more complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this method and replaced it with removeTopic instead.

@@ -100,7 +140,7 @@ class ControllerContext {

def removeTopic(topic: String) = {
partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic }
partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic }
partitionReplicaAssignmentUnderlying.remove(topic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also update replicasOnOfflineDirs accordingly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should update replicasOnOfflineDirs accordingly. However I don't want to simply iterate through the current replicasOnOfflineDirs to filter out the topic since doing that will be very slow in a large cluster.
Since clearing the replicasOnOfflineDirs is an orthogonal change, I'll think more about whether it's important and if so how to do it. I feel a separate PR is probably better. Comment?

partitionReplicaAssignmentUnderlying = mutable.Map.empty
}

def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas : Seq[Int]) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent, we want to remove the space before :.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}.keySet.map(_.topic)
val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
val replicasForTopic = controllerContext.replicasForTopic(topic)
replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, new TopicPartition(topic, r.partition)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we could just use r.topicPartition instead of creating a new TopicPartition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

@@ -1155,11 +1161,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (!isActive) {
0
} else {
controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
controllerContext.allPartitions.count { topicAndPartition =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topicAndPartition => topicPartition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

@@ -1312,14 +1320,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (!isActive) return
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
controllerContext.partitionReplicaAssignment(p._1).isEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can change p to case (topicPartion, _) and use topicPartition instead of p._1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's better.

if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
"since it is currently being deleted")
else {
if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
partitionsToBeAdded.foreach { case (topicAndPartition, assignedReplicas) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topicAndPartition => topicPartition ?

@ijuma
Copy link
Contributor

ijuma commented Mar 28, 2018

@gitlw, are you planning to address the review feedback from @junrao?

@gitlw
Copy link
Contributor Author

gitlw commented Mar 28, 2018

@ijuma Yes, sorry about the delay. I'll update this PR soon.

@gitlw
Copy link
Contributor Author

gitlw commented Mar 29, 2018

@junrao Can you please take another look? Thanks!

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gitlw : Thanks for the updated patch. Just a couple of more minor comments.

.getOrElse(topicPartition.partition, Seq.empty)
}

def clearTopicsState(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty

private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty

def partitionReplicaAssignment(topicPartition: TopicPartition) : Seq[Int] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the space before : Seq[Int] and some other places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gitlw : Thanks for the patch. LGTM

@junrao junrao merged commit 2ef6ee2 into apache:trunk Mar 30, 2018
@gitlw gitlw deleted the speedup_processing_stop_replica_response branch March 30, 2018 06:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants