Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info #5103

Conversation

jadireddi
Copy link
Contributor

https://issues.apache.org/jira/browse/KAFKA-6538

Enhanced exceptions thrown from RocksDBStore with corresponding information for which key/value the operation failed in the wrapping stores (KeyValueStore, WindowedStored, and SessionStore).

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jadireddi
Copy link
Contributor Author

@mjsax, Could you please review code changes and let me know if any changes needed.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Couple of comment.

@@ -223,7 +223,7 @@ private void validateStoreOpen() {
try {
return this.db.get(rawKey);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to parse this twice before I understood this. I think, we need to add a comment here -- otherwise, we might remove %s later as no formatter is used here and thus it's unclear why the markup is there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -299,13 +299,13 @@ private void putInternal(final byte[] rawKey,
try {
db.delete(wOptions, rawKey);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while removing key from store " + this.name, e);
throw new ProcessorStateException("Error while removing key %s from store " + this.name, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
} else {
try {
db.put(wOptions, rawKey, rawValue);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e);
throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -104,6 +105,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern
} else {
underlying.put(entry.key(), entry.newValue());
}
} catch (final ProcessorStateException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this, I am wondering if caching stores are the best place to fix this. Caching might be disabled and thus, the fix would not cover this case?

Copy link
Contributor Author

@jadireddi jadireddi Jun 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax , Just trying to understand. On the point Caching might be disabled, do you mean kafka wouldn't be supporting this it in future or User might have config option to disable Caching?.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax And does it applies to other Caching files too CachingSessionStore.java and CachingWindowStore.java?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refer to use configs. Users can disable caching. And yes it, also applies to the other caching stores. I think it would be better to use MeteredXXX stores -- those are always used as wrappers and thus fix the issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

bytesStore.put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, serdes.keySerializer(), topic)), serdes.rawValue(aggregate));
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), sessionKey,
aggregate);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move to line above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -223,7 +223,7 @@ private void validateStoreOpen() {
try {
return this.db.get(rawKey);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e); // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put the comment in the line above to avoid too long lines :) (same below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed :)

@jadireddi
Copy link
Contributor Author

jadireddi commented Jun 2, 2018

retest this please

@mjsax mjsax added the streams label Jun 5, 2018
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. One follow up question (with cc @guozhangwang )

I am wondering, if we should add some tests for this?

bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic)));
try {
bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic)));
} catch (final ProcessorStateException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this change here? RocksDBSessionStore should be wrapped with MeteredSessionStore. As an alternative, we can remove the code from MeteredSessionStore thought. Not sure, which class is better for the fix.

\cc @guozhangwang WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all stores will be at least wrapped with MeteredXXStore we can add the error handling only at the metered store layer to be consistent. Besides, the inner RocksDBSessionStore would actually be <Bytes, byte[]> typed always, so this would not help.

Same for the RocksDBWindowStorebelow.

Copy link
Contributor Author

@jadireddi jadireddi Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value));
try {
bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes), serdes.rawValue(value));
} catch (final ProcessorStateException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above. Not sure if we need this, as the store should be wrapped with MeteredWindowStore.

Copy link
Contributor Author

@jadireddi jadireddi Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change LGTM.

Just note that we cannot yet close KAFKA-6538 since we have not yet fixed the RecordCollector case where the sent bytes have failed. Maybe worth targeting in another PR though as we are approaching the release deadline. cc @mjsax

bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic)));
try {
bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic)));
} catch (final ProcessorStateException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all stores will be at least wrapped with MeteredXXStore we can add the error handling only at the metered store layer to be consistent. Besides, the inner RocksDBSessionStore would actually be <Bytes, byte[]> typed always, so this would not help.

Same for the RocksDBWindowStorebelow.

@mjsax
Copy link
Member

mjsax commented Jun 5, 2018

Thanks @guozhangwang! Doing a follow up PR seems reasonable to me. We should also check, if we can remove the generics from RocksDBSessionStore if it's typed <Byte,byte[]> only anyway.

@guozhangwang
Copy link
Contributor

Ack. Will leave to @mjsax to merge.

@jadireddi jadireddi force-pushed the KAFKA-6538;-To-enhance-RocksDBStore-state-exceptions branch from 1cbaf2b to 8331c26 Compare June 6, 2018 12:12
@mjsax
Copy link
Member

mjsax commented Jun 6, 2018

@jadireddi Why did you not add it to the MeteredXXX stores as suggested by Guozhang?

@jadireddi
Copy link
Contributor Author

jadireddi commented Jun 7, 2018

@mjsax , Added error handling in MeteredSessionStore , MeteredWindowsStore .
As MeteredKeyValueStore and MeteredKeyValueBytesStore using InnerMeteredKeyValueStore to put/get/delete operations. So made changes in InnerMeteredKeyValueStore. Can you please let me know if i am missing something.

@mjsax
Copy link
Member

mjsax commented Jun 7, 2018

@jadireddi Thanks for the explanation. You are right!

@mjsax mjsax merged commit 1509679 into apache:trunk Jun 7, 2018
mjsax pushed a commit that referenced this pull request Jun 7, 2018
…DBStore with more human readable info (#5103)

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
@mjsax
Copy link
Member

mjsax commented Jun 7, 2018

Merged to trunk and cherry-picked to 2.0.

Thanks for the patch @jadireddi!

@mjsax
Copy link
Member

mjsax commented Jun 7, 2018

I closed KAFKA-6538 and created https://issues.apache.org/jira/browse/KAFKA-7015 to track the RecordCollectorImpl issue -- this makes it easier to assign fixes to different versions.

@jadireddi jadireddi deleted the KAFKA-6538;-To-enhance-RocksDBStore-state-exceptions branch June 7, 2018 05:01
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…DBStore with more human readable info (apache#5103)

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants