KAFKA-14491: [5/N] Basic operations for RocksDB versioned store #13188

Merged 7 commits on Feb 11, 2023
Changes from 5 commits
VersionedKeyValueStore.java (new file)
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

/**
* A key-value store that stores multiple record versions per key, and supports timestamp-based
* retrieval operations to return the latest record (per key) as of a specified timestamp.
* Only one record is stored per key and timestamp, i.e., a second call to
* {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
* <p>
* Each store instance has an associated, fixed-duration "history retention" which specifies
* how long old record versions should be kept for. In particular, a versioned store guarantees
* to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
* bound is within history retention of the current observed stream time. (Queries with timestamp
* bound older than the specified history retention are considered invalid.)
Member: Should we specify what "now" is with regard to history retention time?

Contributor Author: The line above says "where the provided timestamp bound is within history retention of the current observed stream time." Do you think that needs additional clarification?

Member: Oh sorry. Seems I missed it... disregard my original comment.

*
* @param <K> The key type
* @param <V> The value type
*/
public interface VersionedKeyValueStore<K, V> extends StateStore {

/**
* Add a new record version associated with the specified key and timestamp.
*
* @param key The key
* @param value The value, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as a delete
Member: Kafka has an implicit contract that null objects must be serialized to a null byte[] (for "plain" Kafka the impact of not following this contract is smaller, but for KS we would "fall apart" if the contract is violated).

We should just say, "can be null; null is treated as delete".

Contributor Author: It makes sense that a null object must always be serialized to null bytes, but it's also possible that a non-null object serializes to null bytes, right? My intention with this javadoc was to clarify that it's the serialization that determines whether the put() is treated as a delete -- even if the value itself is not null, as long as its serialization is null then it counts as a delete.

If you think this clarification is more confusing than it's worth, I can remove it.

Member: > but it's also possible that a non-null object serializes to null bytes, right

That also violates the contract (it must be null-to-null and non-null-to-non-null), because downstream it would imply that a null byte array is not deserialized to a null object. (Note: on restore, we actually don't deserialize data and would treat the null byte array as a delete, so stuff breaks if it was not a tombstone...)

I understand your intent, but putting it into the JavaDocs might give the wrong impression that it's ok to do this, while it's not...

Contributor Author: Hm... I found this comment in the code the other day, which gives the impression that it is valid to serialize a non-null value to null bytes:

// Serializing non-null values to null can be useful when working with Optional-like values
// where the Optional.empty case is serialized to null.
// See the discussion here: #7679

Is this no longer true?

Member: This PR was controversial... And I am still not a fan of it -> #7679 (comment)

It potentially breaks stuff -- of course, if users follow the null<->null and non-null<->non-null pattern the PR does not do any harm.

Contributor Author: OK, thanks for the additional context. I'll update the javadoc as you suggested.

* @param timestamp The timestamp for this record version
* @throws NullPointerException If {@code null} is used for key.
*/
void put(K key, V value, long timestamp);

/**
* Delete the value associated with this key from the store, at the specified timestamp
* (if there is such a value), and return the deleted value.
* <p>
* This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
* followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}.
*
* @param key The key
* @param timestamp The timestamp for this delete
* @return The value and timestamp of the record associated with this key as of
* the deletion timestamp (inclusive), or {@code null} if no such record exists
* (including if the deletion timestamp is older than this store's history
* retention time, i.e., the store no longer contains data for the provided
* timestamp). Note that the record timestamp {@code r.timestamp()} of the
* returned {@link VersionedRecord} may be smaller than the provided deletion
* timestamp.
* @throws NullPointerException If {@code null} is used for key.
*/
VersionedRecord<V> delete(K key, long timestamp);

/**
* Get the current (i.e., latest by timestamp) record associated with this key.
*
* @param key The key to fetch
* @return The value and timestamp of the current record associated with this key, or
* {@code null} if there is no current record for this key.
* @throws NullPointerException If null is used for key.
* @throws InvalidStateStoreException if the store is not initialized
Member: This one is missing in put() and delete() above -- seems we also don't have this documented on every method on KeyValueStore...

Maybe we should do a follow-up PR to check all existing interfaces for whether they document this exception?

Contributor Author: Sure, I can make a quick pass for this in a follow-up after this is merged.

Contributor Author: Updated in #13615.

*/
VersionedRecord<V> get(K key);

/**
* Get the record associated with this key as of the specified timestamp (i.e.,
* the existing record with the largest timestamp not exceeding the provided
* timestamp bound).
*
* @param key The key to fetch
* @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
* (for the specified key) exists with this timestamp, then
* this is the record that will be returned.
* @return The value and timestamp of the record associated with this key
* as of the provided timestamp, or {@code null} if no such record exists
* (including if the provided timestamp bound is older than this store's history
* retention time, i.e., the store no longer contains data for the provided
* timestamp). Note that the record timestamp {@code r.timestamp()} of the
* returned {@link VersionedRecord} may be smaller than the provided timestamp
* bound.
* @throws NullPointerException If null is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
VersionedRecord<V> get(K key, long asOfTimestamp);
}
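
To make the contract documented above concrete, here is a minimal usage sketch (not part of this PR). It assumes an already-initialized VersionedKeyValueStore<String, String> instance, e.g. handed in from a processor, with history retention long enough to cover the timestamps used; the surrounding class and method are illustrative scaffolding, while the put/get/delete calls follow the interface as defined in this file.

```java
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

public class VersionedStoreUsageExample {

    // `store` is assumed to be an already-initialized instance; history retention
    // is assumed to cover the timestamps used below.
    static void demo(final VersionedKeyValueStore<String, String> store) {
        store.put("k", "v1", 10L);  // first version of "k"
        store.put("k", "v2", 20L);  // newer version of "k"

        // Latest-by-timestamp lookup.
        final VersionedRecord<String> latest = store.get("k");
        // latest.value() == "v2", latest.timestamp() == 20L

        // Timestamp-based lookup: the bound is inclusive, and the returned record's
        // timestamp may be smaller than the bound (here, 10L < 15L).
        final VersionedRecord<String> asOf = store.get("k", 15L);
        // asOf.value() == "v1", asOf.timestamp() == 10L

        // A put with a null value is interpreted as a delete (tombstone) at that timestamp.
        store.put("k", null, 30L);
        // store.get("k") now returns null, while store.get("k", 25L) still returns "v2".

        // delete() is semantically get(key, timestamp) followed by put(key, null, timestamp),
        // and returns null when no record exists as of that timestamp.
        final VersionedRecord<String> deleted = store.delete("absent-key", 30L);
        // deleted == null because nothing was ever written for "absent-key"
    }
}
```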
VersionedRecord.java (new file)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import java.util.Objects;

/**
* Combines a value (from a key-value record) with a timestamp, for use as the return type
* from {@link VersionedKeyValueStore#get(Object, long)} and related methods.
*
* @param <V> The value type
*/
public final class VersionedRecord<V> {
private final V value;
private final long timestamp;

/**
* Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
*
* @param value the value
* @param timestamp the timestamp
*/
public VersionedRecord(final V value, final long timestamp) {
this.value = Objects.requireNonNull(value);
this.timestamp = timestamp;
}

public V value() {
return value;
}

public long timestamp() {
return timestamp;
}

@Override
public String toString() {
return "<" + value + "," + timestamp + ">";
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final VersionedRecord<?> that = (VersionedRecord<?>) o;
return timestamp == that.timestamp &&
Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(value, timestamp);
}
}
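
A brief illustration of the VersionedRecord behavior defined above (not part of this PR); the surrounding main method is just scaffolding, while the constructor, accessors, equals, and toString match the class as shown.

```java
import org.apache.kafka.streams.state.VersionedRecord;

public class VersionedRecordExample {
    public static void main(final String[] args) {
        final VersionedRecord<String> r1 = new VersionedRecord<>("v1", 10L);
        final VersionedRecord<String> r2 = new VersionedRecord<>("v1", 10L);

        System.out.println(r1);             // "<v1,10>" per toString()
        System.out.println(r1.equals(r2));  // true: same value and timestamp
        System.out.println(r1.value() + " @ " + r1.timestamp());  // "v1 @ 10"

        // The value may not be null: the constructor calls Objects.requireNonNull,
        // so the following line would throw a NullPointerException.
        // new VersionedRecord<String>(null, 10L);
    }
}
```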
LogicalKeyValueSegment.java
@@ -31,6 +31,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
@@ -43,10 +44,10 @@
* stores a key into a shared physical store by prepending the key with a prefix (unique to
* the specific logical segment), and storing the combined key into the physical store.
*/
-class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment, VersionedStoreSegment {
private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);

-public final long id;
+private final long id;
private final String name;
private final RocksDBStore physicalStore;
private final PrefixKeyFormatter prefixKeyFormatter;
@@ -63,6 +64,11 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
}

@Override
public long id() {
return id;
}

@Override
public int compareTo(final LogicalKeyValueSegment segment) {
return Long.compare(id, segment.id);
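
The class comment above explains that a logical segment stores its keys in one shared physical RocksDB store by prepending a per-segment prefix (the serialized segment id, cf. serializeLongToBytes(id) in the constructor) to each key. The PrefixKeyFormatter itself is not visible in this hunk, so the following is only an illustrative sketch of that key layout; the helper names are hypothetical and not the actual implementation.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the prefix-key layout described in the class comment.
// Names are hypothetical; this is not the actual PrefixKeyFormatter.
public class PrefixKeySketch {

    // Fixed-width segment-id prefix, analogous in spirit to serializeLongToBytes(id).
    static byte[] prefixFor(final long segmentId) {
        return ByteBuffer.allocate(Long.BYTES).putLong(segmentId).array();
    }

    // Combined key as stored in the shared physical store: prefix + logical key.
    static byte[] toPhysicalKey(final long segmentId, final byte[] logicalKey) {
        final byte[] prefix = prefixFor(segmentId);
        final byte[] physicalKey = new byte[prefix.length + logicalKey.length];
        System.arraycopy(prefix, 0, physicalKey, 0, prefix.length);
        System.arraycopy(logicalKey, 0, physicalKey, prefix.length, logicalKey.length);
        return physicalKey;
    }

    // Strips the fixed-width prefix to recover the logical key.
    static byte[] toLogicalKey(final byte[] physicalKey) {
        final byte[] logicalKey = new byte[physicalKey.length - Long.BYTES];
        System.arraycopy(physicalKey, Long.BYTES, logicalKey, 0, logicalKey.length);
        return logicalKey;
    }
}
```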