Skip to content

Commit

Permalink
KAFKA-14491: [5/N] Basic operations for RocksDB versioned store (#13188)
Browse files Browse the repository at this point in the history
Introduces the VersionedKeyValueStore interface proposed in KIP-889, along with the RocksDB-based implementation of the interface. This PR includes fully functional put, get, get-with-timestamp, and delete operations, but does not include the ability to restore records from the changelog, nor the surrounding store layers (for metrics or writing to the changelog). Those pieces will come in follow-up PRs.

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
vcrfxia authored Feb 11, 2023
1 parent b5a9b9d commit df22a9d
Show file tree
Hide file tree
Showing 6 changed files with 1,441 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

/**
* A key-value store that stores multiple record versions per key, and supports timestamp-based
* retrieval operations to return the latest record (per key) as of a specified timestamp.
* Only one record is stored per key and timestamp, i.e., a second call to
* {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
* <p>
* Each store instance has an associated, fixed-duration "history retention" which specifies
* how long old record versions should be kept for. In particular, a versioned store guarantees
* to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
* bound is within history retention of the current observed stream time. (Queries with a timestamp
* bound older than the specified history retention are considered invalid.)
*
* @param <K> The key type
* @param <V> The value type
*/
public interface VersionedKeyValueStore<K, V> extends StateStore {

/**
* Add a new record version associated with the specified key and timestamp.
* A {@code null} value is interpreted as a delete.
*
* @param key The key
* @param value The value, it can be {@code null}. {@code null} is interpreted as a delete.
* @param timestamp The timestamp for this record version
* @throws NullPointerException If {@code null} is used for key.
*/
void put(K key, V value, long timestamp);

/**
* Delete the value associated with this key from the store, at the specified timestamp
* (if there is such a value), and return the deleted value.
* <p>
* This operation is semantically equivalent to {@link #get(Object, long) get(key, timestamp)}
* followed by {@link #put(Object, Object, long) put(key, null, timestamp)}.
*
* @param key The key
* @param timestamp The timestamp for this delete
* @return The value and timestamp of the record associated with this key as of
* the deletion timestamp (inclusive), or {@code null} if no such record exists
* (including if the deletion timestamp is older than this store's history
* retention time, i.e., the store no longer contains data for the provided
* timestamp). Note that the record timestamp {@code r.timestamp()} of the
* returned {@link VersionedRecord} may be smaller than the provided deletion
* timestamp.
* @throws NullPointerException If {@code null} is used for key.
*/
VersionedRecord<V> delete(K key, long timestamp);

/**
* Get the current (i.e., latest by timestamp) record associated with this key.
*
* @param key The key to fetch
* @return The value and timestamp of the current record associated with this key, or
* {@code null} if there is no current record for this key.
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
VersionedRecord<V> get(K key);

/**
* Get the record associated with this key as of the specified timestamp (i.e.,
* the existing record with the largest timestamp not exceeding the provided
* timestamp bound).
*
* @param key The key to fetch
* @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
* (for the specified key) exists with this timestamp, then
* this is the record that will be returned.
* @return The value and timestamp of the record associated with this key
* as of the provided timestamp, or {@code null} if no such record exists
* (including if the provided timestamp bound is older than this store's history
* retention time, i.e., the store no longer contains data for the provided
* timestamp). Note that the record timestamp {@code r.timestamp()} of the
* returned {@link VersionedRecord} may be smaller than the provided timestamp
* bound.
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
VersionedRecord<V> get(K key, long asOfTimestamp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import java.util.Objects;

/**
* Combines a value (from a key-value record) with a timestamp, for use as the return type
* from {@link VersionedKeyValueStore#get(Object, long)} and related methods.
*
* @param <V> The value type
*/
public final class VersionedRecord<V> {
private final V value;
private final long timestamp;

/**
* Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
*
* @param value the value
* @param timestamp the timestamp
*/
public VersionedRecord(final V value, final long timestamp) {
this.value = Objects.requireNonNull(value);
this.timestamp = timestamp;
}

public V value() {
return value;
}

public long timestamp() {
return timestamp;
}

@Override
public String toString() {
return "<" + value + "," + timestamp + ">";
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final VersionedRecord<?> that = (VersionedRecord<?>) o;
return timestamp == that.timestamp &&
Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(value, timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
Expand All @@ -43,10 +44,10 @@
* stores a key into a shared physical store by prepending the key with a prefix (unique to
* the specific logical segment), and storing the combined key into the physical store.
*/
class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment, VersionedStoreSegment {
private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);

public final long id;
private final long id;
private final String name;
private final RocksDBStore physicalStore;
private final PrefixKeyFormatter prefixKeyFormatter;
Expand All @@ -63,6 +64,11 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
}

/**
 * Returns the id of this segment.
 *
 * @return the value of this segment's {@code id} field
 */
@Override
public long id() {
    return this.id;
}

@Override
public int compareTo(final LogicalKeyValueSegment segment) {
return Long.compare(id, segment.id);
Expand Down
Loading

0 comments on commit df22a9d

Please sign in to comment.