Skip to content

Commit

Permalink
KAFKA-5647; Use KafkaZkClient in ReassignPartitionsCommand and Prefer…
Browse files Browse the repository at this point in the history
…redReplicaLeaderElectionCommand

*  Use KafkaZkClient in ReassignPartitionsCommand
*  Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
*  Updated test classes to use new methods
*  All existing tests should pass

Author: Manikumar Reddy <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#4260 from omkreddy/KAFKA-5647-ADMINCOMMANDS
  • Loading branch information
omkreddy authored and junrao committed Dec 20, 2017
1 parent 35c1be7 commit 488ea4b
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 420 deletions.
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@
<allow pkg="kafka.tools" />
<allow pkg="kafka.utils" />
<allow pkg="kafka.zk" />
<allow pkg="kafka.zookeeper" />
<allow pkg="kafka.log" />
<allow pkg="scala" />
<allow pkg="scala.collection" />
<allow pkg="org.I0Itec.zkclient" />
</subpackage>

<subpackage name="test">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package kafka.admin

import joptsimple.OptionParser
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
import kafka.common.AdminCommandFailedException
import kafka.zk.KafkaZkClient
import kafka.zookeeper.ZooKeeperClient

import collection._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.NodeExistsException

object PreferredReplicaLeaderElectionCommand extends Logging {

Expand Down Expand Up @@ -51,20 +54,19 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)

val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient = null
var zkUtils: ZkUtils = null
var zooKeeperClient: ZooKeeperClient = null
var zkClient: KafkaZkClient = null
try {
zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
zkUtils = ZkUtils(zkConnect,
30000,
30000,
JaasUtils.isZkSecurityEnabled())
val time = Time.SYSTEM
zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)

val partitionsForPreferredReplicaElection =
if (!options.has(jsonFileOpt))
zkUtils.getAllPartitions()
zkClient.getAllPartitions()
else
parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)

preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
} catch {
Expand All @@ -77,7 +79,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}

def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicPartition] = {
Json.parseFull(jsonString) match {
case Some(js) =>
js.asJsonObject.get("partitions") match {
Expand All @@ -86,7 +88,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val partitions = partitionsRaw.map { p =>
val topic = p("topic").to[String]
val partition = p("partition").to[Int]
TopicAndPartition(topic, partition)
new TopicPartition(topic, partition)
}.toBuffer
val duplicatePartitions = CoreUtils.duplicates(partitions)
if (duplicatePartitions.nonEmpty)
Expand All @@ -98,34 +100,30 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}

def writePreferredReplicaElectionData(zkUtils: ZkUtils,
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
val jsonData = ZkUtils.preferredReplicaLeaderElectionZkData(partitionsUndergoingPreferredReplicaElection)
def writePreferredReplicaElectionData(zkClient: KafkaZkClient,
partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]) {
try {
zkUtils.createPersistentPath(zkPath, jsonData)
println("Created preferred replica election path with %s".format(jsonData))
zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet)
println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
} catch {
case _: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
case _: NodeExistsException =>
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
"%s. Aborting operation".format(zkClient.getPreferredReplicaElection.mkString(",")))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
}

class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitionsFromUser: scala.collection.Set[TopicAndPartition]) {
class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) {
def moveLeaderToPreferredReplica() = {
try {
val topics = partitionsFromUser.map(_.topic).toSet
val partitionsFromZk = zkUtils.getPartitionsForTopics(topics.toSeq).flatMap{ case (topic, partitions) =>
partitions.map(TopicAndPartition(topic, _))
val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
partitions.map(new TopicPartition(topic, _))
}.toSet

val (validPartitions, invalidPartitions) = partitionsFromUser.partition(partitionsFromZk.contains)
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)

println("Successfully started preferred replica election for partitions %s".format(validPartitions))
invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))
Expand Down
Loading

0 comments on commit 488ea4b

Please sign in to comment.