Skip to content

Commit

Permalink
KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (apache#4976)
Browse files Browse the repository at this point in the history
1. Remove the deprecated StateStoreSuppliers, and the corresponding Stores.create() functions and factories: only the base StateStoreSupplier and MockStoreSupplier were still preserved as they are needed by the deprecated TopologyBuilder and KStreamBuilder. Will remove them in a follow-up PR.

2. Add TopologyWrapper.java as the original InternalTopologyBuilderAccessor was removed, but I realized it is still needed as of now.

3. Minor: removed StateStoreTestUtils.java and inline its logic in its callers since now with StoreBuilder it is just a one-liner.

Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
guozhangwang authored May 10, 2018
1 parent 8fb5b37 commit 0b1a118
Show file tree
Hide file tree
Showing 29 changed files with 249 additions and 1,524 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformer;
Expand All @@ -26,12 +24,7 @@
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

import java.util.HashSet;
import java.util.Objects;
Expand Down Expand Up @@ -81,38 +74,6 @@ public R apply(T2 value2, T1 value1) {
};
}

@SuppressWarnings({"unchecked", "deprecation"})
static <T, K> org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
final Serde<T> aggValueSerde,
final String storeName) {
Objects.requireNonNull(storeName, "storeName can't be null");
Topic.validate(storeName);
return storeFactory(keySerde, aggValueSerde, storeName).build();
}

@SuppressWarnings({"unchecked", "deprecation"})
static <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
final Serde<T> aggValSerde,
final Windows<W> windows,
final String storeName) {
Objects.requireNonNull(storeName, "storeName can't be null");
Topic.validate(storeName);
return storeFactory(keySerde, aggValSerde, storeName)
.windowed(windows.size(), windows.maintainMs(), windows.segments, false)
.build();
}

@SuppressWarnings("deprecation")
static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
final Serde<T> aggValueSerde,
final String storeName) {
return Stores.create(storeName)
.withKeys(keySerde)
.withValues(aggValueSerde)
.persistent()
.enableCaching();
}

static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return new ValueMapperWithKey<K, V, VR>() {
Expand Down
414 changes: 0 additions & 414 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java

Large diffs are not rendered by default.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@
* * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
* If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
*
* @param <K> The key type
* @param <V> The value type
*
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
*
* @param <K> the type of keys
* @param <V> the type of values
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
@Deprecated
public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
* Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
* If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
*
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
private final String name;
private final long retentionPeriod;

private static final int NUM_SEGMENTS = 3;

public RocksDbSessionBytesStoreSupplier(final String name,
final long retentionPeriod) {
this.name = name;
Expand All @@ -36,13 +38,12 @@ public String name() {
return name;
}

@SuppressWarnings("deprecation")
@Override
public SessionStore<Bytes, byte[]> get() {
final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
name,
retentionPeriod,
org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
NUM_SEGMENTS,
new SessionKeySchema());
return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
}
Expand All @@ -52,11 +53,10 @@ public String metricsScope() {
return "rocksdb-session";
}

@SuppressWarnings("deprecation")
@Override
public long segmentIntervalMs() {
return Segments.segmentInterval(
retentionPeriod,
org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
NUM_SEGMENTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
private final long windowSize;
private final boolean retainDuplicates;

@SuppressWarnings("deprecation")
private static final int MIN_SEGMENTS = 2;

public RocksDbWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final int segments,
final long windowSize,
final boolean retainDuplicates) {
if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
if (segments < MIN_SEGMENTS) {
throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
}
this.name = name;
this.retentionPeriod = retentionPeriod;
Expand Down
Loading

0 comments on commit 0b1a118

Please sign in to comment.