
KAFKA-14491: [5/N] Basic operations for RocksDB versioned store #13188

Merged
merged 7 commits on Feb 11, 2023

Conversation

@vcrfxia (Contributor) commented Feb 2, 2023

Introduces the VersionedKeyValueStore interface proposed in KIP-889, along with the RocksDB-based implementation of the interface. This PR includes fully functional put, get, get-with-timestamp, and delete operations, but does not include the ability to restore records from the changelog, or the surrounding store layers (for metrics or writing to the changelog). Those pieces will come in follow-up PRs.
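For orientation, here is a rough sketch of the interface shape this PR introduces, pieced together from the description above and KIP-889 (signatures are approximate, not copied from the PR):

public interface VersionedKeyValueStore<K, V> extends StateStore {

    // adds a new record version; a null value is interpreted as a delete (tombstone)
    void put(K key, V value, long timestamp);

    // deletes as of the given timestamp; returns the record that was current as of
    // that timestamp (inclusive), or null if there was none
    VersionedRecord<V> delete(K key, long timestamp);

    // returns the latest record version for the key, or null
    VersionedRecord<V> get(K key);

    // returns the record that was current as of the given timestamp bound, or null;
    // bounds older than history retention are invalid
    VersionedRecord<V> get(K key, long asOfTimestamp);
}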

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vcrfxia vcrfxia changed the title KAFKA-14491: [5/N] Support basic operations for RocksDB versioned store KAFKA-14491: [5/N] Basic operations for RocksDB versioned store Feb 2, 2023
@mjsax mjsax added streams kip Requires or implements a KIP labels Feb 3, 2023
@vcrfxia vcrfxia force-pushed the kip-889-versioned-store-basics branch from f889052 to ef05b8d on February 4, 2023 18:52
@mjsax (Member) left a comment

Made a pass over the actual code (not the tests yet).

* how long old record versions should be kept for. In particular, a versioned store guarantees
* to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
* bound is within history retention of the current observed stream time. (Queries with timestamp
* bound older than the specified history retention are considered invalid.)
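As a worked illustration of this guarantee (hypothetical numbers, not from the PR): suppose history retention is 300,000 ms and the observed stream time is 1,000,000 ms.

store.get(key, 700_000);  // valid: bound is within history retention of stream time
store.get(key, 650_000);  // invalid: bound is older than stream time minus history retention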
Member

Should we specify what "now" is with regard to history retention time?

Contributor Author

The line above says "where the provided timestamp bound is within history retention of the current observed stream time." Do you think that needs additional clarification?

Member

Oh sorry. Seems I missed it... disregard my original comment.

*
* @param key The key
* @param value The value, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as a delete
Member

Kafka has an implicit contract that null objects must be serialized to null byte[] arrays (for "plain" Kafka the impact of not following this contract is smaller, but for Kafka Streams we would "fall apart" if the contract is violated).

We should just say, "can be null; null is treated as delete".

Contributor Author

It makes sense that a null object must always be serialized to null bytes, but it's also possible that a non-null object serializes to null bytes, right? My intention with this javadoc was to clarify that it's the serialization that determines whether the put() is treated as a delete -- even if the value itself is not null, as long as its serialization is null then it counts as a delete.

If you think this clarification is more confusing than it's worth, I can remove it.

Member

but it's also possible that a non-null object serializes to null bytes, right

That also violates the contract (it must be null-to-null and non-null-to-non-null), because downstream it would imply that a null byte array is not deserialized to a null object. (Note: on restore, we actually don't deserialize data and would treat the null byte array as a delete, so stuff breaks if it was not a tombstone...)

I understand your intent, but putting it into the JavaDocs might give the wrong impression that it's ok to do this, while it's not...

Contributor Author

Hm... I found this comment in the code the other day, which gives the impression that it is valid to serialize a non-null value to null bytes:

// Serializing non-null values to null can be useful when working with Optional-like values
// where the Optional.empty case is serialized to null.
// See the discussion here: #7679

Is this no longer true?

Member

This PR was controversial... And I am still not a fan of it -> #7679 (comment)

It potentially breaks stuff -- of course, if users follow the null<->null and non-null<->non-null pattern the PR does not do any harm.

Contributor Author

OK, thanks for the additional context. I'll update the javadoc as you suggested.
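To make the agreed-upon contract concrete, here is a minimal serializer sketch that honors it (illustrative only; the class name is made up, but the Serializer interface is the standard Kafka one):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class ContractRespectingSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(final String topic, final String data) {
        if (data == null) {
            return null; // null object -> null bytes: treated downstream (and on restore) as a tombstone
        }
        return data.getBytes(StandardCharsets.UTF_8); // non-null object -> non-null bytes
    }
}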

* @param key The key
* @param timestamp The timestamp for this delete
* @return The value and timestamp of the latest record associated with this key
* as of the deletion timestamp (inclusive), or {@code null} if any of
Member

I would simplify: cases (1) and (2) are basically the same -- "is a tombstone" is a weird formulation IMHO.

or {@code null} if no record for this key exists at the specified timestamp (note: this includes the case that the timestamp is older than history retention time).

Member

Also add something like this?

Note: the record timestamp {@code r.ts} of the returned {@link VersionedRecord} may be smaller than the provided delete timestamp.

Contributor Author

OK. The reason I had separated the cases (1) and (2) was because there was confusion about this case (latest record version is a tombstone) during initial discussion leading up to the KIP and it was requested that the javadocs be explicit about this. But a lot has changed since those initial discussions so let me update it.

Also, I assume it's fine to update javadocs proposed in the KIP after it's been accepted, right? Should I go back and update the KIP too?

Member

Totally -- JavaDocs are not public API; it's, well, docs. If we have them in the KIP, it's just a template/placeholder. No need to go back to the KIP (otherwise we could never change JavaDocs without altering older KIPs 😂)

}

// it's possible the record belongs in this segment, but also possible it belongs
// in an earlier segment. mark as tentative and continue. as an optimization, this
Member

"earlier segment" -> or "latest value store", right?

Contributor Author

No, this case is only hit if we've already found a segment which contains data later than the current insertion timestamp, so the new record will never be put into the latest value store.

// insert into segment corresponding to foundTs, as foundTs represents the validTo
// timestamp of the current put.
// the new record is either the earliest or the latest in this segment, depending on the
// circumstances of the fall-through. (it cannot belong in the middle because otherwise
Member

This comment about "why not middle" is gold! Thanks.

}
} else {
final long foundNextTs = segmentValueSchema.getNextTimestamp(segmentValue);
if (foundNextTs <= timestamp) {
@mjsax (Member) commented Feb 7, 2023

I am just wondering if this case is actually possible? Above you just say vaguely depending on the circumstances of the fall-through -- can you elaborate?

The only case for "latest" I do understand would be the case when we actually insert into the "latest value store" instead of a segment. If we really insert into a segment, it seems we would always insert as earliest?

Contributor Author

Nope, latest is possible for a segment as well. It has to do with the "gaps" which may be formed from degenerate segments. Here's an example:

  • segment interval is 100
  • put tombstone with ts=50, so there is a (degenerate) segment with minTs=nextTs=50 (containing a single tombstone).
  • put record with ts=150, so it goes to the latest value store
  • put record with ts=90, so now there is a new segment with minTs=90, nextTs=150, containing the new record.
  • put record with ts=80: this put operation will "fall-through" with foundTs=90, so the segment it finds will be the one which contains a tombstone (with minTs=nextTs=50), so foundNextTs=50 which is less than the insertion timestamp (80).

Do you think this is worth putting into a comment in the code? It's a very long explanation, and requires knowledge of "degenerate segments" which this class currently does not require. I'll leave it out for now.
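Expressed with the putToStore test helper used elsewhere in this thread (a sketch; assumes SEGMENT_INTERVAL = 100 as in the bullets above):

putToStore("k", null, 50);     // degenerate segment: minTs = nextTs = 50 (single tombstone)
putToStore("k", "v150", 150);  // goes to the latest value store
putToStore("k", "v90", 90);    // new segment: minTs = 90, nextTs = 150
putToStore("k", "v80", 80);    // falls through with foundTs = 90; the found segment has
                               // foundNextTs = 50 <= 80, so the record is inserted as "latest"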

Member

Thanks for the explanation. I think a short comment like "latest is possible if the found segment is a degenerate segment; cf. RocksDBVersionedStoreSegmentValueFormatter.java" should be sufficient (it seems for regular segments it's never possible, so just pointing out this corner case without giving details should be enough to avoid people scratching their heads about the "latest" case).

@vcrfxia (Contributor Author) left a comment

Thanks for your detailed review, @mjsax! I will push a commit with your requested changes shortly. Also left some responses and questions for you inline.



* {@code null} if either (1) the store contains no records for this key or (2) the
* latest record for this key is a tombstone.
* @throws NullPointerException If null is used for key.
* @throws InvalidStateStoreException if the store is not initialized
Contributor Author

Sure, I can make a quick pass for this in a follow-up after this is merged.


/**
* Get the latest record associated with this key with timestamp not exceeding the specified
* timestamp bound.
Contributor Author

I worry that "associated with this timestamp" could be confusing because the returned record will often have a timestamp smaller than the timestamp bound. (This happens whenever there is no record version with the exact provided timestamp.)

What do you think about

Get the record associated with this key, as of the specified timestamp (i.e., existing record with the largest timestamp not exceeding the provided bound).

? I'll make the update.

import java.util.Objects;

/**
* Combines a value from a {@link KeyValue} with a timestamp, for use as the return type
Contributor Author

I copied from ValueAndTimestamp (link) 🤷

I'll remove it.


* wrapper class for the static methods provided by {@link RocksDBVersionedStoreSegmentValueFormatter}.
* See the javadocs in {@link RocksDBVersionedStoreSegmentValueFormatter} for more.
*/
static class SegmentValueSchema {
Contributor Author

I had wanted to abstract details of the schema so that the put() and get() logic is reusable (for example, when we implement an in-memory versioned store) but I think it's a premature optimization. I'll get rid of this for now, and we can cross this bridge again when the time comes.


@mjsax (Member) left a comment

Just some follow-up on PR comments -- did not review again yet (hopefully later today).



/**
* Get the latest record associated with this key with timestamp not exceeding the specified
* timestamp bound.
Member

I think the concept of a "validity interval" is easiest to understand?

Get the record associated with this key, as of the specified timestamp (i.e., existing record with r.validFrom <= timestamp < r.validTo; r.validFrom == r.timestamp and r.validTo is the timestamp of the next record).
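A hypothetical usage example of this validity-interval reading (method shapes as in the interface sketch near the top of this page):

store.put("k", "v1", 10);  // v1: validFrom = 10, validTo initially undefined
store.put("k", "v2", 20);  // v1 now has validTo = 20; v2: validFrom = 20
store.get("k", 15);        // returns v1, since 10 <= 15 < 20
store.get("k", 25);        // returns v2, since 20 <= 25
store.get("k", 5);         // returns null: no record version was valid at ts = 5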

}
}

private static <T extends VersionedStoreSegment> PutStatus putLatestValueStore(
Member

Personally, I find it hard to read if there are long parameter lists.

}

// ran out of segments to search. insert into segment as specified by foundTs
putFallThrough(
Member

Cool. Having the helpers is useful to make it easier to understand the control flow... (inlining does not sound like the best idea).

verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1);
verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1);
verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1);
@mjsax (Member) commented Feb 10, 2023

That's an interesting one... User can now also query using "future timestamp" -- nothing wrong with it; just interesting "side effect" :)

Contributor Author

User can not also query using "future timestamp"

"can not" -> "can now". Yup!


@Test
public void shouldPutNullAsLatest() {
putToStore("k", null, BASE_TIMESTAMP);
Member

Should we not first do putToStore("k", "v", BASE_TIMESTAMP); to ensure that the put(null) really deletes v?

Contributor Author

This test is for validating that the store is able to handle a tombstone as the first thing put to the store (and continue to insert earlier records out-of-order afterwards). There are other tests (e.g., shouldPutRepeatTimestampAsLatest() and shouldPutRepeatTimestamps()) which test the scenario you've requested.

import org.junit.Before;
import org.junit.Test;

public class RocksDBVersionedStoreTest {
Member

Can we add a test (maybe in a follow-up PR, with retention time zero)? Having no segments and segment_interval = 0 (?) seems to be a corner case we should also support.

Btw: it seems we don't test if delete() does return the right "old value"; can we also add some tests for it?

Contributor Author

Can we add a test (maybe in a follow-up PR, with retention time zero)?

Yes, that's a good point. I'll do it in a follow-up PR because I think we'll need a minor code change as well in order to not instantiate the segments store in this scenario.

it seems we don't test if delete() does return the right "old value"; can we also add some tests for it?

Added a new shouldDelete().
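A sketch of what such a test could look like (hypothetical: the actual shouldDelete() in this PR may differ, and deleteFromStore is an assumed helper in the style of putToStore):

@Test
public void shouldDelete() {
    putToStore("k", "v", BASE_TIMESTAMP);

    // delete() should return the record that was current as of the delete timestamp
    final VersionedRecord<byte[]> deleted = deleteFromStore("k", BASE_TIMESTAMP + 1);
    assertEquals(BASE_TIMESTAMP, deleted.timestamp());

    // after the delete, the latest version for the key is a tombstone
    verifyGetNullFromStore("k");
}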

Contributor Author

Can we add a test (maybe in a follow-up PR, with retention time zero)?

Added in #13243.

Having no segments and segment_interval = 0 (?) seems to be a corner case we should also support.

Let's discuss in the other PR about whether it makes sense to get rid of the segments store entirely in this case or not.

putToStore("k", "b", BASE_TIMESTAMP);

verifyGetValueFromStore("k", "b", BASE_TIMESTAMP);
verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP);
Member

Should we verify that "to_be_replaced" was not moved and thus we cannot query it with BASE_TIMESTAMP - 1?

Contributor Author

Sure, I've added this check into each of the verifications in this test.

putToStore("k", null, BASE_TIMESTAMP);

verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
Member

Same: should we verify that k is also not there for BASE_TIMESTAMP - 1?

Contributor Author

Added (as above).

putToStore("k", "vn5", SEGMENT_INTERVAL - 5);
putToStore("k", null, SEGMENT_INTERVAL + 20);
putToStore("k", null, SEGMENT_INTERVAL + 20);
putToStore("k", "vn6", SEGMENT_INTERVAL - 6);
Member

This is a lot of operations and hard to keep track of without writing it down... could we add some comments about "expected state" (maybe also in-between the puts)?

Contributor Author

I added comments to the puts regarding why there are so many lines (they test different combinations of null vs non-null being replaced with null vs non-null, and also with timestamps across segments vs in the same segment vs the latest value store).

I didn't add anything regarding the current state at each step since that feels a bit overkill. The way to figure out the end state is to ignore everything that says "to_be_replaced" and then de-dup the few remaining rows. If we want to further improve readability, we can break this up into separate tests but my gut says that's unnecessary at the moment.

@Test
public void shouldPutNonLatestTombstoneIntoNewSegmentWithValidTo() {
putToStore("k", "vp30", SEGMENT_INTERVAL + 30);
putToStore("k", null, SEGMENT_INTERVAL - 10); // this put should result in tombstone with validTo=SEGMENT_INTERVAL+30
Member

This is the degenerate segment case, right? So validTo should be validFrom == SEGMENT_INTERVAL - 10? (Or do I remember incorrectly how degenerate segments work?)

Contributor Author

No, this is not degenerate. It's not degenerate because there is a well-defined validTo in this case, from the record in the latest value store (vp30 with timestamp SEGMENT_INTERVAL + 30). Degenerate segments only form if the latest value is a tombstone, which means there is no validTo we can use for the tombstone.
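To contrast the two cases (a sketch using the same test helpers; the comments summarize the explanation above):

// not degenerate: a later record exists in the latest value store, so the
// tombstone gets a well-defined validTo
putToStore("k", "vp30", SEGMENT_INTERVAL + 30);
putToStore("k", null, SEGMENT_INTERVAL - 10);   // tombstone with validTo = SEGMENT_INTERVAL + 30

// degenerate: the tombstone itself is the latest version for its key, so there
// is no validTo to assign (minTs == nextTs)
putToStore("k2", null, SEGMENT_INTERVAL - 10);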

}

@Test
public void shouldNotGetExpired() {
Member

Should we also test the validTo-update case via an "out of order" put (regular record or null) leading to an existing record being expired?

Contributor Author

Hm, I started adding a new test case for this but then realized we can't actually test this without querying the physical segments store itself. Say we try to set up the test like so:

  • put v1 with ts=0
  • put v3 with ts=100
  • advance stream time so ts=50 is expired
  • put v2 with ts=40, which updates the validTo of the first put to something expired

Now what should the validation be? If we query at ts=40, of course we get null because everything prior to ts=50 is expired (the existing test already checks this). If we query at ts=60, of course we get v2 but this happens regardless of whether v1 is expired or not. So the only way to validate that v1 is actually expired is to check the underlying store, but that responsibility seems like it belongs with LogicalKeyValueSegmentsTest instead.

@vcrfxia (Contributor Author) left a comment

Thanks for having a look at the test coverage, @mjsax! Responded to your comments inline, and just pushed a new commit.

As a general note about my test philosophy for this PR: there's a lot of functionality in this new class. I've added bite-sized unit tests for different parts of the functionality, and also verified that the tests together hit every branch of the put and get logic (i.e., 100% code coverage in the main logic flow). I also wrote put/get tests with randomly generated data and ran that hundreds of times with different combinations of history retention and segment interval. I chose not to check in the code for generating random data to test on since that seems like overkill, but my confidence in the correctness of this code comes a lot more from those tests on autogenerated data than the unit tests which are being checked in.


mjsax pushed a commit that referenced this pull request Mar 21, 2023
Part of KIP-889.

AbstractSegments automatically calls the helper method to clean up expired segments as part of getOrCreateSegmentIfLive(). This works fine for windowed store implementations which call getOrCreateSegmentIfLive() exactly once per put() call, but is inefficient and difficult to reason about for the new RocksDBVersionedStore implementation (cf. #13188) which makes potentially multiple calls to getOrCreateSegmentIfLive() for different segments for a single put() call. This PR addresses this by refactoring the call to clean up expired segments out of getOrCreateSegmentIfLive(), opting to have the different segments implementations specify when cleanup should occur instead. After this PR, RocksDBVersionedStore only cleans up expired segments once per call to put().

Reviewers: Matthias J. Sax <[email protected]>
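A simplified model of the call-pattern change described in that commit (hypothetical names and types, not the actual Kafka internals):

interface Segments {
    Object getOrCreateSegmentIfLive(long segmentId, long streamTime); // no implicit cleanup after the refactor
    void cleanupExpiredSegments(long streamTime);                     // assumed name; callers now invoke cleanup explicitly
}

// versioned-store put(): clean up exactly once, then touch however many segments the search requires
static void put(final Segments segments, final long streamTime, final long[] segmentIdsToSearch) {
    segments.cleanupExpiredSegments(streamTime);
    for (final long id : segmentIdsToSearch) {
        segments.getOrCreateSegmentIfLive(id, streamTime);
        // ... per-segment search/insert logic elided
    }
}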