Skip to content

Commit

Permalink
KAFKA-5631; Use Jackson for serialising to JSON
Browse files Browse the repository at this point in the history
- Rename `encode` to `legacyEncodeAsString`, we
can remove this when we remove `ZkUtils`.
- Introduce `encodeAsString` that uses Jackson.
- Change `encodeAsBytes` to use Jackson.
- Avoid intermediate string when converting
Broker to json bytes.

The methods that use Jackson only support
Java collections unlike `legacyEncodeAsString`.

Tests were added `encodeAsString` and
`encodeAsBytes`.

Author: umesh chaudhary <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes apache#4259 from umesh9794/KAFKA-5631
  • Loading branch information
umesh9794 authored and ijuma committed Dec 12, 2017
1 parent 651c6e4 commit 0a508a4
Show file tree
Hide file tree
Showing 20 changed files with 187 additions and 112 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.Random
import java.util.Properties

import kafka.common.TopicAlreadyMarkedForDeletionException
import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.errors._

import collection.{Map, Set, mutable, _}
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -628,7 +628,7 @@ object AdminUtils extends Logging with AdminUtilities {

// create the change notification
val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
val content = Json.legacyEncodeAsString(getConfigChangeZnodeData(sanitizedEntityPath))
zkUtils.createSequentialPersistentPath(seqNode, content)
}

Expand All @@ -641,7 +641,7 @@ object AdminUtils extends Logging with AdminUtilities {
*/
private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
val map = Map("version" -> 1, "config" -> config.asScala)
zkUtils.updatePersistentPath(entityPath, Json.encode(map))
zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map))
}

/**
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/admin/LogDirsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object LogDirsCommand {
}

private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {
Json.encode(Map(
Json.encodeAsString(Map(
"version" -> 1,
"brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
Map(
Expand All @@ -73,13 +73,13 @@ object LogDirsCommand {
"size" -> replicaInfo.size,
"offsetLag" -> replicaInfo.offsetLag,
"isFuture" -> replicaInfo.isFuture
)
}
)
}
)
}
))
).asJava
}.asJava
).asJava
}.asJava
).asJava
}.asJava
).asJava)
}

private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = {
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ object ReassignPartitionsCommand extends Logging {
private[admin] val AnyLogDir = "any"

def main(args: Array[String]): Unit = {

val opts = validateAndParseArgs(args)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
val zkUtils = ZkUtils(zkConnect,
Expand Down Expand Up @@ -224,17 +223,17 @@ object ReassignPartitionsCommand extends Logging {

def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
Json.encode(Map(
Json.encodeAsString(Map(
"version" -> 1,
"partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
Map(
"topic" -> topic,
"partition" -> partition,
"replicas" -> replicas,
"log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir))
)
}
))
"replicas" -> replicas.asJava,
"log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir)).asJava
).asJava
}.asJava
).asJava)
}

// Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/api/LeaderAndIsr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package kafka.api

import kafka.utils._

object LeaderAndIsr {
val initialLeaderEpoch: Int = 0
val initialZKVersion: Int = 0
Expand All @@ -43,6 +41,6 @@ case class LeaderAndIsr(leader: Int,
def newEpochAndZkVersion = newLeaderAndIsr(leader, isr)

override def toString: String = {
Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
s"LeaderAndIsr(leader=$leader, leaderEpoch=$leaderEpoch, isr=$isr, zkVersion=$zkVersion)"
}
}
14 changes: 8 additions & 6 deletions core/src/main/scala/kafka/cluster/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time

import scala.collection.Map
import scala.collection.JavaConverters._

/**
* A Kafka broker.
* A broker has an id and a collection of end-points.
Expand Down Expand Up @@ -127,12 +130,12 @@ object Broker {
}
}

def toJson(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
rack: Option[String]): String = {
def toJsonBytes(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
rack: Option[String]): Array[Byte] = {
val jsonMap = collection.mutable.Map(VersionKey -> version,
HostKey -> host,
PortKey -> port,
EndpointsKey -> advertisedEndpoints.map(_.connectionString).toArray,
EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava,
JmxPortKey -> jmxPort,
TimestampKey -> Time.SYSTEM.milliseconds().toString
)
Expand All @@ -141,10 +144,9 @@ object Broker {
if (version >= 4) {
jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
endPoint.listenerName.value -> endPoint.securityProtocol.name
}.toMap)
}.toMap.asJava)
}

Json.encode(jsonMap)
Json.encodeAsBytes(jsonMap.asJava)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
import scala.collection.JavaConverters._


/**
* This class handles the consumers interaction with zookeeper
*
Expand Down Expand Up @@ -272,8 +273,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
val timestamp = Time.SYSTEM.milliseconds.toString
val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
"timestamp" -> timestamp))

val consumerRegistrationInfo = Json.encodeAsString(Map("version" -> 1,
"subscription" -> topicCount.getTopicCountMap.asJava,
"pattern" -> topicCount.pattern,
"timestamp" -> timestamp
).asJava)

val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import kafka.common.KafkaException
import kafka.utils.{Json, Logging, ZkUtils}
import kafka.zk.KafkaZkClient

import scala.collection.JavaConverters._

/**
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
* such that the same producerId will not be assigned twice across multiple transaction coordinators.
Expand All @@ -37,7 +39,7 @@ object ProducerIdManager extends Logging {
Json.encodeAsBytes(Map("version" -> CurrentVersion,
"broker" -> producerIdBlock.brokerId,
"block_start" -> producerIdBlock.blockStartId.toString,
"block_end" -> producerIdBlock.blockEndId.toString)
"block_end" -> producerIdBlock.blockEndId.toString).asJava
)
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/security/auth/Acl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.security.auth
import kafka.utils.Json
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.SecurityUtils
import scala.collection.JavaConverters._

object Acl {
val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
Expand Down Expand Up @@ -71,7 +72,7 @@ object Acl {
}

def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap).toList)
Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap.asJava).toList.asJava)
}
}

Expand Down
16 changes: 9 additions & 7 deletions core/src/main/scala/kafka/tools/DumpLogSegments.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils

import scala.collection.mutable
import scala.collection.{Map, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -334,12 +334,14 @@ object DumpLogSegments {
}
}.mkString("{", ",", "}")

val keyString = Json.encode(Map("metadata" -> groupId))
val valueString = Json.encode(Map(
"protocolType" -> protocolType,
"protocol" -> group.protocol,
"generationId" -> group.generationId,
"assignment" -> assignment))
val keyString = Json.encodeAsString(Map("metadata" -> groupId).asJava)

val valueString = Json.encodeAsString(Map(
"protocolType" -> protocolType,
"protocol" -> group.protocol,
"generationId" -> group.generationId,
"assignment" -> assignment
).asJava)

(Some(keyString), Some(valueString))
}
Expand Down
30 changes: 18 additions & 12 deletions core/src/main/scala/kafka/utils/Json.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,38 @@ object Json {
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
* Any other type will result in an exception.
*
* This method does not properly handle non-ascii characters.
* This implementation is inefficient, so we recommend `encodeAsString` or `encodeAsBytes` (the latter is preferred
* if possible). This method supports scala Map implementations while the other two do not. Once this functionality
* is no longer required, we can remove this method.
*/
def encode(obj: Any): String = {
def legacyEncodeAsString(obj: Any): String = {
obj match {
case null => "null"
case b: Boolean => b.toString
case s: String => mapper.writeValueAsString(s)
case n: Number => n.toString
case m: Map[_, _] => "{" +
m.map {
case (k, v) => encode(k) + ":" + encode(v)
case (k, v) => legacyEncodeAsString(k) + ":" + legacyEncodeAsString(v)
case elem => throw new IllegalArgumentException(s"Invalid map element '$elem' in $obj")
}.mkString(",") + "}"
case a: Array[_] => encode(a.toSeq)
case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]"
case a: Array[_] => legacyEncodeAsString(a.toSeq)
case i: Iterable[_] => "[" + i.map(legacyEncodeAsString).mkString(",") + "]"
case other: AnyRef => throw new IllegalArgumentException(s"Unknown argument of type ${other.getClass}: $other")
}
}

/**
* Encode an object into a JSON value in bytes. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
* Any other type will result in an exception.
*
* This method does not properly handle non-ascii characters.
*/
def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8)
* Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
*/
def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)

/**
* Encode an object into a JSON value in bytes. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
* a jackson-scala dependency).
*/
def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj)
}
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.utils

import java.nio.charset.StandardCharsets
import java.util.concurrent.{CountDownLatch, TimeUnit}

import kafka.admin._
Expand Down Expand Up @@ -198,15 +199,15 @@ object ZkUtils {
}

def controllerZkData(brokerId: Int, timestamp: Long): String = {
Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
}

def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = {
Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
}

def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
Json.encode(Map(
Json.legacyEncodeAsString(Map(
"version" -> 1,
"partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
Map(
Expand Down Expand Up @@ -315,8 +316,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
object ClusterId {

def toJson(id: String) = {
val jsonMap = Map("version" -> "1", "id" -> id)
Json.encode(jsonMap)
Json.legacyEncodeAsString(Map("version" -> "1", "id" -> id))
}

def fromJson(clusterIdJson: String): String = {
Expand Down Expand Up @@ -457,7 +457,8 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
val brokerIdPath = BrokerIdsPath + "/" + id
// see method documentation for reason why we do this
val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack)
val json = new String(Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack),
StandardCharsets.UTF_8)
registerBrokerInZk(brokerIdPath, json)

info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
Expand Down Expand Up @@ -486,15 +487,15 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
}

def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
Json.legacyEncodeAsString(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
}

/**
* Get JSON partition to replica map from zookeeper.
*/
def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
Json.encode(Map("version" -> 1, "partitions" -> map))
Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> map))
}

/**
Expand Down
Loading

0 comments on commit 0a508a4

Please sign in to comment.