Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text #4549

Merged
merged 7 commits into from
Feb 16, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,20 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand All @@ -90,6 +95,8 @@

@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);

private static final int NUM_BROKERS = 1;

@ClassRule
Expand Down Expand Up @@ -133,6 +140,40 @@ private void createTopics() throws InterruptedException {
CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
}

/**
* Try to read inputValues from {@code resources/QueryableStateIntegrationTest/inputValues.txt}, which might be useful
* for larger scale testing. In case of exception, for instance if no such file can be read, return a small list
* which satisfies all the prerequisites of the tests.
*/
private List<String> getInputValues() {
List<String> input = new ArrayList<>();
final ClassLoader classLoader = getClass().getClassLoader();
final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
try (final BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(fileName).getFile()))) {
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
input.add(line);
}
} catch (final Exception e) {
log.warn("Unable to read '{}{}{}'. Using default inputValues list", "resources", File.separator, fileName);
input = Arrays.asList(
"hello world",
"all streams lead to kafka",
"streams",
"kafka streams",
"the cat in the hat",
"green eggs and ham",
"that Sam i am",
"up the creek without a paddle",
"run forest run",
"a tank full of gas",
"eat sleep rave repeat",
"one jolly sailor",
"king of the world");

}
return input;
}

@Before
public void before() throws Exception {
testNo++;
Expand Down Expand Up @@ -167,20 +208,7 @@ public int compare(final KeyValue<String, Long> o1,
return o1.key.compareTo(o2.key);
}
};
inputValues = Arrays.asList(
"hello world",
"all streams lead to kafka",
"streams",
"kafka streams",
"the cat in the hat",
"green eggs and ham",
"that sam i am",
"up the creek without a paddle",
"run forest run",
"a tank full of gas",
"eat sleep rave repeat",
"one jolly sailor",
"king of the world");
inputValues = getInputValues();
inputValuesKeys = new HashSet<>();
for (final String sentence : inputValues) {
final String[] words = sentence.split("\\W+");
Expand Down Expand Up @@ -215,7 +243,7 @@ private KafkaStreams createCountStream(final String inputTopic,
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(final String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
return Arrays.asList(value.split("\\W+"));
}
})
.groupBy(MockMapper.<String, String>selectValueMapper());
Expand Down Expand Up @@ -376,7 +404,7 @@ public void queryOnRebalance() throws InterruptedException {
storeName + "-" + streamThree);
verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
}

// kill N-1 threads
Expand All @@ -391,7 +419,7 @@ public void queryOnRebalance() throws InterruptedException {
storeName + "-" + streamThree);
verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
} finally {
for (int i = 0; i < numThreads; i++) {
if (!streamRunnables[i].isClosed()) {
Expand Down