Skip to content

Commit

Permalink
KAFKA-12436: Deprecate MirrorMaker v1 (KIP-720) (apache#10805)
Browse files Browse the repository at this point in the history
Reviewers: Luke Chen <[email protected]>, Ismael Juma <[email protected]>, Mickael Maison <[email protected]>
  • Loading branch information
ryannedolan authored Jul 4, 2021
1 parent 08757d0 commit 6d2f563
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
Expand All @@ -58,7 +57,10 @@ import scala.util.{Failure, Success, Try}
* enable.auto.commit=false
* 3. Mirror Maker Setting:
* abort.on.send.failure=true
*
* @deprecated Since 3.0, use the Connect-based MirrorMaker instead (aka MM2).
*/
@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
object MirrorMaker extends Logging with KafkaMetricsGroup {

private[tools] var producer: MirrorMakerProducer = null
Expand All @@ -80,6 +82,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {

def main(args: Array[String]): Unit = {

warn("This tool is deprecated and may be removed in a future major release.")
info("Starting mirror maker")
try {
val opts = new MirrorMakerOptions(args)
Expand Down Expand Up @@ -191,7 +194,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {

setName(threadName)

@nowarn("cat=deprecation")
private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
BaseConsumerRecord(record.topic,
record.partition,
Expand Down Expand Up @@ -414,12 +416,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
* If message.handler.args is specified. A constructor that takes in a String as argument must exist.
*/
trait MirrorMakerMessageHandler {
@nowarn("cat=deprecation")
def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}

private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
@nowarn("cat=deprecation")
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.BeforeEach

@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {

override def generateConfigs: Seq[KafkaConfig] =
Expand Down
4 changes: 4 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
<code>DefaultReplicationPolicy</code>, cannot prevent replication cycles based on topic names, so take care to avoid cycles when constructing your
replication topology.
</li>
<li> The original MirrorMaker (MM1) and related classes have been deprecated. Please use the Connect-based
MirrorMaker (MM2), as described in the
<a href="/{{version}}/documentation/#georeplication">Geo-Replication section</a>.
</li>
</ul>
<h5><a id="upgrade_280_notable" href="#upgrade_280_notable">Notable changes in 2.8.0</a></h5>
Expand Down

0 comments on commit 6d2f563

Please sign in to comment.