KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores (apache#13615)

Reviewers: Matthias J. Sax <[email protected]>
vcrfxia authored Apr 24, 2023
1 parent 6dcdb01 commit ab8f285
Showing 9 changed files with 280 additions and 255 deletions.
@@ -141,7 +141,8 @@ public void process(final Record<K, Change<V>> record) {
throw new StreamsException("Record key for the grouping KTable should not be null.");
}

- if (useVersionedSemantics && !record.value().isLatest) {
+ final boolean isLatest = record.value().isLatest;
+ if (useVersionedSemantics && !isLatest) {
// skip out-of-order records when aggregating a versioned table, since the
// aggregate should include latest-by-timestamp records only. as an optimization,
// do not forward the out-of-order record downstream to the repartition topic either.
@@ -154,8 +155,6 @@ public void process(final Record<K, Change<V>> record) {
final KeyValue<? extends K1, ? extends V1> oldPair = record.value().oldValue == null ? null :
mapper.apply(record.key(), record.value().oldValue);

- final boolean isLatest = record.value().isLatest;

// if the selected repartition key or value is null, skip
// forward oldPair first, to be consistent with reduce and aggregate
final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null;
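The comment in the hunk above captures the versioned-table contract: an aggregate over a versioned KTable should reflect only the latest-by-timestamp record per key, so out-of-order updates are dropped rather than forwarded to the repartition topic. Below is a minimal sketch of a topology that exercises this path; the topic name, store name, retention, and serdes are assumptions for illustration and are not part of this commit.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class VersionedAggregateSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Source table backed by a versioned store (hypothetical topic/store names).
        final KTable<String, String> assignments = builder.table(
            "user-regions",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as(Stores.persistentVersionedKeyValueStore(
                "user-regions-versioned", Duration.ofHours(1))));

        // Count users per region. Because the source table is versioned, an update whose
        // timestamp is older than the key's current record (isLatest == false) is skipped
        // by the repartition processor above instead of being double-counted.
        assignments
            .groupBy((user, region) -> KeyValue.pair(region, user),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        builder.build();
    }
}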
@@ -118,16 +118,16 @@ public void init(final ProcessorContext<K, Change<VOut>> context) {
public void process(final Record<K, Change<V>> record) {
final VOut newValue = valueTransformer.transform(record.key(), record.value().newValue);

- if (queryableName == null) {
- final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null;
- context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)));
- } else {
+ if (queryableName != null) {
final VOut oldValue = sendOldValues ? getValueOrNull(store.get(record.key())) : null;
final long putReturnCode = store.put(record.key(), newValue, record.timestamp());
// if not put to store, do not forward downstream either
if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue, putReturnCode == PUT_RETURN_CODE_IS_LATEST)));
}
+ } else {
+ final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null;
+ context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)));
}
}

@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.KeyValue;
+ import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

import java.util.List;
@@ -36,6 +37,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
void put(K key, V value);

@@ -47,6 +49,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @return The old value or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
V putIfAbsent(K key, V value);

@@ -56,6 +59,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* @param entries A list of entries to put into the store;
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
void putAll(List<KeyValue<K, V>> entries);

@@ -65,6 +69,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* @param key The key
* @return The old value or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
V delete(K key);
}
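The new @throws documentation spells out that these store operations fail with InvalidStateStoreException when the store has not been initialized; on the interactive-query path the same exception is also thrown while a store is temporarily unavailable (for example during startup, rebalancing, or state restoration). A small retry sketch is shown below, assuming a hypothetical store named "counts" with String keys and Long values.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class StoreQuerySketch {

    // Look up a key, retrying while the store is not yet initialized or not yet queryable,
    // both of which surface as InvalidStateStoreException.
    static Long countFor(final KafkaStreams streams, final String key) throws InterruptedException {
        while (true) {
            try {
                final ReadOnlyKeyValueStore<String, Long> store = streams.store(
                    StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
                return store.get(key);
            } catch (final InvalidStateStoreException notReady) {
                Thread.sleep(100L); // store not open/initialized yet; back off and retry
            }
        }
    }
}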
@@ -117,6 +117,7 @@ default KeyValueIterator<K, V> reverseAll() {
* @param <PS> Prefix Serializer type
* @param <P> Prefix Type.
* @return The iterator for keys having the specified prefix.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
default <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer) {
throw new UnsupportedOperationException();
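For context on the method documented here: prefixScan returns an iterator over all entries whose keys start with the given prefix, serialized with the supplied prefix serializer (the default implementation above throws UnsupportedOperationException; the built-in RocksDB and in-memory stores support it). A sketch of typical usage from inside a processor follows; the store name, key format, and types are assumptions for illustration.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class PrefixScanSketch implements Processor<String, String, Void, Void> {
    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("orders-by-user"); // hypothetical store name
    }

    @Override
    public void process(final Record<String, String> record) {
        // Iterate every stored entry whose key starts with "<user>:", e.g. "user-42:".
        // The iterator must be closed, hence try-with-resources.
        try (final KeyValueIterator<String, String> iter =
                 store.prefixScan(record.key() + ":", Serdes.String().serializer())) {
            while (iter.hasNext()) {
                final KeyValue<String, String> entry = iter.next();
                // ... use entry.key and entry.value
            }
        }
    }
}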
@@ -60,6 +60,7 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
* validTo timestamp is undefined. {@code Long.MIN_VALUE} indicates that the record
* was not put, due to grace period having been exceeded.
* @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
long put(K key, V value, long timestamp);

@@ -96,6 +97,7 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
* returned {@link VersionedRecord} may be smaller than the provided deletion
* timestamp.
* @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
*/
VersionedRecord<V> delete(K key, long timestamp);
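The documentation above defines the put/delete contract that the DSL processors earlier in this commit rely on: put returns the validTo timestamp of the written record, or Long.MIN_VALUE when the write is rejected because the record's timestamp falls outside the grace period, and delete returns the record that was current as of the deletion timestamp. A sketch of a processor using that contract, with hypothetical store and type names, is below.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

public class VersionedPutSketch implements Processor<String, String, Void, Void> {
    private VersionedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("prices-versioned"); // hypothetical store name
    }

    @Override
    public void process(final Record<String, String> record) {
        if (record.value() == null) {
            // delete() returns the record that was current as of the delete timestamp, or null.
            final VersionedRecord<String> previous = store.delete(record.key(), record.timestamp());
            System.out.println("deleted: " + (previous == null ? "nothing" : previous.value()));
            return;
        }
        // put() returns the validTo timestamp of the written record; Long.MIN_VALUE
        // (the documented "not put" sentinel) means the record was older than the grace
        // period and was dropped, so a caller should not forward it downstream either.
        final long validTo = store.put(record.key(), record.value(), record.timestamp());
        if (validTo == Long.MIN_VALUE) {
            System.out.println("skipped expired record for key " + record.key());
        }
    }
}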

@@ -44,6 +44,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @param key The key to associate the value to
* @param value The value; can be null
* @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
+ * @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if the given key is {@code null}
*/
void put(K key, V value, long windowStartTimestamp);
@@ -86,8 +86,8 @@ private class MeteredVersionedKeyValueStoreInternal
extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {

private final VersionedBytesStore inner;
- private final Serde<V> rawValueSerde;
- private StateSerdes<K, V> rawValueSerdes;
+ private final Serde<V> plainValueSerde;
+ private StateSerdes<K, V> plainValueSerdes;

MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
final String metricScope,
@@ -104,13 +104,13 @@ private class MeteredVersionedKeyValueStoreInternal
: new ValueAndTimestampSerde<>(valueSerde)
);
this.inner = inner;
- this.rawValueSerde = valueSerde;
+ this.plainValueSerde = valueSerde;
}

public long put(final K key, final V value, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
- final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), rawValueSerdes.rawValue(value), timestamp), time, putSensor);
+ final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor);
maybeRecordE2ELatency();
return validTo;
} catch (final ProcessorStateException e) {
@@ -178,10 +178,10 @@ protected void initStoreSerde(final ProcessorContext context) {
// additionally init raw value serde
final String storeName = super.name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
- rawValueSerdes = new StateSerdes<>(
+ plainValueSerdes = new StateSerdes<>(
changelogTopic,
prepareKeySerde(keySerde, new SerdeGetter(context)),
- prepareValueSerde(rawValueSerde, new SerdeGetter(context))
+ prepareValueSerde(plainValueSerde, new SerdeGetter(context))
);
}

@@ -192,10 +192,10 @@ protected void initStoreSerde(final StateStoreContext context) {
// additionally init raw value serde
final String storeName = super.name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
- rawValueSerdes = new StateSerdes<>(
+ plainValueSerdes = new StateSerdes<>(
changelogTopic,
prepareKeySerde(keySerde, new SerdeGetter(context)),
- prepareValueSerde(rawValueSerde, new SerdeGetter(context))
+ prepareValueSerde(plainValueSerde, new SerdeGetter(context))
);
}
}
@@ -23,11 +23,13 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
+ import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+ import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -116,6 +118,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte

@Override
public long put(final Bytes key, final byte[] value, final long timestamp) {
+ Objects.requireNonNull(key, "key cannot be null");
+ validateStoreOpen();

if (timestamp < observedStreamTime - gracePeriod) {
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
@@ -135,6 +139,9 @@ public long put(final Bytes key, final byte[] value, final long timestamp) {

@Override
public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+ Objects.requireNonNull(key, "key cannot be null");
+ validateStoreOpen();

if (timestamp < observedStreamTime - gracePeriod) {
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
LOG.warn("Skipping record for expired delete.");
@@ -157,6 +164,9 @@ public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {

@Override
public VersionedRecord<byte[]> get(final Bytes key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ validateStoreOpen();

// latest value (if present) is guaranteed to be in the latest value store
final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
if (rawLatestValueAndTimestamp != null) {
@@ -171,6 +181,8 @@ public VersionedRecord<byte[]> get(final Bytes key) {

@Override
public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+ Objects.requireNonNull(key, "key cannot be null");
+ validateStoreOpen();

if (asOfTimestamp < observedStreamTime - historyRetention) {
// history retention exceeded. we still check the latest value store in case the
@@ -373,6 +385,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
}
}

+ private void validateStoreOpen() {
+ if (!open) {
+ throw new InvalidStateStoreException("Store " + name + " is currently closed");
+ }
+ }

/**
* Generic interface for segment stores. See {@link VersionedStoreClient} for use.
*/
@@ -847,7 +865,6 @@ private <T extends VersionedStoreSegment> long finishPut(
segment.put(key, segmentValue.serialize());
}
}
- return foundTs;
} else {
// insert into segment corresponding to foundTs, as foundTs represents the validTo
// timestamp of the current put.
@@ -891,8 +908,8 @@ private <T extends VersionedStoreSegment> long finishPut(
segment.put(key, segmentValue.serialize());
}
}
- return foundTs;
}
+ return foundTs;
}

/**
(The diff for one additional changed file did not render.)
