Skip to content

Commit

Permalink
MINOR: internal config objects should not be logged (apache#5389)
Browse files Browse the repository at this point in the history
Reviewers: Guozhang Wang <[email protected]>, Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Jul 25, 2018
1 parent c83ecf4 commit 487b954
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
}

public AdminClientConfig(Map<?, ?> props) {
super(CONFIG, props);
this(props, false);
}

protected AdminClientConfig(Map<?, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}

public static Set<String> configNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,12 @@ public static ConfigDef configDef() {
* @param props properties that specify Kafka Streams and internal consumer/producer configuration
*/
public StreamsConfig(final Map<?, ?> props) {
super(CONFIG, props);
this(props, true);
}

protected StreamsConfig(final Map<?, ?> props,
final boolean doLog) {
super(CONFIG, props, doLog);
eosEnabled = EXACTLY_ONCE.equals(getString(PROCESSING_GUARANTEE_CONFIG));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -44,6 +43,12 @@ public class InternalTopicManager {
private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";

private static final class InternalAdminClientConfig extends AdminClientConfig {
private InternalAdminClientConfig(final Map<?, ?> props) {
super(props, false);
}
}

private final Logger log;
private final long windowChangeLogAdditionalRetention;
private final Map<String, String> defaultTopicConfigs = new HashMap<>();
Expand All @@ -57,12 +62,12 @@ public InternalTopicManager(final AdminClient adminClient,
final StreamsConfig streamsConfig) {
this.adminClient = adminClient;

LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
log = logContext.logger(getClass());

replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
retries = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
retries = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);

log.debug("Configs:" + Utils.NL,
"\t{} = {}" + Utils.NL,
Expand Down Expand Up @@ -146,12 +151,7 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
}

if (retry) {
final Iterator<NewTopic> it = newTopics.iterator();
while (it.hasNext()) {
if (createTopicNames.contains(it.next().name())) {
it.remove();
}
}
newTopics.removeIf(newTopic -> createTopicNames.contains(newTopic.name()));

continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
private final static int VERSION_THREE = 3;
private final static int VERSION_FOUR = 4;
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
private int minReceivedMetadataVersion = UNKNOWN;
protected Set<Integer> supportedVersions = new HashSet<>();

private Logger log;
Expand Down Expand Up @@ -194,17 +193,19 @@ public String toString() {
}
}

protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
@Override
public int compare(final TopicPartition p1,
final TopicPartition p2) {
final int result = p1.topic().compareTo(p2.topic());
private static final class InternalStreamsConfig extends StreamsConfig {
private InternalStreamsConfig(final Map<?, ?> props) {
super(props, false);
}
}

if (result != 0) {
return result;
} else {
return Integer.compare(p1.partition(), p2.partition());
}
protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = (p1, p2) -> {
final int result = p1.topic().compareTo(p2.topic());

if (result != 0) {
return result;
} else {
return Integer.compare(p1.partition(), p2.partition());
}
};

Expand Down Expand Up @@ -236,7 +237,7 @@ protected TaskManager taskManger() {
*/
@Override
public void configure(final Map<String, ?> configs) {
final StreamsConfig streamsConfig = new StreamsConfig(configs);
final StreamsConfig streamsConfig = new InternalStreamsConfig(configs);

// Setting the logger with the passed in client thread name
logPrefix = String.format("stream-thread [%s] ", streamsConfig.getString(CommonClientConfigs.CLIENT_ID_CONFIG));
Expand Down Expand Up @@ -394,7 +395,8 @@ public Map<String, Assignment> assign(final Cluster metadata,
final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
final Set<String> futureConsumers = new HashSet<>();

minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;

supportedVersions.clear();
int futureMetadataVersion = UNKNOWN;
for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
Expand Down

0 comments on commit 487b954

Please sign in to comment.