From 200304e7031ed4c55031238af2e53c32f1bb589e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 30 Sep 2020 20:05:38 -0700 Subject: [PATCH 1/3] refactor unit test --- checkstyle/import-control.xml | 1 + checkstyle/suppressions.xml | 3 ++ .../streams/state/internals/RocksDBStore.java | 1 + .../kafka/streams/KafkaStreamsTest.java | 12 ++----- .../processor/internals/StreamTaskTest.java | 32 ++++++++--------- .../state/KeyValueStoreTestDriver.java | 4 +-- .../internals/RocksDBKeyValueStoreTest.java | 18 ---------- .../state/internals/RocksDBStoreTest.java | 24 +++++++------ .../kafka/test/MockRocksDbConfigSetter.java | 36 +++++++++++++++++++ 9 files changed, 74 insertions(+), 57 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a7dc27fcd69f..4b1e2b8583ad 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -307,6 +307,7 @@ + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index dead182d94b2..454711c2bd32 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -208,6 +208,9 @@ + + diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3442d8353d50..20121e6be515 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 326d277ed1dc..4c0b0b4252e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -49,13 +49,13 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.TestUtils; import org.easymock.Capture; import org.easymock.EasyMock; @@ -795,17 +795,9 @@ public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() { PowerMock.verify(Executors.class, rocksDBMetricsRecordingTriggerThread); } - public static class TestRocksDbConfigSetter implements RocksDBConfigSetter { - @Override - public void setConfig(final String storeName, - final org.rocksdb.Options options, - final Map configs) { - } - } - @Test public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() { - props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName()); + props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class.getName()); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 229df657b1cf..30e20b068da8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -2050,22 +2050,22 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { final ProcessorTopology topology = withSources(asList(), mkMap()); final TopologyException exception = assertThrows( - TopologyException.class, - () -> new StreamTask( - taskId, - partitions, - topology, - consumer, - createConfig(false, "100"), - metrics, - stateDirectory, - cache, - time, - stateManager, - recordCollector, - context - ) - ); + TopologyException.class, + () -> new StreamTask( + taskId, + partitions, + topology, + consumer, + createConfig(false, "100"), + metrics, + stateDirectory, + cache, + time, + stateManager, + recordCollector, + context + ) + ); assertThat(exception.getMessage(), equalTo("Invalid topology: " + "Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same " + diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 7a3912181233..7af3f3945acf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -35,10 +35,10 @@ import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StreamsProducer; import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; -import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; @@ -193,7 +193,7 @@ private KeyValueStoreTestDriver(final StateSerdes serdes) { props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); - props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); final LogContext logContext = new LogContext("KeyValueStoreTestDriver "); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 5937af03121d..aa6a4b82af89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -21,13 +21,9 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.junit.Test; -import org.rocksdb.Options; - -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -49,20 +45,6 @@ protected KeyValueStore createKeyValueStore(final ProcessorContext return store; } - public static class TheRocksDbConfigSetter implements RocksDBConfigSetter { - static boolean called = false; - - @Override - public void setConfig(final String storeName, final Options options, final Map configs) { - called = true; - } - } - - @Test - public void shouldUseCustomRocksDbConfigSetter() { - assertTrue(TheRocksDbConfigSetter.called); - } - @Test public void shouldPerformRangeQueriesWithCachingDisabled() { context.setTime(1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index ca281818f176..4c27c69d6973 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; @@ -297,9 +298,21 @@ public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() public void shouldCallRocksDbConfigSetter() { MockRocksDbConfigSetter.called = false; + final Properties props = StreamsTestUtils.getStreamsConfig(); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); + final Object param = new Object(); + props.put("abc.def", param); + InternalMockProcessorContext context = new InternalMockProcessorContext( + dir, + Serdes.String(), + Serdes.String(), + new StreamsConfig(props) + ); + rocksDBStore.init(context, rocksDBStore); assertTrue(MockRocksDbConfigSetter.called); + assertThat(MockRocksDbConfigSetter.configMap.get("abc.def"), equalTo(param)); } @Test @@ -697,17 +710,6 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() { } } - public static class MockRocksDbConfigSetter implements RocksDBConfigSetter { - static boolean called; - - @Override - public void setConfig(final String storeName, final Options options, final Map configs) { - called = true; - - options.setLevel0FileNumCompactionTrigger(10); - } - } - public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter { static boolean bloomFiltersSet; diff --git a/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java new file mode 100644 index 000000000000..15833de952fd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.state.RocksDBConfigSetter; +import org.rocksdb.Options; + +import java.util.HashMap; +import java.util.Map; + +public class MockRocksDbConfigSetter implements RocksDBConfigSetter { + public static boolean called = false; + public static Map configMap = new HashMap<>(); + + @Override + public void setConfig(final String storeName, final Options options, final Map configs) { + called = true; + + configMap.putAll(configs); + } +} From 8ac47b3f20d58f8580a60c60d89be86e9c769f46 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 30 Sep 2020 20:09:24 -0700 Subject: [PATCH 2/3] remove imports --- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 20121e6be515..3442d8353d50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.Configurable; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; From 52fcd66d6b57b72845fba1ee008d4ca4133b2255 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 1 Oct 2020 16:42:31 -0700 Subject: [PATCH 3/3] fix checkstyle --- .../streams/state/internals/RocksDBStoreTest.java | 10 +++++----- .../org/apache/kafka/test/MockRocksDbConfigSetter.java | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 4c27c69d6973..934b6ed6e87a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -302,11 +302,11 @@ public void shouldCallRocksDbConfigSetter() { props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); final Object param = new Object(); props.put("abc.def", param); - InternalMockProcessorContext context = new InternalMockProcessorContext( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) + final InternalMockProcessorContext context = new InternalMockProcessorContext( + dir, + Serdes.String(), + Serdes.String(), + new StreamsConfig(props) ); rocksDBStore.init(context, rocksDBStore); diff --git a/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java index 15833de952fd..c2e63af3742a 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.test; -import org.apache.kafka.common.Configurable; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.rocksdb.Options;