-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-7021: Reuse source based on config #5163
Changes from all commits
8a15f25
6fcb570
6d2efab
bf776a2
b274c88
f606736
549cc40
1007cb0
1b3ccbc
478cad9
c9d80b9
ca40f17
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -167,7 +167,7 @@ public String toString(final String indent) { | |
return sb.toString(); | ||
} | ||
|
||
protected Map<TopicPartition, Long> recordCollectorOffsets() { | ||
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit renaming. |
||
return Collections.emptyMap(); | ||
} | ||
|
||
|
@@ -242,7 +242,7 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep | |
ProcessorStateException exception = null; | ||
log.trace("Closing state manager"); | ||
try { | ||
stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null); | ||
stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null); | ||
} catch (final ProcessorStateException e) { | ||
exception = e; | ||
} finally { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
import org.apache.kafka.common.serialization.Deserializer; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
import org.apache.kafka.common.utils.Utils; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.Topology; | ||
import org.apache.kafka.streams.errors.TopologyException; | ||
import org.apache.kafka.streams.processor.ProcessorSupplier; | ||
|
@@ -121,6 +122,9 @@ public class InternalTopologyBuilder { | |
|
||
private Map<Integer, Set<String>> nodeGroups = null; | ||
|
||
// TODO: this is only temporary for 2.0 and should be removed | ||
public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>(); | ||
|
||
public interface StateStoreFactory { | ||
Set<String> users(); | ||
boolean loggingEnabled(); | ||
|
@@ -498,8 +502,14 @@ public final void addProcessor(final String name, | |
|
||
public final void addStateStore(final StoreBuilder storeBuilder, | ||
final String... processorNames) { | ||
addStateStore(storeBuilder, false, processorNames); | ||
} | ||
|
||
public final void addStateStore(final StoreBuilder storeBuilder, | ||
final boolean allowOverride, | ||
final String... processorNames) { | ||
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); | ||
if (stateFactories.containsKey(storeBuilder.name())) { | ||
if (!allowOverride && stateFactories.containsKey(storeBuilder.name())) { | ||
throw new TopologyException("StateStore " + storeBuilder.name() + " is already added."); | ||
} | ||
|
||
|
@@ -566,16 +576,22 @@ public final void connectProcessorAndStateStores(final String processorName, | |
} | ||
} | ||
|
||
// TODO: this method is only used by DSL and we might want to refactor this part | ||
public final void connectSourceStoreAndTopic(final String sourceStoreName, | ||
final String topic) { | ||
final String topic) { | ||
if (storeToChangelogTopic.containsKey(sourceStoreName)) { | ||
throw new TopologyException("Source store " + sourceStoreName + " is already added."); | ||
} | ||
storeToChangelogTopic.put(sourceStoreName, topic); | ||
} | ||
|
||
// TODO: this method is only used by DSL and we might want to refactor this part | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you remove those comments? I still think we should refactor some parts here (this also applies to the other TODOs below). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After reviewing Bill's part II PR for optimization, which enhanced the InternalStreamsBuilder for logical plan generation, I think it makes less sense to refactor these into it. |
||
public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder, | ||
final String topic) { | ||
if (storeToSourceChangelogTopic.containsKey(storeBuilder)) { | ||
throw new TopologyException("Source store " + storeBuilder.name() + " is already used."); | ||
} | ||
storeToSourceChangelogTopic.put(storeBuilder, topic); | ||
} | ||
|
||
public final void connectProcessors(final String... processorNames) { | ||
if (processorNames.length < 2) { | ||
throw new TopologyException("At least two processors need to participate in the connection."); | ||
|
@@ -591,13 +607,11 @@ public final void connectProcessors(final String... processorNames) { | |
nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length)); | ||
} | ||
|
||
// TODO: this method is only used by DSL and we might want to refactor this part | ||
public final void addInternalTopic(final String topicName) { | ||
Objects.requireNonNull(topicName, "topicName can't be null"); | ||
internalTopicNames.add(topicName); | ||
} | ||
|
||
// TODO: this method is only used by DSL and we might want to refactor this part | ||
public final void copartitionSources(final Collection<String> sourceNodes) { | ||
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); | ||
} | ||
|
@@ -1059,6 +1073,24 @@ public synchronized Map<Integer, TopicsInfo> topicGroups() { | |
return Collections.unmodifiableMap(topicGroups); | ||
} | ||
|
||
// Adjust the generated topology based on the configs. | ||
// Not exposed as public API and should be removed post 2.0 | ||
public void adjust(final StreamsConfig config) { | ||
final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE); | ||
|
||
if (enableOptimization20) { | ||
for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) { | ||
final StoreBuilder storeBuilder = entry.getKey(); | ||
final String topicName = entry.getValue(); | ||
|
||
// update store map to disable logging for this store | ||
storeBuilder.withLoggingDisabled(); | ||
addStateStore(storeBuilder, true); | ||
connectSourceStoreAndTopic(storeBuilder.name(), topicName); | ||
} | ||
} | ||
} | ||
|
||
private void setRegexMatchedTopicsToSourceNodes() { | ||
if (subscriptionUpdates.hasUpdates()) { | ||
for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -273,18 +273,41 @@ public void shouldUseDefaultNodeAndStoreNames() { | |
} | ||
|
||
@Test | ||
public void shouldReuseSourceTopicAsChangelogs() { | ||
public void shouldReuseSourceTopicAsChangelogsWithOptimization20() { | ||
final String topic = "topic"; | ||
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")); | ||
final Topology topology = builder.build(); | ||
final Properties props = StreamsTestUtils.minimalStreamsConfig(); | ||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); | ||
|
||
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); | ||
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); | ||
internalTopologyBuilder.adjust(new StreamsConfig(props)); | ||
|
||
assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic"))); | ||
|
||
assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store"))); | ||
|
||
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(false)); | ||
|
||
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true)); | ||
} | ||
|
||
@Test | ||
public void shouldNotReuseSourceTopicAsChangelogsByDefault() { | ||
final String topic = "topic"; | ||
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")); | ||
|
||
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); | ||
internalTopologyBuilder.setApplicationId("appId"); | ||
|
||
assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog"))); | ||
|
||
assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store"))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add this check to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack. |
||
|
||
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(true)); | ||
|
||
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog"))); | ||
} | ||
|
||
@Test(expected = TopologyException.class) | ||
public void shouldThrowExceptionWhenNoTopicPresent() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The actual fix works as follows: in the parsing phase, we create the materialized store normally, which means not enforcing disabled logging. And then in
adjust
we will modify the store, to 1) disable logging, 2) change its changelog topic to the source topic. As I mentioned in another HOTFIX PR, note that the term "changelog" topic now has two semantic meanings: 1) for restoring, 2) for appending updates to. For example, with optimizations turned on, the changelog for 1) will be the source topic, and for 2) it will be none. There are some tricky separate code paths handling those cases now.