
KAFKA-7021: Reuse source based on config #5163

Merged

Conversation

guozhangwang
Contributor

@guozhangwang commented Jun 7, 2018

This PR actually contains three changes:

  1. Leverage the TOPOLOGY_OPTIMIZATION config to "adjust" the topology internally so that the source topic is reused as the changelog topic.

  2. Fixed a long-dangling bug: whenever a source topic is reused as a changelog topic, write the checkpoint file for the consumed offsets. This is done by taking the union of the acked offsets from the producer and the consumed offsets from the consumer; we prioritize the acked offsets since the same topic may show up in both (think of a repartition topic). By doing this, the consumed offsets from source topics can be treated as checkpointed offsets when reuse happens (a sketch follows this list).

  3. Added a few unit and integration tests with / without the reuse, making sure that restoration, standby tasks, and internal topic creation all behave correctly.
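
As a minimal sketch of the checkpointable-offset union in change 2 (the recordCollector and consumedOffsets names follow the diff snippets quoted below; the body is illustrative, not the verbatim patch):

protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
    // start from the producer-acked offsets
    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(recordCollector.offsets());
    // add consumed offsets, but let acked offsets win when the same topic
    // shows up in both (e.g. a repartition topic)
    for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
        checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
    }
    return checkpointableOffsets;
}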

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax added the streams label Jun 8, 2018
Member

@mjsax left a comment

Couple of comments/questions.

I think I understand the overall idea and it makes sense -- need to think about some details a little more to really fully understand the change.

/**
 * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
 */
public static final String OPTIMIZE_AT_20 = "2.0";
Member

Why do we need this? Should we not use "all"? Also, this would be a public API change -- we can still update the corresponding KIP, but I don't think we need this, do we?

Contributor

I agree. Since this is our first optimization, "all" should be sufficient. But this raises another good point: if we plan to track optimization versions, are we setting ourselves up for a KIP for each optimization release?

Contributor Author

  1. Sounds good. I'm reverting to use all (see the config sketch below).
  2. Moving forward, we won't need one KIP for each release. We can use a single KIP to add the mechanism, and if two versions are compatible (say, no new incompatible optimizations are introduced from X to Y), we can just document that in the upgrade section.
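
For reference, a minimal sketch of how a user would then opt in (the property name comes from StreamsConfig.TOPOLOGY_OPTIMIZATION; app id, bootstrap servers, and the builder variable are placeholders, with builder assumed to be a StreamsBuilder):

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, "all");               // enable topology optimization
final KafkaStreams streams = new KafkaStreams(builder.build(), props); // per this PR, adjust() runs in the constructor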

final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE_AT_20);

if (enableOptimization20) {
    for (Map.Entry<StoreBuilder, String> entry : internalTopologyBuilder.storeToSourceChangelogTopic.entrySet()) {
Member

nit: add final

if (storeToChangelogTopic.containsKey(sourceStoreName)) {
    throw new TopologyException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}

// TODO: this method is only used by DSL and we might want to refactor this part
Member

Why did you remove those comments? I still think we should refactor some parts here (this also applies to other TODOs below).

Contributor Author

After reviewing Bill's part II PR for optimization, which enhanced the InternalStreamsBuilder for logical plan generation, I think it makes less sense to refactor these into it.

@@ -112,24 +121,106 @@ public void shutdown() {
}

@Test
public void shouldRestoreState() throws ExecutionException, InterruptedException {
public void shouldRestoreStateFromSourceTopic() throws InterruptedException, IOException {
Member

nit: simplify InterruptedException, IOException to Exception

Contributor

I disagree. It can be nicer for the reader to know which checked exceptions we're potentially going to throw.

But it doesn't matter much, so do what you want ;)

Contributor

For API calls I agree, but for testing, I think collapsing to Exception is OK.

Member

For regular API calls (i.e., "main code") we should list them. For a test, no exception should ever be thrown, and if one is, the test fails. Which exception it is doesn't matter, IMHO.

leftTable = builder.table(INPUT_TOPIC_LEFT);
rightTable = builder.table(INPUT_TOPIC_RIGHT);
leftTable = builder.table(INPUT_TOPIC_LEFT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("left").withLoggingDisabled());
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
Member

Do we need to disable logging explicitly here?

Contributor Author

Yes. We don't want to restore the stores in between tests (it is trickier to delete the changelogs since the appId changes for each case, so I think this is easier).

Member

Ack.

@@ -72,11 +72,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
// explicitly disable logging for source table materialized stores
materialized.withLoggingDisabled();
Member

Why do we remove this?

Contributor Author

The actual fix works as follows: in the parsing phase, we create the materialized store normally, i.e. without enforcing disabled logging. Then, in adjust, we modify the store to 1) disable logging, and 2) change its changelog topic to the source topic.

As I mentioned in another HOTFIX PR, note that the term "changelog" topic now has two semantic meanings: 1) a topic to restore from, and 2) a topic to append updates to. For example, with optimizations turned on, the changelog for 1) will be the source topic, while for 2) there will be none. There are some tricky separate code paths handling those cases now.
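
Concretely, the adjust phase boils down to rewriting each source-table store, roughly as follows (assembled from the diff snippets quoted in this PR; illustrative, not the verbatim patch):

for (final Map.Entry<StoreBuilder, String> entry : internalTopologyBuilder.storeToSourceChangelogTopic.entrySet()) {
    final StoreBuilder storeBuilder = entry.getKey();
    storeBuilder.withLoggingDisabled();                        // 1) no separate changelog to append updates to
    internalTopologyBuilder.addStateStore(storeBuilder, true); // overwrite the existing registration
    // 2) restore from the source topic instead of a dedicated changelog topic
    internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), entry.getValue());
}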

// Adjust the generated topology based on the configs.
// Not exposed as public API and should be removed post 2.0
Topology adjust(final StreamsConfig config) {
    final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE_AT_20);
Member

Reverse the check to avoid an NPE: StreamsConfig.OPTIMIZE_AT_20.equals(config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION))
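
To spell out the NPE concern (a minimal illustration; assumes the config value can be unset and thus null):

final String optimization = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION); // may be null
// optimization.equals(StreamsConfig.OPTIMIZE_AT_20)  -> throws NullPointerException when null
// StreamsConfig.OPTIMIZE_AT_20.equals(optimization)  -> safely evaluates to false when null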

Contributor

I'm not sure I'm reading this right... If I say optimize:=all, we won't do this optimization, right? It seems like you want to check for 2.0 or all.

Contributor Author

ack

@@ -535,7 +535,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
*/
public KafkaStreams(final Topology topology,
final Properties props) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
this(topology.adjust(new StreamsConfig(props)).internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
Member

Why do we call adjust in each constructor? Doing it once in the private constructor should be sufficient?

Contributor

I could be missing something, but the private constructor takes an InternalTopologyBuilder instance, and adjust needs to execute to enable source topic reuse before returning the InternalTopologyBuilder.
But maybe we could shift the adjust method to the InternalTopologyBuilder, so we would only need to call it once in the private constructor.

Contributor Author

What I originally thought is exactly as @bbejeck explained. I think moving adjust to InternalTopologyBuilder would be a better idea; will try it out.
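
A sketch of that refactor (hypothetical placement and signature, just to make the suggestion concrete):

// InternalTopologyBuilder gains an adjust(config) method, so every public
// KafkaStreams constructor funnels through a single call site:
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                     final StreamsConfig config,
                     final KafkaClientSupplier clientSupplier) {
    internalTopologyBuilder.adjust(config); // runs once, regardless of which public constructor was used
    // ... rest of construction unchanged
}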

// update store map to disable logging for this store
storeBuilder.withLoggingDisabled();
internalTopologyBuilder.addStateStore(storeBuilder, true);
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
Member

Just for my understanding: we are overwriting the changelog-topic name with the source-topic name here?

Contributor Author

Yes.


// update store map to disable logging for this store
storeBuilder.withLoggingDisabled();
internalTopologyBuilder.addStateStore(storeBuilder, true);
Member

Why do we need to add the store again? Isn't storeBuilder the same object that is already present, so we overwrite it with itself here? storeBuilder.withLoggingDisabled() is a mutating call, right?

Contributor Author

Unfortunately not... note that addStateStore directly constructs the StoreFactory and puts it into the stateFactories map. And StoreFactory is immutable, so I have to overwrite the key with a new StoreFactory in this way.
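
In other words, roughly (the StateStoreFactory wrapper and stateFactories map names are taken from context; illustrative only):

// addStateStore(storeBuilder, true) wraps the (now logging-disabled) builder in a
// fresh immutable factory, replacing the stale entry under the same store name:
stateFactories.put(storeBuilder.name(), new StateStoreFactory(storeBuilder));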

Member

Ack.

@@ -121,6 +121,9 @@

private Map<Integer, Set<String>> nodeGroups = null;

// this is only temporary for 2.0 and should be removed
Contributor

Maybe mark it TODO so we can search for it later.

Contributor Author

ack.

@vvcephei
Contributor

vvcephei commented Jun 8, 2018

I left a few comments, and I was also curious about several of @mjsax 's concerns, but assuming you hash those out, I'm +1 overall.

Contributor

@bbejeck left a comment

Overall I agree with the approach. Just left a couple of minor comments. I'll probably take another pass over the tests later to confirm my understanding of the offset restore fix.

@@ -167,7 +167,7 @@ public String toString(final String indent) {
return sb.toString();
}

protected Map<TopicPartition, Long> recordCollectorOffsets() {
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
Contributor Author

nit: renaming.

return recordCollector.offsets();
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
Contributor Author

This is the actual fix.

Member

nit: add final

@guozhangwang changed the title from "HOTFIX: Reuse source based on config" to "KAFKA-7021: Reuse source based on config" on Jun 8, 2018
@mjsax
Member

mjsax commented Jun 8, 2018

@guozhangwang Build failed with a checkstyle error.

@guozhangwang
Contributor Author

Found the issue with the failed system test; filed another PR: #5170

assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog")));

assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
Member

Should we add this check to shouldReuseSourceTopicAsChangelogsWithOptimization20, too?

Contributor Author

ack.
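
Presumably something along these lines (a sketch; with reuse, the store's changelog should map to the source topic itself -- the topic name "topic" is assumed for illustration):

assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic")));
assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));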

final int offsetLimitDelta = 1000;
final int offsetCheckpointed = 1000;
createStateForRestoration(INPUT_STREAM);
setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
Member

Why do we use a delta here, instead of the actual offset we want to commit (i.e., 4000)? Might be simpler to understand the test?

Contributor Author

@guozhangwang Jun 11, 2018

The reason is that in line 150, I need to check exactly how many records were processed, which should be the difference between the committed offset and the log end offset.

However, the log end offset is not always numKeys / 2 (num.partitions). In my tests I saw, for example, 5005 and 4995. So I have to use the delta to commit at the log end offset minus that delta.
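
So the arithmetic the test relies on is (names assumed for illustration):

// committedOffset  = logEndOffset - offsetLimitDelta            (set via setCommittedOffset)
// recordsProcessed = logEndOffset - committedOffset = offsetLimitDelta, which is exactly 1000,
// while logEndOffset itself (e.g. 5005 or 4995) is not deterministic across runs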

Member

Ack.

final CountDownLatch shutdownLatch = new CountDownLatch(1);

builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
.toStream()
Member

nit: indentation should only be 4 spaces?

final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(INPUT_STREAM, 0),
new TopicPartition(INPUT_STREAM, 1));
final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(topic, 0),
new TopicPartition(topic, 1));

consumer.assign(partitions);
consumer.seekToEnd(partitions);
Member

If we rewrite to use "absolute position" instead of delta, we can remove this.

Member

@mjsax left a comment

LGTM. Thanks for the patch.

@guozhangwang merged commit d98ec33 into apache:trunk on Jun 11, 2018
@guozhangwang
Contributor Author

Thanks for the reviews, I've merged it to trunk.

Will provide a separate PR for the upgrade docs, and separate PRs for the older branches.

@guozhangwang deleted the KHotfix-reuse-source-based-on-config branch on June 11, 2018 23:09
guozhangwang added a commit that referenced this pull request Jun 12, 2018

Reviewers: John Roesler <[email protected]>, Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
@guozhangwang
Contributor Author

Cherry-picked to 2.0

ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
guozhangwang pushed a commit that referenced this pull request Dec 14, 2018 (#6021)

Updating the documentation for the table operation because I believe it is incorrect.

In PR #5163 the table operation stopped disabling the changelog topic by default and instead moved that optimization to a configuration that is not enabled by default. This PR updates the documentation to reflect the change in behavior and point to the new configuration for optimization.

Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019 (apache#6021)