-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info #5103
KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info #5103
Conversation
@mjsax, Could you please review code changes and let me know if any changes needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! Couple of comment.
@@ -223,7 +223,7 @@ private void validateStoreOpen() { | |||
try { | |||
return this.db.get(rawKey); | |||
} catch (final RocksDBException e) { | |||
throw new ProcessorStateException("Error while getting value for key from store " + this.name, e); | |||
throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to parse this twice before I understood this. I think, we need to add a comment here -- otherwise, we might remove %s
later as no formatter is used here and thus it's unclear why the markup is there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -299,13 +299,13 @@ private void putInternal(final byte[] rawKey, | |||
try { | |||
db.delete(wOptions, rawKey); | |||
} catch (final RocksDBException e) { | |||
throw new ProcessorStateException("Error while removing key from store " + this.name, e); | |||
throw new ProcessorStateException("Error while removing key %s from store " + this.name, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
} | ||
} else { | ||
try { | ||
db.put(wOptions, rawKey, rawValue); | ||
} catch (final RocksDBException e) { | ||
throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e); | ||
throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -104,6 +105,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern | |||
} else { | |||
underlying.put(entry.key(), entry.newValue()); | |||
} | |||
} catch (final ProcessorStateException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this, I am wondering if caching stores are the best place to fix this. Caching might be disabled and thus, the fix would not cover this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax , Just trying to understand. On the point Caching might be disabled
, do you mean kafka wouldn't be supporting this it in future or User might have config option to disable Caching?.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax And does it applies to other Caching files too CachingSessionStore.java
and CachingWindowStore.java
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I refer to use configs. Users can disable caching. And yes it, also applies to the other caching stores. I think it would be better to use MeteredXXX
stores -- those are always used as wrappers and thus fix the issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
bytesStore.put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, serdes.keySerializer(), topic)), serdes.rawValue(aggregate)); | ||
} catch (final ProcessorStateException e) { | ||
final String message = String.format(e.getMessage(), sessionKey, | ||
aggregate); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move to line above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -223,7 +223,7 @@ private void validateStoreOpen() { | |||
try { | |||
return this.db.get(rawKey); | |||
} catch (final RocksDBException e) { | |||
throw new ProcessorStateException("Error while getting value for key from store " + this.name, e); | |||
throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e); // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: put the comment in the line above to avoid too long lines :) (same below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed :)
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. One follow up question (with cc @guozhangwang )
I am wondering, if we should add some tests for this?
bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); | ||
try { | ||
bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); | ||
} catch (final ProcessorStateException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this change here? RocksDBSessionStore
should be wrapped with MeteredSessionStore
. As an alternative, we can remove the code from MeteredSessionStore
thought. Not sure, which class is better for the fix.
\cc @guozhangwang WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since all stores will be at least wrapped with MeteredXXStore
we can add the error handling only at the metered store layer to be consistent. Besides, the inner RocksDBSessionStore
would actually be <Bytes, byte[]>
typed always, so this would not help.
Same for the RocksDBWindowStorebelow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value)); | ||
try { | ||
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value)); | ||
} catch (final ProcessorStateException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above. Not sure if we need this, as the store should be wrapped with MeteredWindowStore
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change LGTM.
Just note that we cannot yet close KAFKA-6538 since we have not yet fixed the RecordCollector case where the sent bytes have failed. Maybe worth targeting in another PR though as we are approaching the release deadline. cc @mjsax
bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); | ||
try { | ||
bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); | ||
} catch (final ProcessorStateException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since all stores will be at least wrapped with MeteredXXStore
we can add the error handling only at the metered store layer to be consistent. Besides, the inner RocksDBSessionStore
would actually be <Bytes, byte[]>
typed always, so this would not help.
Same for the RocksDBWindowStorebelow.
Thanks @guozhangwang! Doing a follow up PR seems reasonable to me. We should also check, if we can remove the generics from |
Ack. Will leave to @mjsax to merge. |
…DBStore with more human readable info.
This reverts commit b07b059580fa1946ffd90e242239eaf9e2468d35.
…XX store classes.
1cbaf2b
to
8331c26
Compare
@jadireddi Why did you not add it to the |
@mjsax , Added error handling in |
@jadireddi Thanks for the explanation. You are right! |
…DBStore with more human readable info (#5103) Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
Merged to Thanks for the patch @jadireddi! |
I closed KAFKA-6538 and created https://issues.apache.org/jira/browse/KAFKA-7015 to track the |
…DBStore with more human readable info (apache#5103) Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
https://issues.apache.org/jira/browse/KAFKA-6538
Enhanced exceptions thrown from
RocksDBStore
with corresponding information for which key/value the operation failed in the wrapping stores (KeyValueStore, WindowedStored, and SessionStore).Committer Checklist (excluded from commit message)