Skip to content

Commit

Permalink
HOTFIX: Fix unstable Streams application reset integration test
Browse files Browse the repository at this point in the history
Author: Matthias J. Sax <[email protected]>

Reviewers: Eno Thereska <[email protected]>, Ismael Juma <[email protected]>

Closes apache#1673 from mjsax/hotfix

(cherry picked from commit ad1dab9)
Signed-off-by: Ismael Juma <[email protected]>
  • Loading branch information
mjsax authored and ijuma committed Jul 28, 2016
1 parent 071b76c commit 38c65a9
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ResetIntegrationTest {
private static final String INPUT_TOPIC = "inputTopic";
private static final String OUTPUT_TOPIC = "outputTopic";
private static final String OUTPUT_TOPIC_2 = "outputTopic2";
private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
private static final String INTERMEDIATE_USER_TOPIC = "userTopic";

private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
Expand All @@ -74,16 +75,17 @@ public static void startKafkaCluster() throws Exception {
CLUSTER.createTopic(INPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC_2);
CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
}

@Test
public void testReprocessingFromScratchAfterCleanUp() throws Exception {
public void testReprocessingFromScratchAfterReset() throws Exception {
final Properties streamsConfiguration = prepareTest();
final Properties resultTopicConsumerConfig = prepareResultConsumer();

prepareInputData();
final KStreamBuilder builder = setupTopology();
final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);

// RUN
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
Expand All @@ -103,10 +105,10 @@ public void testReprocessingFromScratchAfterCleanUp() throws Exception {
Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);

// RE-RUN
streams = new KafkaStreams(setupTopology(), streamsConfiguration);
streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 1).get(0);
streams.close();

assertThat(resultRerun, equalTo(result));
Expand Down Expand Up @@ -163,7 +165,7 @@ private void prepareInputData() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L);
}

private KStreamBuilder setupTopology() {
private KStreamBuilder setupTopology(final String outputTopic2) {
final KStreamBuilder builder = new KStreamBuilder();

final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
Expand Down Expand Up @@ -201,7 +203,7 @@ public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
return new KeyValue<>(key.window().start() + key.window().end(), value);
}
});
windowedCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC_2);
windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);

return builder;
}
Expand Down Expand Up @@ -229,6 +231,7 @@ private void assertInternalTopicsGotDeleted() {
expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");

Set<String> allTopics;
Expand Down

0 comments on commit 38c65a9

Please sign in to comment.