KAFKA-13972; Ensure replica state deleted after reassignment cancellation (#13107)

When a reassignment is cancelled, we need to delete the partition state of the adding replicas (the replicas the reassignment was bringing into the replica set). Failing to do so leaves "stray" replicas that take up disk space and can cause topicId conflicts if the topic is later recreated. Currently, this logic does not work because the leader epoch is not always bumped after cancellation. Without a leader epoch bump, the replica ignores the StopReplica request sent by the controller and may remain online.

In this patch, we fix the problem by loosening the broker-side epoch check when a StopReplica request is received: a request whose leader epoch is equal to the current epoch is now accepted instead of being ignored.
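
The change amounts to relaxing a single comparison in the broker's StopReplica handling. Below is a minimal sketch of the intended check; the sentinel values are illustrative rather than the exact LeaderAndIsr constants, and the object is not the actual ReplicaManager code.

// Hedged sketch of the relaxed broker-side validation; mirrors the intent of
// ReplicaManager.stopReplicas, not the actual implementation.
object StopReplicaEpochCheck {
  val EpochDuringDelete: Int = -2 // illustrative sentinel: topic is being deleted
  val NoEpoch: Int = -1           // illustrative sentinel: old request versions carry no epoch

  def shouldStopReplica(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Boolean =
    requestLeaderEpoch == EpochDuringDelete ||
      requestLeaderEpoch == NoEpoch ||
      requestLeaderEpoch >= currentLeaderEpoch // previously '>', so an equal-epoch request was ignored
}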

Note that this problem only affects the ZK controller. The integration tests added here nevertheless cover both metadata modes (ZK and KRaft).

Reviewers:  David Jacot <[email protected]>, Justine Olshan <[email protected]>
hachikuji authored Jan 18, 2023
1 parent b7e3ee7 commit 653e284
Showing 4 changed files with 94 additions and 28 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -380,7 +380,7 @@ class ReplicaManager(val config: KafkaConfig,
// epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
- requestLeaderEpoch > currentLeaderEpoch) {
+ requestLeaderEpoch >= currentLeaderEpoch) {
stoppedPartitions += topicPartition -> deletePartition
// Assume that everything will go right. It is overwritten in case of an error.
responseMap.put(topicPartition, Errors.NONE)
@@ -108,12 +108,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
"""]}"""

val foo0 = new TopicPartition("foo", 0)
val bar0 = new TopicPartition("bar", 0)

// Check that the assignment has not yet been started.
val initialAssignment = Map(
new TopicPartition("foo", 0) ->
PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
new TopicPartition("bar", 0) ->
PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
foo0 -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
bar0 -> PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
)
waitForVerifyAssignment(cluster.adminClient, assignment, false,
VerifyAssignmentResult(initialAssignment))
@@ -122,10 +123,8 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
val finalAssignment = Map(
new TopicPartition("foo", 0) ->
PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
new TopicPartition("bar", 0) ->
PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
foo0 -> PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
bar0 -> PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
)

val verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false)
@@ -137,6 +136,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness

assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))

// Verify that partitions are removed from brokers no longer assigned
verifyReplicaDeleted(topicPartition = foo0, replicaId = 2)
verifyReplicaDeleted(topicPartition = bar0, replicaId = 1)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -296,10 +299,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCancellation(quorum: String): Unit = {
val foo0 = new TopicPartition("foo", 0)
val baz1 = new TopicPartition("baz", 1)

cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 200)
cluster.produceMessages("baz", 1, 200)
cluster.produceMessages(foo0.topic, foo0.partition, 200)
cluster.produceMessages(baz1.topic, baz1.partition, 200)
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
"""{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" +
@@ -314,14 +320,11 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness
// from completing before this runs.
waitForVerifyAssignment(cluster.adminClient, assignment, true,
VerifyAssignmentResult(Map(
new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
foo0 -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
baz1 -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
true, Map(), false))
// Cancel the reassignment.
- assertEquals((Set(
- new TopicPartition("foo", 0),
- new TopicPartition("baz", 1)
- ), Set()), runCancelAssignment(cluster.adminClient, assignment, true))
+ assertEquals((Set(foo0, baz1), Set()), runCancelAssignment(cluster.adminClient, assignment, true))
// Broker throttles are still active because we passed --preserve-throttles
waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
// Cancelling the reassignment again should reveal nothing to cancel.
@@ -330,6 +333,62 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness
waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
// Verify that there are no ongoing reassignments.
assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing)
// Verify that the partition is removed from cancelled replicas
verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
verifyReplicaDeleted(topicPartition = baz1, replicaId = 3)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCancellationWithAddingReplicaInIsr(quorum: String): Unit = {
val foo0 = new TopicPartition("foo", 0)

cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages(foo0.topic, foo0.partition, 200)

// The reassignment will bring replicas 3 and 4 into the replica set and remove 1 and 2.
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,3,4],"log_dirs":["any","any","any"]}""" +
"""]}"""

// We will throttle replica 4 so that only replica 3 joins the ISR
TestUtils.setReplicationThrottleForPartitions(
cluster.adminClient,
brokerIds = Seq(4),
partitions = Set(foo0),
throttleBytes = 1
)

// Execute the assignment and wait for replica 3 (only) to join the ISR
runExecuteAssignment(
cluster.adminClient,
additional = false,
reassignmentJson = assignment
)
TestUtils.waitUntilTrue(
() => TestUtils.currentIsr(cluster.adminClient, foo0) == Set(0, 1, 2, 3),
msg = "Timed out while waiting for replica 3 to join the ISR"
)

// Now cancel the assignment and verify that the partition is removed from cancelled replicas
assertEquals((Set(foo0), Set()), runCancelAssignment(cluster.adminClient, assignment, preserveThrottles = true))
verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
verifyReplicaDeleted(topicPartition = foo0, replicaId = 4)
}

private def verifyReplicaDeleted(
topicPartition: TopicPartition,
replicaId: Int
): Unit = {
def isReplicaStoppedAndDeleted(): Boolean = {
val server = cluster.servers(replicaId)
val partition = server.replicaManager.getPartition(topicPartition)
val log = server.logManager.getLog(topicPartition)
partition == HostedPartition.None && log.isEmpty
}
TestUtils.waitUntilTrue(isReplicaStoppedAndDeleted,
msg = s"Timed out waiting for replica $replicaId of $topicPartition to be deleted")
}

private def waitForLogDirThrottle(throttledBrokers: Set[Int], logDirThrottle: Long): Unit = {
@@ -541,8 +600,8 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness
private def runExecuteAssignment(adminClient: Admin,
additional: Boolean,
reassignmentJson: String,
- interBrokerThrottle: Long,
- replicaAlterLogDirsThrottle: Long) = {
+ interBrokerThrottle: Long = -1,
+ replicaAlterLogDirsThrottle: Long = -1) = {
println(s"==> executeAssignment(adminClient, additional=${additional}, " +
s"reassignmentJson=${reassignmentJson}, " +
s"interBrokerThrottle=${interBrokerThrottle}, " +
@@ -2766,7 +2766,7 @@ class ReplicaManagerTest {

@Test
def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(1, false, false, Errors.FENCED_LEADER_EPOCH)
+ testStopReplicaWithExistingPartition(1, false, false, Errors.NONE)
}

@Test
@@ -2796,7 +2796,7 @@ class ReplicaManagerTest {

@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(1, true, false, Errors.FENCED_LEADER_EPOCH)
+ testStopReplicaWithExistingPartition(1, true, false, Errors.NONE)
}

@Test
21 changes: 14 additions & 7 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1971,16 +1971,23 @@ object TestUtils extends Logging {
)
}

def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
val description = admin.describeTopics(Set(partition.topic).asJava)
.allTopicNames
.get
.asScala

description
.values
.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
.map(_.id)
.toSet
}

def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = {
waitUntilTrue(
() => {
- val description = client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
- val isr = description
- .values
- .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
- .map(_.id)
- .toSet
+ val isr = currentIsr(client, partition)
brokerIds.subsetOf(isr)
},
s"Expected brokers $brokerIds to be in the ISR for $partition"