Skip to content

Commit

Permalink
KAFKA-3343; Use NoTimestamp in GroupMetadataManager when message v0 i…
Browse files Browse the repository at this point in the history
…s used.

Author: Jiangjie Qin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes apache#1023 from becketqin/KAFKA-3343
  • Loading branch information
becketqin authored and junrao committed Mar 9, 2016
1 parent 5afa166 commit c428237
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ class GroupMetadataManager(val brokerId: Int,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
val groupPartition = partitionFor(group.groupId)
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition))
timestamp = timestamp, magicValue = magicValue)

val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
partitionOpt.foreach { partition =>
Expand All @@ -169,12 +170,12 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Short => Unit): DelayedStore = {
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
timestamp = time.milliseconds(),
magicValue = getMessageFormatVersion(partitionFor(group.groupId))
)
timestamp = timestamp,
magicValue = magicValue)

val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))

Expand Down Expand Up @@ -253,11 +254,12 @@ class GroupMetadataManager(val brokerId: Int,

// construct the message set to append
val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
new Message(
key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
timestamp = time.milliseconds(),
magicValue = getMessageFormatVersion(partitionFor(groupId))
timestamp = timestamp,
magicValue = magicValue
)
}.toSeq

Expand Down Expand Up @@ -557,8 +559,8 @@ class GroupMetadataManager(val brokerId: Int,
val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)

(offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(),
magicValue = getMessageFormatVersion(offsetsPartition)))
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
(offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
}.groupBy { case (partition, tombstone) => partition }

// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
Expand Down Expand Up @@ -627,11 +629,13 @@ class GroupMetadataManager(val brokerId: Int,
config.offsetsTopicNumPartitions
}

private def getMessageFormatVersion(partition: Int): Byte = {
private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
}
val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds()
(messageFormatVersion, timestamp)
}

/**
Expand Down

0 comments on commit c428237

Please sign in to comment.