
KAFKA-7021: Reuse source based on config #5163

Merged
@@ -599,7 +599,7 @@ public KafkaStreams(final Topology topology,
@Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig config) {
this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
this(topology, config, new DefaultKafkaClientSupplier());
}

/**
@@ -635,6 +635,10 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
this.config = config;
this.time = time;

// adjust the topology if optimization is turned on.
// TODO: to be removed post 2.0
internalTopologyBuilder.adjust(config);

// The application ID is a required config and hence should always have value
processId = UUID.randomUUID();
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
@@ -302,11 +302,10 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
final ConsumedInternal<K, V> consumedInternal =
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde()));

return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
materializedInternal);
return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
}

/**
@@ -776,5 +776,4 @@ public synchronized Topology connectProcessorAndStateStores(final String process
public synchronized TopologyDescription describe() {
return internalTopologyBuilder.describe();
}

}
@@ -72,11 +72,7 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInt
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
// explicitly disable logging for source table materialized stores
materialized.withLoggingDisabled();
Member:
Why do we remove this?

Contributor Author:
The actual fix works as follows: in the parsing phase, we create the materialized store normally, i.e. without forcing logging to be disabled. Then, in adjust, we modify the store to 1) disable logging, and 2) change its changelog topic to the source topic.

As I mentioned in another HOTFIX PR, note that the term "changelog" topic now has two semantic meanings: 1) the topic to restore from, and 2) the topic to append updates to. For example, with optimization turned on, the changelog for 1) is the source topic, while for 2) there is none. There are some tricky separate code paths handling those cases now.
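For illustration only, a minimal sketch of how an application opts into this behavior; the application id, bootstrap server, topic, and store names below are hypothetical, and only StreamsConfig.TOPOLOGY_OPTIMIZATION / StreamsConfig.OPTIMIZE come from the code touched in this PR:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ReuseSourceTopicExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reuse-source-demo");   // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Opt in to the topology optimization: the source topic is reused as the
        // table's changelog, store logging is disabled, and no "-changelog" topic is created.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        // Materialize a table directly from the source topic into the store named "store".
        builder.table("topic", Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

With OPTIMIZE set, adjust() disables logging for the source table's store and maps it to the source topic, so no separate "appId-store-changelog" topic is created; with the default setting, the dedicated changelog topic is still used, as the updated tests below verify.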


final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
.materialize();
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();

final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
final String name = newProcessorName(KTableImpl.SOURCE_NAME);
@@ -88,7 +84,7 @@ public <K, V> KTable<K, V> table(final String topic,
name);

internalTopologyBuilder.addStateStore(storeBuilder, name);
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);

return kTable;
}
@@ -167,7 +167,7 @@ public String toString(final String indent) {
return sb.toString();
}

protected Map<TopicPartition, Long> recordCollectorOffsets() {
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
Contributor Author:
Nit: renaming.

return Collections.emptyMap();
}

@@ -242,7 +242,7 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep
ProcessorStateException exception = null;
log.trace("Closing state manager");
try {
stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
} catch (final ProcessorStateException e) {
exception = e;
} finally {
@@ -19,6 +19,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -121,6 +122,9 @@ public class InternalTopologyBuilder {

private Map<Integer, Set<String>> nodeGroups = null;

// TODO: this is only temporary for 2.0 and should be removed
public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();

public interface StateStoreFactory {
Set<String> users();
boolean loggingEnabled();
@@ -498,8 +502,14 @@ public final void addProcessor(final String name,

public final void addStateStore(final StoreBuilder storeBuilder,
final String... processorNames) {
addStateStore(storeBuilder, false, processorNames);
}

public final void addStateStore(final StoreBuilder storeBuilder,
final boolean allowOverride,
final String... processorNames) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
if (stateFactories.containsKey(storeBuilder.name())) {
if (!allowOverride && stateFactories.containsKey(storeBuilder.name())) {
throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
}

@@ -566,16 +576,22 @@ public final void connectProcessorAndStateStores(final String processorName,
}
}

// TODO: this method is only used by DSL and we might want to refactor this part
public final void connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}

// TODO: this method is only used by DSL and we might want to refactor this part
Member:
Why did you remove those comments? I still think we should refactor some parts here (this also applies to the other TODOs below).

Contributor Author:
After reviewing Bill's part II PR for optimization, which enhanced the InternalStreamsBuilder for logical plan generation, I think it makes less sense to refactor these into it.

public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
final String topic) {
if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
throw new TopologyException("Source store " + storeBuilder.name() + " is already used.");
}
storeToSourceChangelogTopic.put(storeBuilder, topic);
}

public final void connectProcessors(final String... processorNames) {
if (processorNames.length < 2) {
throw new TopologyException("At least two processors need to participate in the connection.");
@@ -591,13 +607,11 @@ public final void connectProcessors(final String... processorNames) {
nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
}

// TODO: this method is only used by DSL and we might want to refactor this part
public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
}

// TODO: this method is only used by DSL and we might want to refactor this part
public final void copartitionSources(final Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
@@ -1059,6 +1073,24 @@ public synchronized Map<Integer, TopicsInfo> topicGroups() {
return Collections.unmodifiableMap(topicGroups);
}

// Adjust the generated topology based on the configs.
// Not exposed as public API and should be removed post 2.0
public void adjust(final StreamsConfig config) {
final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);

if (enableOptimization20) {
for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
final StoreBuilder storeBuilder = entry.getKey();
final String topicName = entry.getValue();

// update store map to disable logging for this store
storeBuilder.withLoggingDisabled();
addStateStore(storeBuilder, true);
connectSourceStoreAndTopic(storeBuilder.name(), topicName);
}
}
}

private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
@@ -46,7 +46,7 @@ public class ProcessorStateManager extends AbstractStateManager {
private final boolean isStandby;
private final ChangelogReader changelogReader;
private final Map<TopicPartition, Long> offsetLimits;
private final Map<TopicPartition, Long> restoredOffsets;
private final Map<TopicPartition, Long> standbyRestoredOffsets;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
private final Map<String, String> storeToChangelogTopic;
private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -79,7 +79,7 @@ public ProcessorStateManager(final TaskId taskId,
partitionForTopic.put(source.topic(), source);
}
offsetLimits = new HashMap<>();
restoredOffsets = new HashMap<>();
standbyRestoredOffsets = new HashMap<>();
this.isStandby = isStandby;
restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
@@ -212,7 +212,7 @@ List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(final TopicPartition st
}

// record the restored offset for its change log partition
restoredOffsets.put(storePartition, lastOffset + 1);
standbyRestoredOffsets.put(storePartition, lastOffset + 1);

return remainingRecords;
}
@@ -293,20 +293,20 @@ public void close(final Map<TopicPartition, Long> ackedOffsets) throws Processor

// write the checkpoint
@Override
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
checkpointableOffsets.putAll(changelogReader.restoredOffsets());
public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets) {
this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
for (final StateStore store : stores.values()) {
final String storeName = store.name();
// only checkpoint the offset to the offsets file if
// it is persistent AND changelog enabled
if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) {
final String changelogTopic = storeToChangelogTopic.get(storeName);
final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
if (ackedOffsets.containsKey(topicPartition)) {
if (checkpointableOffsets.containsKey(topicPartition)) {
// store the last offset + 1 (the log position after restoration)
checkpointableOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
} else if (restoredOffsets.containsKey(topicPartition)) {
checkpointableOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
this.checkpointableOffsets.put(topicPartition, checkpointableOffsets.get(topicPartition) + 1);
} else if (standbyRestoredOffsets.containsKey(topicPartition)) {
this.checkpointableOffsets.put(topicPartition, standbyRestoredOffsets.get(topicPartition));
}
}
}
@@ -315,9 +315,9 @@ public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
}

log.trace("Writing checkpoint: {}", checkpointableOffsets);
log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
try {
checkpoint.write(checkpointableOffsets);
checkpoint.write(this.checkpointableOffsets);
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e);
}
@@ -91,7 +91,7 @@ public StateDirectory(final StreamsConfig config,
* @return directory for the {@link TaskId}
* @throws ProcessorStateException if the task directory does not exists and could not be created
*/
File directoryForTask(final TaskId taskId) {
public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
@@ -380,7 +380,7 @@ void commit(final boolean startNewTransaction) {
flushState();

if (!eosEnabled) {
stateMgr.checkpoint(recordCollectorOffsets());
stateMgr.checkpoint(activeTaskCheckpointableOffsets());
}

commitOffsets(startNewTransaction);
@@ -391,8 +391,13 @@
}

@Override
protected Map<TopicPartition, Long> recordCollectorOffsets() {
return recordCollector.offsets();
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
}

return checkpointableOffsets;
}

@Override
@@ -273,18 +273,41 @@ public void shouldUseDefaultNodeAndStoreNames() {
}

@Test
public void shouldReuseSourceTopicAsChangelogs() {
public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
final Topology topology = builder.build();
final Properties props = StreamsTestUtils.minimalStreamsConfig();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
internalTopologyBuilder.adjust(new StreamsConfig(props));

assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic")));

assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));

assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(false));

assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true));
}

@Test
public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));

final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
internalTopologyBuilder.setApplicationId("appId");

assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog")));

assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
Member:
Should we add this check to shouldReuseSourceTopicAsChangelogsWithOptimization20, too?

Contributor Author:
ack.


assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(true));

assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog")));
}

@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() {