KAFKA-7021: Reuse source based on config #5163
Conversation
Couple of comments/questions.
I think I understand the overall idea and it makes sense -- need to think about some details a little more to really fully understand the change.
/**
 * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
 */
public static final String OPTIMIZE_AT_20 = "2.0";
Why do we need this? Should we not use `"all"`? Also, this would be a public API change -- we can still update the corresponding KIP, but I don't think we need this, do we?
I agree. Since this is our first optimization, `"all"` should be sufficient. But this raises another good point: if we plan to track optimization versions, are we setting ourselves up for a KIP for each optimization release?
- Sounds good. I'm reverting to use `all`.
- Moving forward, we won't need one KIP for each release. We can use a single KIP adding the mechanism, and if two versions are compatible (say, if no new incompatible optimizations are introduced from X to Y), we can just document it in the upgrade section.
final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE_AT_20);

if (enableOptimization20) {
    for (Map.Entry<StoreBuilder, String> entry : internalTopologyBuilder.storeToSourceChangelogTopic.entrySet()) {
nit: add `final`
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
    throw new TopologyException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}
// TODO: this method is only used by DSL and we might want to refactor this part |
Why did you remove those comments? I still think we should refactor some parts here (this also applies to the other TODOs below).
After reviewing Bill's part II PR for optimization, which enhanced the InternalStreamsBuilder for logical plan generation, I think it makes less sense to refactor these into it.
@@ -112,24 +121,106 @@ public void shutdown() {
    }

    @Test
-   public void shouldRestoreState() throws ExecutionException, InterruptedException {
+   public void shouldRestoreStateFromSourceTopic() throws InterruptedException, IOException {
nit: simplify `InterruptedException, IOException` to `Exception`
I disagree. It can be nicer for the reader to know which checked exceptions we're potentially going to throw.
But it doesn't matter much, so do what you want ;)
For API calls I agree, but for testing, I think collapsing to `Exception` is ok.
For regular API calls (ie, "main code") we should list them. For a test, no exception should ever be thrown, and if one is, the test fails. Which exception is thrown is irrelevant IMHO.
-leftTable = builder.table(INPUT_TOPIC_LEFT);
-rightTable = builder.table(INPUT_TOPIC_RIGHT);
+leftTable = builder.table(INPUT_TOPIC_LEFT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("left").withLoggingDisabled());
+rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
Do we need to disable logging explicitly here?
Yes. We want to avoid restoring the stores in between tests (it is trickier to delete the changelogs since the appId changes for each case, so I think this is easier).
Ack.
@@ -72,11 +72,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
    public <K, V> KTable<K, V> table(final String topic,
                                     final ConsumedInternal<K, V> consumed,
                                     final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-       // explicitly disable logging for source table materialized stores
-       materialized.withLoggingDisabled();
Why do we remove this?
The actual fix works as follows: in the parsing phase, we create the materialized store normally, which means not enforcing disabled logging. Then in `adjust` we modify the store to 1) disable logging, and 2) change its changelog topic to the source topic.
As I mentioned in another HOTFIX PR, note that the term "changelog" topic now has two semantic meanings: 1) the topic to restore from, and 2) the topic to append updates to. For example, with optimizations turned on, the changelog for 1) will be the source topic, and for 2) there will be none. There are some tricky separate code paths handling those cases now.
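To make that flow concrete, here is a minimal, self-contained sketch of the adjust step. The `StoreBuilder` stand-in, `adjust`, and the `storeToChangelogTopic` map below are simplified illustrations, not the actual Kafka Streams internals:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the "adjust" step: after the topology is parsed,
// each source-table store is mutated to 1) disable changelogging and
// 2) restore from the source topic instead of a dedicated changelog topic.
public class AdjustSketch {
    // Hypothetical stand-in for a store builder.
    static class StoreBuilder {
        final String name;
        boolean loggingEnabled = true;
        StoreBuilder(String name) { this.name = name; }
        StoreBuilder withLoggingDisabled() { loggingEnabled = false; return this; }
    }

    // Maps a store name to the topic it restores from.
    final Map<String, String> storeToChangelogTopic = new HashMap<>();

    void adjust(StoreBuilder store, String sourceTopic) {
        store.withLoggingDisabled();                        // 1) no appending to a changelog
        storeToChangelogTopic.put(store.name, sourceTopic); // 2) restore from the source topic
    }

    public static void main(String[] args) {
        AdjustSketch sketch = new AdjustSketch();
        StoreBuilder store = new StoreBuilder("left");
        sketch.adjust(store, "input-topic-left");
        System.out.println(store.loggingEnabled + " " + sketch.storeToChangelogTopic.get("left"));
        // prints: false input-topic-left
    }
}
```

This captures the two "changelog" semantics in one place: the map entry covers restoring, while the disabled logging flag means nothing is appended.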
// Adjust the generated topology based on the configs.
// Not exposed as public API and should be removed post 2.0
Topology adjust(final StreamsConfig config) {
    final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE_AT_20);
revert the check to avoid NPE: StreamsConfig.OPTIMIZE_AT_20.equals(config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION))
I'm not sure I'm reading this right... If I say optimize:=`all`, we won't do this optimization, right? It seems like you want to check for `2.0` or `all`.
ack
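A small sketch of the check discussed in this thread, assuming (per the comments above) that both `"2.0"` and `"all"` should enable the optimization; putting the constant first in `equals` means a null config value yields false instead of a NullPointerException. Names are illustrative:

```java
// Sketch of the NPE-safe config check: constant-first equals tolerates a
// null config value, and both "2.0" and "all" enable the optimization.
public class OptimizationCheck {
    static final String OPTIMIZE_AT_20 = "2.0";
    static final String OPTIMIZE = "all";

    static boolean optimizationEnabled(String configValue) {
        return OPTIMIZE_AT_20.equals(configValue) || OPTIMIZE.equals(configValue);
    }

    public static void main(String[] args) {
        System.out.println(optimizationEnabled("all"));  // true
        System.out.println(optimizationEnabled(null));   // false, no NPE
    }
}
```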
@@ -535,7 +535,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
     */
    public KafkaStreams(final Topology topology,
                        final Properties props) {
-       this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+       this(topology.adjust(new StreamsConfig(props)).internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
why do we call `adjust` in each constructor? Doing it once in the private constructor should be sufficient?
I could be missing something, but the private constructor takes an `InternalTopologyBuilder` instance, and `adjust` needs to execute to enable source topic reuse before returning the `InternalTopologyBuilder`.
But maybe we could shift the `adjust` method to the `InternalTopologyBuilder` and thus only need to call it once in the private constructor.
What I originally thought is exactly as @bbejeck explained. I think moving it to `InternalTopologyBuilder` would be a better idea; will try it out.
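A hedged sketch of the constructor refactoring being discussed (all class and method names are illustrative, not the real KafkaStreams code): if every public constructor funnels into one private constructor that triggers the adjustment, it runs exactly once regardless of entry point:

```java
// Illustrative sketch: public constructors delegate to a single private
// constructor, which performs the topology adjustment exactly once.
public class ConstructorSketch {
    // Hypothetical stand-in for the internal builder.
    static class InternalTopologyBuilder {
        int adjustCalls = 0;
        void adjust(String config) { adjustCalls++; }
    }

    final InternalTopologyBuilder builder;

    // Public constructor funnels into the private one...
    public ConstructorSketch(InternalTopologyBuilder builder) {
        this(builder, "default-config");
    }

    // ...so the adjustment happens in one place only.
    private ConstructorSketch(InternalTopologyBuilder builder, String config) {
        builder.adjust(config);
        this.builder = builder;
    }

    public static void main(String[] args) {
        InternalTopologyBuilder b = new InternalTopologyBuilder();
        new ConstructorSketch(b);
        System.out.println(b.adjustCalls); // 1
    }
}
```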
// update store map to disable logging for this store
storeBuilder.withLoggingDisabled();
internalTopologyBuilder.addStateStore(storeBuilder, true);
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
Just for my understanding: we are overwriting the changelog-topic name with the source-topic name here?
Yes.
// update store map to disable logging for this store
storeBuilder.withLoggingDisabled();
internalTopologyBuilder.addStateStore(storeBuilder, true);
why do we need to add the store again? Isn't `storeBuilder` the same object that is already present, so we overwrite it with itself here? `storeBuilder.withLoggingDisabled()` is a mutable call, right?
Unfortunately not.. note that `addStateStore` directly constructs the `StoreFactory` and puts that into the map `stateFactories`. And `StoreFactory` is immutable. So I have to overwrite the key with a new `StoreFactory` in this way.
Ack.
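To illustrate why the re-add is necessary, here is a simplified model (all names are hypothetical, not the real `InternalTopologyBuilder`): the map value is an immutable snapshot taken at add time, so mutating the builder afterwards has no effect until the entry is overwritten by adding the store again:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: stateFactories maps a store name to an immutable
// snapshot of the builder taken when the store was added. Mutating the
// builder later does not change the stored snapshot, so addStateStore
// must be called again (with override allowed) to replace the entry.
public class StoreFactorySketch {
    static class StoreBuilder {
        final String name;
        boolean loggingEnabled = true;
        StoreBuilder(String name) { this.name = name; }
        void withLoggingDisabled() { loggingEnabled = false; }
    }

    // Immutable snapshot of the builder's state at add time.
    static class StoreFactory {
        final boolean loggingEnabled;
        StoreFactory(StoreBuilder builder) { this.loggingEnabled = builder.loggingEnabled; }
    }

    final Map<String, StoreFactory> stateFactories = new HashMap<>();

    void addStateStore(StoreBuilder builder, boolean allowOverride) {
        if (!allowOverride && stateFactories.containsKey(builder.name)) {
            throw new IllegalStateException("Store " + builder.name + " already added");
        }
        stateFactories.put(builder.name, new StoreFactory(builder));
    }

    public static void main(String[] args) {
        StoreFactorySketch topology = new StoreFactorySketch();
        StoreBuilder builder = new StoreBuilder("store");
        topology.addStateStore(builder, false);
        builder.withLoggingDisabled();
        // The existing snapshot still has logging enabled...
        System.out.println(topology.stateFactories.get("store").loggingEnabled); // true
        // ...so the store must be re-added to pick up the change.
        topology.addStateStore(builder, true);
        System.out.println(topology.stateFactories.get("store").loggingEnabled); // false
    }
}
```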
@@ -121,6 +121,9 @@
    private Map<Integer, Set<String>> nodeGroups = null;

    // this is only temporary for 2.0 and should be removed
Maybe mark it TODO so we can search for it later.
ack.
I left a few comments, and I was also curious about several of @mjsax 's concerns, but assuming you hash those out, I'm +1 overall.
Overall I agree with the approach. Just left a couple of minor comments. I'll probably take another pass over the tests later to confirm my understanding of the offset restore fix.
@@ -167,7 +167,7 @@ public String toString(final String indent) {
    return sb.toString();
}

-protected Map<TopicPartition, Long> recordCollectorOffsets() {
+protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: renaming.
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
-   return recordCollector.offsets();
+   final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+   for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
This is the actual fix.
nit: add `final`
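A minimal sketch of the offset-union fix described in this thread, using plain `String`s in place of `TopicPartition` (names and structure are illustrative): start from the producer's acked offsets and add consumed offsets only where absent, so acked offsets take priority when the same topic appears in both (e.g. a repartition topic):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the checkpointable-offsets fix: union of the producer's acked
// offsets and the consumer's consumed offsets, with acked offsets winning
// on conflicts. Consumed offsets of reused source topics thereby become
// checkpointable, which is what makes source-topic reuse restorable.
public class CheckpointSketch {
    static Map<String, Long> checkpointableOffsets(Map<String, Long> ackedOffsets,
                                                   Map<String, Long> consumedOffsets) {
        final Map<String, Long> checkpointable = new HashMap<>(ackedOffsets);
        for (Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            checkpointable.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointable;
    }

    public static void main(String[] args) {
        Map<String, Long> acked = new HashMap<>();
        acked.put("repartition-0", 42L);
        Map<String, Long> consumed = new HashMap<>();
        consumed.put("repartition-0", 40L);  // same topic in both: acked wins
        consumed.put("source-0", 100L);      // reused source topic: consumed offset checkpointed
        Map<String, Long> result = checkpointableOffsets(acked, consumed);
        System.out.println(result.get("repartition-0") + " " + result.get("source-0"));
        // prints: 42 100
    }
}
```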
…reuse-source-based-on-config
@guozhangwang Build failed with checkstyle error
Found the issue with the failed system test, filed another PR: #5170
assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog")));

assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
Should we add this check to `shouldReuseSourceTopicAsChangelogsWithOptimization20`, too?
ack.
final int offsetLimitDelta = 1000;
final int offsetCheckpointed = 1000;
createStateForRestoration(INPUT_STREAM);
setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
Why do we use a delta here, instead of the actual offset we want to commit (ie, 4000)? Might be simpler to understand the test?
The reason is that in line 150, I need to check exactly how much data was processed, which should be the diff between the committed offset and the log end offset.
However, the log end offset is not always numKeys / 2 (num.partitions). In my tests I saw, for example, 5005 and 4995. So I have to use the limit to say "commit at the log end offset minus that delta".
Ack.
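The delta reasoning above can be sketched as simple arithmetic (purely illustrative helper names): whatever the actual log-end offset turns out to be, committing at `endOffset - delta` leaves exactly `delta` records to process:

```java
// Sketch of the delta-based commit: the per-partition log-end offset is not
// deterministic (e.g. 5005 vs 4995 in different runs), so the test commits
// at endOffset - delta, guaranteeing exactly `delta` records remain to be
// processed regardless of how the keys were partitioned.
public class OffsetDeltaSketch {
    static long committedOffset(long logEndOffset, long delta) {
        return logEndOffset - delta;
    }

    static long recordsLeftToProcess(long logEndOffset, long committed) {
        return logEndOffset - committed;
    }

    public static void main(String[] args) {
        long delta = 1000L;
        for (long endOffset : new long[]{5005L, 4995L}) {
            long committed = committedOffset(endOffset, delta);
            System.out.println(recordsLeftToProcess(endOffset, committed)); // always 1000
        }
    }
}
```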
final CountDownLatch shutdownLatch = new CountDownLatch(1);

builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
    .toStream()
nit: indentation should only be 4 spaces?
-final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(INPUT_STREAM, 0),
-                                                      new TopicPartition(INPUT_STREAM, 1));
+final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(topic, 0),
+                                                      new TopicPartition(topic, 1));

consumer.assign(partitions);
consumer.seekToEnd(partitions);
If we rewrite to use "absolute position" instead of delta, we can remove this.
…reuse-source-based-on-config
LGTM. Thanks for the patch.
Thanks for the reviews, I've merged it to trunk. Will provide a separate PR for upgrade docs, and have separate PRs for older branches.
This PR actually contains two changes: 1. leverage the TOPOLOGY_OPTIMIZATION config to "adjust" the topology internally to reuse the source topic. 2. fixed a long-dangling bug: whenever a source topic is reused as a changelog topic, write the checkpoint file for the consumed offset. This is done by taking the union of the ackedOffset from the producer and the consumed offset from the consumer; note we prioritize ackedOffset since the same topic may show up in both (think of a repartition topic). By doing this, the consumed offset from source topics can be treated as the checkpointed offset when reuse happens. 3. added a few unit and integration tests with / without the reusing, and made sure the restoration, standby task, and internal topic creation behaviors are all correct. Reviewers: John Roesler <[email protected]>, Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
Cherry-picked to 2.0
#6021) Updating the documentation for table operation because I believe it is incorrect. In PR #5163 the table operation stopped disabling the changelog topic by default and instead moved that optimization to a configuration that is not enabled by default. This PR updates the documentation to reflect the change in behavior and point to the new configuration for optimization. Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>