Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10562: Properly invoke new StateStoreContext init #9388

Merged
merged 7 commits into from
Oct 8, 2020

Conversation

vvcephei
Copy link
Contributor

@vvcephei vvcephei commented Oct 7, 2020

  • all wrapping stores should pass StateStoreContext init through to the same
    method on the wrapped store and not translate it to ProcessorContext init
  • base-level stores should handle StateStoreContext init so that callers passing
    a non-InternalProcessorContext implementation will be able to initialize the store
  • extra tests are added to verify the desired behavior

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -34,6 +34,7 @@
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
*/
public class WordCountProcessorTest {
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ticket needs to go in to 2.7.0 also, but I split it out for reviewability.

*/
@Deprecated
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the deprecation tag right now lets us be sure we encountered all places this method appears in the codebase.

@Override
public void init(final StateStoreContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are going to be a lot of duplicated init methods. It's not great, but hopefully we can drop the old API before too long.

Comment on lines +54 to +56
public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
return (StreamsMetricsImpl) context.metrics();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a bunch of duplicated extractors here to help keep the implementation classes clean.

Comment on lines +70 to +78
public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."
);
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced a lot of casts with this checked-cast method, which also lets us get rid of a lot of similar cast-checking blocks, which were inconsistently applied.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Comment on lines +49 to +50
public void openDB(final Map<String, Object> configs, final File stateDir) {
super.openDB(configs, stateDir);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to remove the type-dependency of the context by re-specifying the interface in terms of the only two properties it needed.

taskId = context.taskId().toString();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();

registerMetrics();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to extract out quite as much common code in the Metered implementations because they need to work regardless of whether the context is an InternalProcessorContext or whether it's a straight mock (for unit tests).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.context seems only used in the e2e latency as

final long e2eLatency =  currentTime - context.timestamp();

And in that case we may throw a NPE. Should we augment the condition as

if (e2eLatencySensor.shouldRecord() && context != null)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, I saw you already did this :)

@@ -122,7 +122,7 @@
}

@SuppressWarnings("unchecked")
void openDB(final ProcessorContext context) {
void openDB(final Map<String, Object> configs, final File stateDir) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the interface change that saved us from needing two openDB methods.

@@ -147,7 +147,7 @@ public void shouldNotAllowToSchedulePunctuations() {
public void shouldNotAllowInitForKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
store.init((ProcessorContext) null, null);
store.init((StateStoreContext) null, null);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was actually a bug before, which this PR fixes: the wrapping layers should transmit the init call straight down, rather than translating it. There are a whole set of new unit tests making sure that this works properly for both the new and old init methods.

@@ -100,6 +100,31 @@ public void after() {
return store;
}

@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the new unit tests I added to make sure that all the store builder wrappers transmit init calls correctly. They are frustratingly similar, but not exactly the same across different test classes because the test classes follow different idioms.

I think it'd be nice to follow up with a general store-verification test that's parameterized by the exact store types so we can specify this test logic once and apply it to all the stores. That would also be handy for most of the rest of these tests. But I don't think we need to distract this PR with that concern.

Comment on lines +70 to +78
public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."
);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not import static the function directly like in other classes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, probably just overlooked it.

taskId = context.taskId().toString();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();

registerMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.context seems only used in the e2e latency as

final long e2eLatency =  currentTime - context.timestamp();

And in that case we may throw a NPE. Should we augment the condition as

if (e2eLatencySensor.shouldRecord() && context != null)

streamsMetrics = (StreamsMetricsImpl) context.metrics();

registerMetrics();
final Sensor restoreSensor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think we should remove the restoreSensor since we no longer restore the state upon init any more? In KIP-444 we no longer have it as a state-store level metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh. I'll double-check and take it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'll do that in a quick follow-up PR, so I can go ahead and merge this.

taskId = context.taskId().toString();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();

registerMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, I saw you already did this :)

streamsMetrics = (StreamsMetricsImpl) context.metrics();

registerMetrics();
final Sensor restoreSensor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here.

@@ -122,7 +158,7 @@ public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listene
@Override
public void put(final K key,
final V value) {
put(key, value, context.timestamp());
put(key, value, context != null ? context.timestamp() : 0L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this a bit more: is this.context only null in unit tests? It seems a bit overkill to let non-testing code to cope with testing code if yes..

Could we let the mock class to extend from InternalProcessorContext as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's not something I normally like to do, either. In this case, though, it's necessary. The thing is that all our internal StateStoreContexts are InternalProcessorContext implementations, and therefore, they are also ProcessorContext implementations, so they have a timestamp() method.

The thing that makes this unavoidable is that it's ok for users to init a state store using the MockProcessorContext we provide for them in test-utils. This is a bit of a bleed-over from the next pr, which I'm still finishing up, but it's better if we keep their context "pure". I.e., I'm going to propose to add a new context that's just an api.ProcessorContext and a separate implementation that just a StateStoreContext. We should discuss on that PR whether that's really the best way to present it, but if you ultimately agree, then it means we have to expect a null context here.

Note that the only functionality it affects is the recording of metrics that probably don't matter in unit tests and this stub behavior for a deprecated method that people shouldn't be using.

If after reviewing the next PR, we do wind up converging the implementations, I'll come back and undo these checks here.

@vvcephei
Copy link
Contributor Author

vvcephei commented Oct 8, 2020

Looks like the java 11 build only had a couple of environmental failures:

00:52:22  org.gradle.internal.remote.internal.ConnectException: Could not connect to server [86e7da13-a292-40f6-a626-7774e2173e77 port:42903, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
00:52:22  	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
00:52:22  	at org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
00:52:22  	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:123)
00:52:22  	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
00:52:22  	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
00:52:22  	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
00:52:22  Caused by: java.net.ConnectException: Connection refused
00:52:22  	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
00:52:22  	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:776)
00:52:22  	at java.base/sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:120)
00:52:22  	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
00:52:22  	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
00:52:22  	... 5 more

@vvcephei
Copy link
Contributor Author

vvcephei commented Oct 8, 2020

Thanks for the review, @guozhangwang ! I'll go ahead and merge this, and deal with the couple of cleanup issues in a follow-on PR.

@vvcephei vvcephei merged commit 2804257 into trunk Oct 8, 2020
@vvcephei vvcephei deleted the kip-478-part-5-state-store-wrappers branch October 8, 2020 04:06
javierfreire pushed a commit to javierfreire/kafka that referenced this pull request Oct 8, 2020
* all wrapping stores should pass StateStoreContext init through to the same
  method on the wrapped store and not translate it to ProcessorContext init
* base-level stores should handle StateStoreContext init so that callers passing
  a non-InternalProcessorContext implementation will be able to initialize the store
* extra tests are added to verify the desired behavior

Reviewers: Guozhang Wang <[email protected]>
ijuma added a commit to confluentinc/kafka that referenced this pull request Oct 8, 2020
* commit '2804257fe221f37e5098bd': (67 commits)
  KAFKA-10562: Properly invoke new StateStoreContext init (apache#9388)
  MINOR: trivial cleanups, javadoc errors, omitted StateStore tests, etc. (apache#8130)
  KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (apache#9373)
  KAFKA-9274: fix incorrect default value for `task.timeout.ms` config (apache#9385)
  KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (apache#9247)
  KAFKA-10028: Implement write path for feature versioning system (KIP-584) (apache#9001)
  KAFKA-10402: Upgrade system tests to python3 (apache#9196)
  KAFKA-10186; Abort transaction with pending data with TransactionAbortedException (apache#9280)
  MINOR: Remove `TargetVoters` from `DescribeQuorum` (apache#9376)
  Revert "KAFKA-10469: Resolve logger levels hierarchically (apache#9266)"
  MINOR: Don't publish javadocs for raft module (apache#9336)
  KAFKA-9929: fix: add missing default implementations (apache#9321)
  KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (apache#8910)
  KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651) (apache#9345)
  KAFKA-10527; Voters should not reinitialize as leader in same epoch (apache#9348)
  MINOR: Refactor unit tests around RocksDBConfigSetter (apache#9358)
  KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter (apache#9099)
  MINOR: Annotate test BlockingConnectorTest as integration test (apache#9379)
  MINOR: Fix failing test due to KAFKA-10556 PR (apache#9372)
  KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (apache#9320)
  ...
@mjsax mjsax added the kip Requires or implements a KIP label Jan 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kip Requires or implements a KIP streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants