Skip to content

Commit

Permalink
KAFKA-6729: Follow up; disable logging for source KTable. (apache#5038)
Browse files Browse the repository at this point in the history
Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
guozhangwang authored May 20, 2018
1 parent f65f3a8 commit 9752cca
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInt
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
// explicitly disable logging for source table materialized stores
materialized.withLoggingDisabled();

final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
.materialize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public class InternalTopologyBuilder {

private Map<Integer, Set<String>> nodeGroups = null;

interface StateStoreFactory {
public interface StateStoreFactory {
Set<String> users();
boolean loggingEnabled();
StateStore build();
Expand Down Expand Up @@ -1799,4 +1799,8 @@ public void updateSubscribedTopics(final Set<String> topics, final String logPre
public synchronized Set<String> getSourceTopicNames() {
return sourceTopicNames;
}

public synchronized Map<String, StateStoreFactory> getStateStores() {
return stateFactories;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ public void shouldReuseSourceTopicAsChangelogs() {

final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());

assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("topic")));
assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));

assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(false));

assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true));
}

@Test(expected = TopologyException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,6 @@ public Object apply(final Object value1, final Object value2) {
expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4);
expectedCreatedInternalTopics.put("topic3", 4); // the source topic is reused as changelog topics

// check if all internal topics were created as expected
assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics));
Expand Down

0 comments on commit 9752cca

Please sign in to comment.