From 38c65a9a39081068e05e3222ccb0dbc3c9f56942 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Thu, 28 Jul 2016 22:10:06 +0100
Subject: [PATCH] HOTFIX: Fix unstable Streams application reset integration
 test

Author: Matthias J. Sax

Reviewers: Eno Thereska, Ismael Juma

Closes #1673 from mjsax/hotfix

(cherry picked from commit ad1dab9c3d3ae14746ee5d94434ef98ef4889023)
Signed-off-by: Ismael Juma
---
 .../streams/integration/ResetIntegrationTest.java | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index bffdfd937a86..85aff269e6aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -64,6 +64,7 @@ public class ResetIntegrationTest {
     private static final String INPUT_TOPIC = "inputTopic";
     private static final String OUTPUT_TOPIC = "outputTopic";
     private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
     private static final String INTERMEDIATE_USER_TOPIC = "userTopic";

     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
@@ -74,16 +75,17 @@ public static void startKafkaCluster() throws Exception {
         CLUSTER.createTopic(INPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
         CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
     }

     @Test
-    public void testReprocessingFromScratchAfterCleanUp() throws Exception {
+    public void testReprocessingFromScratchAfterReset() throws Exception {
         final Properties streamsConfiguration = prepareTest();
         final Properties resultTopicConsumerConfig = prepareResultConsumer();

         prepareInputData();
-        final KStreamBuilder builder = setupTopology();
+        final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);

         // RUN
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
@@ -103,10 +105,10 @@ public void testReprocessingFromScratchAfterCleanUp() throws Exception {
         Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);

         // RE-RUN
-        streams = new KafkaStreams(setupTopology(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.start();
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
-        final KeyValue<Long, Long> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+        final KeyValue<Long, Long> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 1).get(0);
         streams.close();

         assertThat(resultRerun, equalTo(result));
@@ -163,7 +165,7 @@ private void prepareInputData() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L);
     }

-    private KStreamBuilder setupTopology() {
+    private KStreamBuilder setupTopology(final String outputTopic2) {
         final KStreamBuilder builder = new KStreamBuilder();

         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -201,7 +203,7 @@ public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
                 return new KeyValue<>(key.window().start() + key.window().end(), value);
             }
         });
-        windowedCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC_2);
+        windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);

         return builder;
     }
@@ -229,6 +231,7 @@ private void assertInternalTopicsGotDeleted() {
         expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
         expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");

         Set<String> allTopics;