MINOR: Update test classes to use KafkaZkClient methods (apache#4367)
Remove the ZkUtils reference from ZooKeeperTestHarness, plus some minor cleanups.
omkreddy authored and hachikuji committed Feb 15, 2018
1 parent 015e224 commit d9d0d79
Showing 25 changed files with 216 additions and 139 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -592,7 +592,7 @@ class ZkUtils(val zkClient: ZkClient,
}

/**
* Update the value of a persistent node with the given path and data.
* Update the value of an ephemeral node with the given path and data.
* Create the parent directory if necessary. Never throws NodeExistsException.
*/
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
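For context, the corrected comment now matches what updateEphemeralPath actually does: create or update an ephemeral znode. A minimal usage sketch, assuming a connected ZkUtils instance named zkUtils (the path and payload are illustrative, not from this commit):

    // Creates /illustrative/ephemeral (and any missing parents) if absent,
    // otherwise overwrites its data; the acls parameter keeps its default.
    zkUtils.updateEphemeralPath("/illustrative/ephemeral", "payload")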
22 changes: 20 additions & 2 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -761,12 +761,30 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean

/**
* Gets the leader for a given partition
* @param partition
* @param partition The partition for which we want to get the leader.
* @return optional integer if the leader exists and None otherwise.
*/
def getLeaderForPartition(partition: TopicPartition): Option[Int] =
getTopicPartitionState(partition).map(_.leaderAndIsr.leader)

/**
* Gets the in-sync replicas (ISR) for a specific topicPartition
* @param partition The partition for which we want to get ISR.
* @return optional ISR if it exists and None otherwise
*/
def getInSyncReplicasForPartition(partition: TopicPartition): Option[Seq[Int]] =
getTopicPartitionState(partition).map(_.leaderAndIsr.isr)


/**
* Gets the leader epoch for a specific topicPartition
* @param partition The partition for which we want to get the leader epoch.
* @return optional integer if the leader epoch exists and None otherwise
*/
def getEpochForPartition(partition: TopicPartition): Option[Int] = {
getTopicPartitionState(partition).map(_.leaderAndIsr.leaderEpoch)
}

/**
* Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode paths.
* @return sequence of znode names and not the absolute znode paths.
@@ -1356,7 +1374,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
}
}

private[zk] def pathExists(path: String): Boolean = {
def pathExists(path: String): Boolean = {
val existsRequest = ExistsRequest(path)
val existsResponse = retryRequestUntilConnected(existsRequest)
existsResponse.resultCode match {
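Taken together, these hunks add typed partition-state accessors and widen pathExists from private[zk] to public. A minimal sketch of how a test might use them, assuming a connected KafkaZkClient named zkClient and an existing topic (the topic name and znode path are illustrative):

    val partition = new TopicPartition("test-topic", 0)
    val leader: Option[Int]   = zkClient.getLeaderForPartition(partition)         // leader broker id, if any
    val isr: Option[Seq[Int]] = zkClient.getInSyncReplicasForPartition(partition) // current ISR, if any
    val epoch: Option[Int]    = zkClient.getEpochForPartition(partition)          // leader epoch, if any
    // pathExists is now callable from test code outside the kafka.zk package.
    assert(zkClient.pathExists("/brokers/topics/test-topic")) // path assumed from the standard layout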
@@ -245,14 +245,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)

// create the consumer offset topic
TestUtils.createTopic(zkClient, GROUP_METADATA_TOPIC_NAME,
1,
1,
servers,
servers.head.groupCoordinator.offsetsTopicConfigs)
createTopic(GROUP_METADATA_TOPIC_NAME, topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
// create the test topic with all the brokers as replicas
TestUtils.createTopic(zkClient, topic, 1, 1, this.servers)
TestUtils.createTopic(zkClient, deleteTopic, 1, 1, this.servers)
createTopic(topic)
createTopic(deleteTopic)
}

@After
@@ -711,7 +707,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {

// create an unmatched topic
val unmatchedTopic = "unmatched"
TestUtils.createTopic(zkClient, unmatchedTopic, 1, 1, this.servers)
createTopic(unmatchedTopic)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic))
sendRecords(1, new TopicPartition(unmatchedTopic, part))
removeAllAcls()
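Each call site above drops the explicit zkClient/servers plumbing in favor of a createTopic helper inherited from the test harness. The helper itself is not part of this diff; a sketch consistent with the call sites, with parameter names and defaults inferred rather than confirmed:

    def createTopic(topic: String,
                    numPartitions: Int = 1,
                    replicationFactor: Int = 1,
                    topicConfig: Properties = new Properties) =
      TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)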
@@ -173,7 +173,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val consumer = this.consumers.head
consumer.subscribe(Collections.singleton(newtopic))
executor.schedule(new Runnable {
def run() = TestUtils.createTopic(zkClient, newtopic, serverCount, serverCount, servers)
def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount)
}, 2, TimeUnit.SECONDS)
consumer.poll(0)

@@ -88,7 +88,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testTooLargeRecordWithAckZero() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
createTopic(topic1, replicationFactor = numServers)

// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -105,7 +105,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testTooLargeRecordWithAckOne() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
createTopic(topic1, replicationFactor = numServers)

// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -122,7 +122,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {

// create topic
val topic10 = "topic10"
TestUtils.createTopic(zkClient, topic10, servers.size, numServers, servers, topicConfig)
createTopic(topic10, numPartitions = servers.size, replicationFactor = numServers, topicConfig)

// send a record that is too large for replication, but within the broker max message limit
val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
@@ -169,7 +169,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testWrongBrokerList() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
createTopic(topic1, replicationFactor = numServers)

// producer with incorrect broker list
producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
@@ -188,7 +188,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testInvalidPartition() {
// create topic with a single partition
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
createTopic(topic1, numPartitions = 1, replicationFactor = numServers)

// create a record with incorrect partition id (higher than the number of partitions), send should fail
val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
@@ -203,7 +203,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testSendAfterClosed() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
createTopic(topic1, replicationFactor = numServers)

val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)

@@ -241,7 +241,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
val topicProps = new Properties()
topicProps.put("min.insync.replicas",(numServers+1).toString)

TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps)
createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)

val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
try {
Expand All @@ -261,7 +261,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
val topicProps = new Properties()
topicProps.put("min.insync.replicas", numServers.toString)

TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps)
createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)

val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
// this should work with all brokers up and running
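A side note on the technique these last two hunks preserve: setting min.insync.replicas above the replication factor guarantees that acks=all sends fail with not-enough-replicas errors, while setting it equal to the replica count makes success depend on every replica staying up. Condensed from the hunks above, nothing here beyond the diff itself:

    val topicProps = new Properties()
    topicProps.put("min.insync.replicas", (numServers + 1).toString) // > replication factor: acks=all must fail
    createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)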
@@ -16,8 +16,9 @@ import java.io.File
import java.util.Locale

import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before, Test}

@@ -51,6 +52,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
*/
@Test
def testZkAclsDisabled() {
val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
TestUtils.verifyUnsecureZkAcls(zkUtils)
CoreUtils.swallow(zkUtils.close(), this)
}
}
@@ -16,8 +16,9 @@
*/
package kafka.api

import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
import org.junit.Test

@@ -56,6 +57,8 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
*/
@Test
def testAcls() {
val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
TestUtils.verifySecureZkAcls(zkUtils, 1)
CoreUtils.swallow(zkUtils.close(), this)
}
}
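Both SASL tests now follow the same shape: open a short-lived ZkUtils, verify the ACLs, and swallow the close. If more tests adopt it, the pattern could be factored into a loan-style helper; a hypothetical sketch (withZkUtils is not part of this commit):

    def withZkUtils(test: ZkUtils => Unit): Unit = {
      val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout,
        zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
      try test(zkUtils) finally CoreUtils.swallow(zkUtils.close(), this)
    }

    // Usage: withZkUtils(TestUtils.verifySecureZkAcls(_, 1))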
@@ -19,6 +19,8 @@ package kafka.api
import org.apache.kafka.common.security.scram.ScramMechanism
import kafka.utils.JaasTestUtils
import kafka.utils.ZkUtils
import kafka.zk.ConfigEntityChangeNotificationZNode

import scala.collection.JavaConverters._
import org.junit.Before

@@ -32,7 +34,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes

override def configureSecurityBeforeServersStart() {
super.configureSecurityBeforeServersStart()
zkClient.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
}
34 changes: 22 additions & 12 deletions core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,12 +22,12 @@ import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, Invali
import org.apache.kafka.common.metrics.Quota
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Test}
import org.junit.{After, Before, Test}
import java.util.Properties

import kafka.utils._
import kafka.log._
import kafka.zk.ZooKeeperTestHarness
import kafka.zk.{ConfigEntityZNode, PreferredReplicaElectionZNode, ZooKeeperTestHarness}
import kafka.utils.{Logging, TestUtils, ZkUtils}
import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
import java.io.File
@@ -39,16 +39,26 @@ import kafka.utils.TestUtils._
import scala.collection.{Map, Set, immutable}
import kafka.utils.CoreUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.security.JaasUtils

import scala.collection.JavaConverters._
import scala.util.Try

class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {

var servers: Seq[KafkaServer] = Seq()
var zkUtils: ZkUtils = null

@Before
override def setUp() {
super.setUp()
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
}

@After
override def tearDown() {
if (zkUtils != null)
CoreUtils.swallow(zkUtils.close(), this)
TestUtils.shutdownServers(servers)
super.tearDown()
}
@@ -212,9 +222,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
// in sync replicas should not have any replica that is not in the new assigned replicas
checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -242,8 +252,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -271,8 +281,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -313,9 +323,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
// ensure that there are no under replicated partitions
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -326,7 +336,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
// try to read it back and compare with what was written
val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1
val preferredReplicaElectionZkData = zkUtils.readData(PreferredReplicaElectionZNode.path)._1
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
@@ -531,7 +541,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
// Write config without notification to ZK.
val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
val map = Map("version" -> 1, "config" -> configMap.asJava)
zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
zkUtils.updatePersistentPath(ConfigEntityZNode.path(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))

val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
assertEquals("Must have 1 overriden client config", 1, configInZk.size)
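The AdminTest hunks also show the path-constant migration from ZkUtils to the typed znode objects in kafka.zk. A rough correspondence implied by this commit (the literal paths are assumptions based on Kafka's conventional ZooKeeper layout, not shown in the diff):

    ZkUtils.ConfigChangesPath                  -> ConfigEntityChangeNotificationZNode.path  // presumably "/config/changes"
    ZkUtils.PreferredReplicaLeaderElectionPath -> PreferredReplicaElectionZNode.path        // presumably "/admin/preferred_replica_election"
    ZkUtils.getEntityConfigPath(type, name)    -> ConfigEntityZNode.path(type, name)        // presumably "/config/<type>/<name>"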
@@ -20,15 +20,31 @@ import java.nio.charset.StandardCharsets

import kafka.utils._
import kafka.server.KafkaConfig
import org.junit.Test
import org.junit.{After, Before, Test}
import kafka.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.common.security.JaasUtils


@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class DeleteConsumerGroupTest extends KafkaServerTestHarness {
def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
var zkUtils: ZkUtils = null

@Before
override def setUp() {
super.setUp()
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
}

@After
override def tearDown() {
if (zkUtils != null)
CoreUtils.swallow(zkUtils.close(), this)
super.tearDown()
}


@Test
def testGroupWideDeleteInZK() {
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -80,9 +80,9 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
val deletePath = getDeleteTopicPath(normalTopic)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath))
TopicCommand.deleteTopic(zkClient, deleteOpts)
assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath))

// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
@@ -93,11 +93,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
}
assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.pathExists(deleteOffsetTopicPath))
}

@Test
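With pathExists public, TopicCommandTest can assert directly on the delete-marker znode instead of going through ZkUtils. A condensed sketch of the pattern above, assuming getDeleteTopicPath resolves to the conventional delete-topics path (the literal path is an assumption):

    val deletePath = getDeleteTopicPath(normalTopic) // presumably "/admin/delete_topics/<topic>"
    assertFalse(zkClient.pathExists(deletePath))     // no marker before deletion
    TopicCommand.deleteTopic(zkClient, deleteOpts)
    assertTrue(zkClient.pathExists(deletePath))      // marker present once deletion is requested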
(Diff truncated: the remaining changed files in this commit are not shown here.)