Skip to content

Commit

Permalink
KAFKA-6073; Use ZookeeperClient in KafkaApis
Browse files Browse the repository at this point in the history
I kept zkUtils for the call to AdminUtils.createTopic(). AdminUtils can be done in another PR.

Is there a reason why we use TopicAndPartition instead of TopicPartition in KafkaControllerZkUtils ?

Author: Mickael Maison <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes apache#4111 from mimaison/KAFKA-6073
  • Loading branch information
mimaison authored and junrao committed Oct 30, 2017
1 parent f4e9c84 commit 9504af7
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 17 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.controller.KafkaController
import kafka.controller.{KafkaController}
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
Expand All @@ -38,6 +38,7 @@ import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendActi
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
Expand Down Expand Up @@ -71,6 +72,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val zkUtils: ZkUtils,
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
val metadataCache: MetadataCache,
Expand Down Expand Up @@ -300,12 +302,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// for version 0 always store offsets to ZK
val responseInfo = authorizedTopicRequestInfo.map {
case (topicPartition, partitionData) =>
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
try {
if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
else {
zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
zkClient.setOrCreateConsumerOffset(offsetCommitRequest.groupId, topicPartition, partitionData.offset)
(topicPartition, Errors.NONE)
}
} catch {
Expand Down Expand Up @@ -1006,12 +1007,11 @@ class KafkaApis(val requestChannel: RequestChannel,

// version 0 reads offsets from ZK
val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
try {
if (!metadataCache.contains(topicPartition.topic))
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else {
val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
val payloadOpt = zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
payloadOpt match {
case Some(payload) =>
(topicPartition, new OffsetFetchResponse.PartitionData(
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
kafkaController, zkUtils, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
brokerTopicStats, clusterId, time)

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
Expand Down
101 changes: 90 additions & 11 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.common.TopicPartition

/**
* Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
Expand Down Expand Up @@ -226,7 +227,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @return SetDataResponse
*/
def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1)
val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.NoVersion)
retryRequestUntilConnected(setDataRequest)
}

Expand Down Expand Up @@ -284,7 +285,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
*/
def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
val deleteRequests = sequenceNumbers.map { sequenceNumber =>
DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1)
DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), ZkVersion.NoVersion)
}
retryRequestsUntilConnected(deleteRequests)
}
Expand Down Expand Up @@ -329,7 +330,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param topics the topics to remove.
*/
def deleteTopicDeletions(topics: Seq[String]): Unit = {
val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1))
val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.NoVersion))
retryRequestsUntilConnected(deleteRequests)
}

Expand All @@ -355,7 +356,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @return SetDataResponse
*/
def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1)
val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), ZkVersion.NoVersion)
retryRequestUntilConnected(setDataRequest)
}

Expand All @@ -374,7 +375,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Deletes the partition reassignment znode.
*/
def deletePartitionReassignment(): Unit = {
val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1)
val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.NoVersion)
retryRequestUntilConnected(deleteRequest)
}

Expand Down Expand Up @@ -451,7 +452,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
*/
def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
val deleteRequests = sequenceNumbers.map { sequenceNumber =>
DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1)
DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), ZkVersion.NoVersion)
}
retryRequestsUntilConnected(deleteRequests)
}
Expand All @@ -476,7 +477,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Deletes the preferred replica election znode.
*/
def deletePreferredReplicaElection(): Unit = {
val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1)
val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, ZkVersion.NoVersion)
retryRequestUntilConnected(deleteRequest)
}

Expand All @@ -500,7 +501,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Deletes the controller znode.
*/
def deleteController(): Unit = {
val deleteRequest = DeleteRequest(ControllerZNode.path, -1)
val deleteRequest = DeleteRequest(ControllerZNode.path, ZkVersion.NoVersion)
retryRequestUntilConnected(deleteRequest)
}

Expand Down Expand Up @@ -534,7 +535,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* @param topics the topics whose configs we wish to delete.
*/
def deleteTopicConfigs(topics: Seq[String]): Unit = {
val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1))
val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ZkVersion.NoVersion))
retryRequestsUntilConnected(deleteRequests)
}

Expand Down Expand Up @@ -591,18 +592,96 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
zooKeeperClient.close()
}

private def deleteRecursive(path: String): Unit = {
/**
* Get the committed offset for a topic partition and group
* @param group the group we wish to get offset for
* @param topicPartition the topic partition we wish to get the offset for
* @return optional long that is Some if there was an offset committed for topic partition, group and None otherwise.
*/
def getConsumerOffset(group: String, topicPartition: TopicPartition): Option[Long] = {
val getDataRequest = GetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition))
val getDataResponse = retryRequestUntilConnected(getDataRequest)
if (getDataResponse.resultCode == Code.OK) {
ConsumerOffset.decode(getDataResponse.data)
} else if (getDataResponse.resultCode == Code.NONODE) {
None
} else {
throw getDataResponse.resultException.get
}
}

/**
* Set the committed offset for a topic partition and group
* @param group the group whose offset is being set
* @param topicPartition the topic partition whose offset is being set
* @param offset the offset value
*/
def setOrCreateConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
val setDataResponse = setConsumerOffset(group, topicPartition, offset)
if (setDataResponse.resultCode == Code.NONODE) {
val createResponse = createConsumerOffset(group, topicPartition, offset)
if (createResponse.resultCode != Code.OK) {
throw createResponse.resultException.get
}
} else if (setDataResponse.resultCode != Code.OK) {
throw setDataResponse.resultException.get
}
}

private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = {
val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition), ConsumerOffset.encode(offset), ZkVersion.NoVersion)
retryRequestUntilConnected(setDataRequest)
}

private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): CreateResponse = {
val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
val createRequest = CreateRequest(path, ConsumerOffset.encode(offset), acls(path), CreateMode.PERSISTENT)
var createResponse = retryRequestUntilConnected(createRequest)
if (createResponse.resultCode == Code.NONODE) {
val indexOfLastSlash = path.lastIndexOf("/")
if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
createRecursive(path.substring(0, indexOfLastSlash))
createResponse = retryRequestUntilConnected(createRequest)
}
createResponse
}

private[zk] def deleteRecursive(path: String): Unit = {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
if (getChildrenResponse.resultCode == Code.OK) {
getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1))
val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.NoVersion))
if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
throw deleteResponse.resultException.get
}
} else if (getChildrenResponse.resultCode != Code.NONODE) {
throw getChildrenResponse.resultException.get
}
}

private[zk] def pathExists(path: String): Boolean = {
val getDataRequest = GetDataRequest(path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode == Code.OK
}

private[zk] def createRecursive(path: String): Unit = {
val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
var createResponse = retryRequestUntilConnected(createRequest)
if (createResponse.resultCode == Code.NONODE) {
val indexOfLastSlash = path.lastIndexOf("/")
if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
val parentPath = path.substring(0, indexOfLastSlash)
createRecursive(parentPath)
createResponse = retryRequestUntilConnected(createRequest)
if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
throw createResponse.resultException.get
}
} else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
throw createResponse.resultException.get
}
}

private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
val createRequests = partitions.map { partition =>
val path = TopicPartitionZNode.path(partition)
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/zk/ZkData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,13 @@ object PreferredReplicaElectionZNode {
}
}.map(_.toSet).getOrElse(Set.empty)
}

object ConsumerOffset {
def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offset/${topic}/${partition}"
def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
}

object ZkVersion {
val NoVersion = -1
}
3 changes: 3 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import kafka.network.RequestChannel
import kafka.security.auth.Authorizer
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils, ZkUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.memory.MemoryPool
Expand Down Expand Up @@ -60,6 +61,7 @@ class KafkaApisTest {
private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
private val controller = EasyMock.createNiceMock(classOf[KafkaController])
private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
private val metrics = new Metrics()
private val brokerId = 1
Expand All @@ -83,6 +85,7 @@ class KafkaApisTest {
txnCoordinator,
controller,
zkUtils,
zkClient,
brokerId,
new KafkaConfig(properties),
metadataCache,
Expand Down
92 changes: 92 additions & 0 deletions core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.zk

import kafka.zookeeper.ZooKeeperClient

import org.junit.{After, Before, Test}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.apache.kafka.common.TopicPartition

class KafkaZkClientTest extends ZooKeeperTestHarness {

private var zooKeeperClient: ZooKeeperClient = null
private var zkClient: KafkaZkClient = null

private val group = "my-group"
private val topicPartition = new TopicPartition("topic", 0)

@Before
override def setUp() {
super.setUp()
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
zkClient = new KafkaZkClient(zooKeeperClient, false)
}

@After
override def tearDown() {
zkClient.close()
super.tearDown()
}

@Test
def testSetAndGetConsumerOffset() {
val offset = 123L
// None if no committed offsets
assertTrue(zkClient.getConsumerOffset(group, topicPartition).isEmpty)
// Set and retrieve an offset
zkClient.setOrCreateConsumerOffset(group, topicPartition, offset)
assertEquals(offset, zkClient.getConsumerOffset(group, topicPartition).get)
// Update an existing offset and retrieve it
zkClient.setOrCreateConsumerOffset(group, topicPartition, offset + 2L)
assertEquals(offset + 2L, zkClient.getConsumerOffset(group, topicPartition).get)
}

@Test
def testGetConsumerOffsetNoData() {
zkClient.createRecursive(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition))
assertTrue(zkClient.getConsumerOffset(group, topicPartition).isEmpty)
}

@Test
def testDeleteRecursive() {
zkClient.deleteRecursive("/delete/does-not-exist")

zkClient.createRecursive("/delete/some/random/path")
assertTrue(zkClient.pathExists("/delete/some/random/path"))
zkClient.deleteRecursive("/delete")
assertFalse(zkClient.pathExists("/delete/some/random/path"))
assertFalse(zkClient.pathExists("/delete/some/random"))
assertFalse(zkClient.pathExists("/delete/some"))
assertFalse(zkClient.pathExists("/delete"))

intercept[IllegalArgumentException](zkClient.deleteRecursive("delete-invalid-path"))
}

@Test
def testCreateRecursive() {
zkClient.createRecursive("/create-newrootpath")
assertTrue(zkClient.pathExists("/create-newrootpath"))

zkClient.createRecursive("/create/some/random/long/path")
assertTrue(zkClient.pathExists("/create/some/random/long/path"))
zkClient.createRecursive("/create/some/random/long/path") // no errors if path already exists

intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path"))
}

}

0 comments on commit 9504af7

Please sign in to comment.