-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10562: Properly invoke new StateStoreContext init #9388
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,9 @@ | |
import org.apache.kafka.streams.processor.ProcessorContext; | ||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment { | ||
|
@@ -45,8 +47,8 @@ public int compareTo(final KeyValueSegment segment) { | |
} | ||
|
||
@Override | ||
public void openDB(final ProcessorContext context) { | ||
super.openDB(context); | ||
public void openDB(final Map<String, Object> configs, final File stateDir) { | ||
super.openDB(configs, stateDir); | ||
Comment on lines
+49
to
+50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was able to remove the type-dependency of the context by re-specifying the interface in terms of the only two properties it needed. |
||
// skip the registering step | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.processor.ProcessorContext; | ||
import org.apache.kafka.streams.processor.StateStore; | ||
import org.apache.kafka.streams.processor.StateStoreContext; | ||
import org.apache.kafka.streams.processor.To; | ||
import org.apache.kafka.streams.processor.internals.Task.TaskType; | ||
import org.apache.kafka.streams.state.KeyValueStore; | ||
|
@@ -147,7 +148,7 @@ public void shouldNotAllowToSchedulePunctuations() { | |
public void shouldNotAllowInitForKeyValueStore() { | ||
final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); | ||
try { | ||
store.init((ProcessorContext) null, null); | ||
store.init((StateStoreContext) null, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was actually a bug before, which this PR fixes: the wrapping layers should transmit the init call straight down, rather than translating it. There are a whole set of new unit tests making sure that this works properly for both the new and old init methods. |
||
fail("Should have thrown UnsupportedOperationException."); | ||
} catch (final UnsupportedOperationException expected) { } | ||
} | ||
|
@@ -156,7 +157,7 @@ public void shouldNotAllowInitForKeyValueStore() { | |
public void shouldNotAllowInitForTimestampedKeyValueStore() { | ||
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); | ||
try { | ||
store.init((ProcessorContext) null, null); | ||
store.init((StateStoreContext) null, null); | ||
fail("Should have thrown UnsupportedOperationException."); | ||
} catch (final UnsupportedOperationException expected) { } | ||
} | ||
|
@@ -165,7 +166,7 @@ public void shouldNotAllowInitForTimestampedKeyValueStore() { | |
public void shouldNotAllowInitForWindowStore() { | ||
final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); | ||
try { | ||
store.init((ProcessorContext) null, null); | ||
store.init((StateStoreContext) null, null); | ||
fail("Should have thrown UnsupportedOperationException."); | ||
} catch (final UnsupportedOperationException expected) { } | ||
} | ||
|
@@ -174,7 +175,7 @@ public void shouldNotAllowInitForWindowStore() { | |
public void shouldNotAllowInitForTimestampedWindowStore() { | ||
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); | ||
try { | ||
store.init((ProcessorContext) null, null); | ||
store.init((StateStoreContext) null, null); | ||
fail("Should have thrown UnsupportedOperationException."); | ||
} catch (final UnsupportedOperationException expected) { } | ||
} | ||
|
@@ -183,7 +184,7 @@ public void shouldNotAllowInitForTimestampedWindowStore() { | |
public void shouldNotAllowInitForSessionStore() { | ||
final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); | ||
try { | ||
store.init((ProcessorContext) null, null); | ||
store.init((StateStoreContext) null, null); | ||
fail("Should have thrown UnsupportedOperationException."); | ||
} catch (final UnsupportedOperationException expected) { } | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a handful of these also, just passing the deprecation on to the callers.