-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14491: [5/N] Basic operations for RocksDB versioned store #13188
Changes from 5 commits
ecbdeb3
ef05b8d
da5ac54
4eeea87
642651e
ddeb18b
42fb81d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.streams.state; | ||
|
||
import org.apache.kafka.streams.errors.InvalidStateStoreException; | ||
import org.apache.kafka.streams.processor.StateStore; | ||
|
||
/** | ||
* A key-value store that stores multiple record versions per key, and supports timestamp-based | ||
* retrieval operations to return the latest record (per key) as of a specified timestamp. | ||
* Only one record is stored per key and timestamp, i.e., a second call to | ||
* {@link #put(Object, Object, long)} with the same key and timestamp will replace the first. | ||
* <p> | ||
* Each store instance has an associated, fixed-duration "history retention" which specifies | ||
* how long old record versions should be kept for. In particular, a versioned store guarantees | ||
* to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp | ||
* bound is within history retention of the current observed stream time. (Queries with timestamp | ||
* bound older than the specified history retention are considered invalid.) | ||
* | ||
* @param <K> The key type | ||
* @param <V> The value type | ||
*/ | ||
public interface VersionedKeyValueStore<K, V> extends StateStore { | ||
|
||
/** | ||
* Add a new record version associated with the specified key and timestamp. | ||
* | ||
* @param key The key | ||
* @param value The value, it can be {@code null}; | ||
* if the serialized bytes are also {@code null} it is interpreted as a delete | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kafka has an implicit contract that We should just say, "can be null; null is treated as delete". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes sense that null object must always be serialized to null bytes, but it's also possible that a non-null object serializes to null bytes, right? My intention with this javadoc was to clarify that it's the serialization that determines whether the put() is treated as a delete -- even if the value itself is not null, as long as its serialization is null then it counts as a delete. If you think this clarification is more confusing than its worth, I can remove it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That also violations of the contract (it must be null-to-null and non-null-to-non-null), because downstream it would imply that a null-byte array is not deserialized to a null object. (Note: on restore, we actually don't deserialize data and would treat the null-byte array as delete, so stuff breaks if it was not a tombstone...) I understand your intent, but putting it into the JavaDocs might give the wrong impression that it's ok to do this, while it's not... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm... I found this comment in the code the other day, which gives the impression that it is valid to serialize a non-null value to null bytes:
Is this no longer true? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR was controversial... And I am still not a fan of it -> #7679 (comment) It potentially breaks stuff -- of course, if users follow the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, thanks for the additional context. I'll update the javadoc as you suggested. |
||
* @param timestamp The timestamp for this record version | ||
* @throws NullPointerException If {@code null} is used for key. | ||
*/ | ||
void put(K key, V value, long timestamp); | ||
|
||
/** | ||
* Delete the value associated with this key from the store, at the specified timestamp | ||
* (if there is such a value), and return the deleted value. | ||
* <p> | ||
* This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)} | ||
* followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}. | ||
* | ||
* @param key The key | ||
* @param timestamp The timestamp for this delete | ||
* @return The value and timestamp of the record associated with this key as of | ||
* the deletion timestamp (inclusive), or {@code null} if no such record exists | ||
* (including if the deletion timestamp is older than this store's history | ||
* retention time, i.e., the store no longer contains data for the provided | ||
* timestamp). Note that the record timestamp {@code r.timestamp()} of the | ||
* returned {@link VersionedRecord} may be smaller than the provided deletion | ||
* timestamp. | ||
* @throws NullPointerException If {@code null} is used for key. | ||
*/ | ||
VersionedRecord<V> delete(K key, long timestamp); | ||
|
||
/** | ||
* Get the current (i.e., latest by timestamp) record associated with this key. | ||
* | ||
* @param key The key to fetch | ||
* @return The value and timestamp of the current record associated with this key, or | ||
* {@code null} if there is no current record for this key. | ||
* @throws NullPointerException If null is used for key. | ||
* @throws InvalidStateStoreException if the store is not initialized | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is missing in Maybe we should do a follow up PR to check all existing interfaces if the do document this exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I can make a quick pass for this in a follow-up after this is merged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated in #13615. |
||
*/ | ||
VersionedRecord<V> get(K key); | ||
|
||
/** | ||
* Get the record associated with this key as of the specified timestamp (i.e., | ||
* the existing record with the largest timestamp not exceeding the provided | ||
* timestamp bound). | ||
* | ||
* @param key The key to fetch | ||
* @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record | ||
* (for the specified key) exists with this timestamp, then | ||
* this is the record that will be returned. | ||
* @return The value and timestamp of the record associated with this key | ||
* as of the provided timestamp, or {@code null} if no such record exists | ||
* (including if the provided timestamp bound is older than this store's history | ||
* retention time, i.e., the store no longer contains data for the provided | ||
* timestamp). Note that the record timestamp {@code r.timestamp()} of the | ||
* returned {@link VersionedRecord} may be smaller than the provided timestamp | ||
* bound. | ||
* @throws NullPointerException If null is used for key. | ||
* @throws InvalidStateStoreException if the store is not initialized | ||
*/ | ||
VersionedRecord<V> get(K key, long asOfTimestamp); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.streams.state; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* Combines a value (from a key-value record) with a timestamp, for use as the return type | ||
* from {@link VersionedKeyValueStore#get(Object, long)} and related methods. | ||
* | ||
* @param <V> The value type | ||
*/ | ||
public final class VersionedRecord<V> { | ||
private final V value; | ||
private final long timestamp; | ||
|
||
/** | ||
* Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. | ||
* | ||
* @param value the value | ||
* @param timestamp the timestamp | ||
*/ | ||
public VersionedRecord(final V value, final long timestamp) { | ||
this.value = Objects.requireNonNull(value); | ||
this.timestamp = timestamp; | ||
} | ||
|
||
public V value() { | ||
return value; | ||
} | ||
|
||
public long timestamp() { | ||
return timestamp; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "<" + value + "," + timestamp + ">"; | ||
} | ||
|
||
@Override | ||
public boolean equals(final Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
final VersionedRecord<?> that = (VersionedRecord<?>) o; | ||
return timestamp == that.timestamp && | ||
Objects.equals(value, that.value); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(value, timestamp); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we specify what "now" is with regard to history retention time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The line above says "where the provided timestamp bound is within history retention of the current observed stream team." Do you think that needs additional clarification?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry. Seems I missed it... disregard my original comment.