MINOR: Remove deprecated ZkUtils usage from EmbeddedKafkaCluster #5324
Conversation
@mjsax Please take a look at this minor change.
Can we use `AdminClient`?
Yes, we can use `AdminClient`. Currently the Streams tests use the ZkUtils/KafkaZkClient/AdminZkClient classes. If there are no concerns, I can migrate these calls to `AdminClient`.
Yes, I think it would be best to avoid using `ZkUtils`.
retest this please
@mjsax @guozhangwang Call for review.
Thanks for the PR. A couple of comments, mostly nits.
for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
    if (topicConfig.getKey().equals(changelog)) {
        return topicConfig.getValue();
try (AdminClient adminClient = createAdminClient()) {
nit: add `final`. We apply `final` wherever possible in the Streams code base -- for all variables, parameters, etc. I won't comment on the other missing `final`s; please add them throughout the whole PR. There are 5 missing `final`s in the next lines, for example. Thx.
try {
    Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
    Properties properties = new Properties();
    for (ConfigEntry configEntry: config.entries()) {
nit: add a space: `configEntry :` -- wondering why this is not detected by checkstyle...
Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
Properties properties = new Properties();
for (ConfigEntry configEntry: config.entries()) {
    if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
nit: add `{ }` -- we use curly braces for all blocks. nit: remove the double space after `==`.
try (AdminClient adminClient = createAdminClient()) {
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog);
    try {
        Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
nit: remove the double space after `=`
final Set<String> allTopics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
return !allTopics.removeAll(deletedTopics);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
Is re-throwing the best approach? We could also return `false` and time out?
final Set<String> allTopics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
return allTopics.equals(remainingTopics);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
as above
newTopic.configs((Map) topicConfig);

int tries = 0;
int maxTries = 5;
Can't we configure `AdminClient` with internal retries instead?
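For illustration, a minimal sketch of what that suggestion could look like: instead of a manual `tries`/`maxTries` loop, the retry count is handed to the client itself. This is not the PR's code; the string keys mirror the `AdminClientConfig` constants `RETRIES_CONFIG` and `RETRY_BACKOFF_MS_CONFIG`, and the bootstrap address is a placeholder for whatever the embedded cluster exposes.

```java
import java.util.Properties;

// Hypothetical sketch: configure the AdminClient with client-side retries
// instead of a hand-rolled retry loop. Assumes kafka-clients on the classpath
// for the commented-out create() call.
public class AdminRetryConfig {
    static Properties adminConfig(final String bootstrapServers) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // placeholder address
        props.put("retries", "5");            // internal retries (AdminClientConfig.RETRIES_CONFIG)
        props.put("retry.backoff.ms", "100"); // wait between retries
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = adminConfig("localhost:9092"); // placeholder
        System.out.println(props.getProperty("retries")); // prints "5"
        // With kafka-clients available one would then call:
        // AdminClient adminClient = AdminClient.create(props);
    }
}
```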
private KafkaZkClient createZkClient() {
    return KafkaZkClient.apply(zookeeperConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS,
        DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup", "testMetricType");
private AdminClient createAdminClient() {
Seems to be redundant with the method in EmbeddedKafkaCluster.java -- it might be better to make this method public and call it within EmbeddedKafkaCluster.java?
retest this please
Thanks for the update. Some more nits.
try (final AdminClient adminClient = createAdminClient()) {
    final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog);
    try {
        final Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
nit: double space after `=`
Similarly, `adminClient.describeConfigs` does not read from ZK but from the broker cache, and hence may be subject to race conditions.
Hmm.. I ran it multiple times without any issues. Not sure how to handle this? Can we add a small `Thread.sleep` to the method? Or some kind of `waitForCondition` implementation?
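A `waitForCondition`-style helper, as floated above, could look like this stdlib-only sketch (the class and method names are made up here; Kafka's own `TestUtils.waitForCondition` plays this role in the real test utilities): poll a condition until it holds or a deadline passes, rather than a fixed `Thread.sleep`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical minimal wait-for-condition helper: repeatedly evaluates a
// boolean condition with a small poll interval until it becomes true or the
// overall timeout expires.
public class WaitUtil {
    static boolean waitForCondition(final BooleanSupplier condition,
                                    final long timeoutMs,
                                    final long pollIntervalMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollIntervalMs);
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(final String[] args) throws InterruptedException {
        // A condition that is already true returns immediately.
        System.out.println(waitForCondition(() -> true, 100, 10)); // prints "true"
    }
}
```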
Thinking about this a bit more, using `adminClient.describeConfigs` here would be okay, since we never dynamically change the config; we just want to test that the configs specified when the topic was created are as expected. So if the topic creation is not yet propagated, it will fail and retry on the admin client anyway. So nvm.
}
return properties;
} catch (InterruptedException | ExecutionException e) {
nit: add `final`
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
*/
public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
    Set<String> topics = brokers[0].listTopics();
nit: add `final`
I'm a bit concerned about using `listTopics` rather than `JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava()`, as only the latter gets the source of truth from ZK, while the former may be subject to race conditions (e.g. if you create a topic and then call `listTopics`, it may not be included if the metadata has not been propagated yet).
This is a one-node cluster, though, right?
/**
 * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
 */
public class EmbeddedKafkaCluster extends ExternalResource {
The javadoc is outdated: we actually create a `KafkaEmbedded[] brokers`.
If there are race conditions, we need to fix them. Remember that Streams itself has to use AdminClient.
@guozhangwang Since I was using `deleteAllTopicsAndWait` only in `AbstractJoinIntegrationTest.cleanup()`, I thought of using `listTopics()`. But yes, it is error prone in delete-check-create scenarios. I changed the implementation back to using `KafkaZkClient`.
try (final AdminClient adminClient = createAdminClient()) {
    adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
nit: add `final`
}

public void deleteTopic(final String topic) {
    log.debug("Deleting topic { name: {} }", topic);
    try (final AdminClient adminClient = createAdminClient()) {
        adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
    } catch (InterruptedException | ExecutionException e) {
nit: add `final`
try (final AdminClient adminClient = createAdminClient()) {
    adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
} catch (InterruptedException | ExecutionException e) {
    if (!(e.getCause() instanceof UnknownTopicOrPartitionException))
Might be better to add a catch clause for this case instead?
try {
    ...
} catch (final UnknownTopicOrPartitionException ignoreAndSwallow) {
} catch (final InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
}
`UnknownTopicOrPartitionException` is the cause of the actual exception `e`, so we cannot just catch it here.
Makes sense.
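The constraint described above can be demonstrated with plain JDK futures: the task's failure always arrives wrapped in an `ExecutionException`, so only its cause can be inspected. In this stdlib-only sketch, `IllegalStateException` stands in for `UnknownTopicOrPartitionException`, and the class and helper names are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CauseUnwrapDemo {
    // Mirrors the PR's check on e.getCause(): true when the wrapped failure
    // is the "ignorable" type.
    static boolean isIgnorable(final Throwable wrapped) {
        return wrapped.getCause() instanceof IllegalStateException;
    }

    public static void main(final String[] args) throws Exception {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        // The task fails; IllegalStateException stands in for
        // UnknownTopicOrPartitionException in this sketch.
        final Future<Void> future = pool.submit((Callable<Void>) () -> {
            throw new IllegalStateException("topic does not exist");
        });
        try {
            future.get();
        } catch (final ExecutionException e) {
            // A catch clause for the cause type itself would never match,
            // because get() only ever throws the ExecutionException wrapper.
            System.out.println(isIgnorable(e)); // prints "true"
        } finally {
            pool.shutdown();
        }
    }
}
```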
public Set<String> listTopics() {
    try (final AdminClient adminClient = createAdminClient()) {
        return adminClient.listTopics(new ListTopicsOptions()).names().get();
    } catch (InterruptedException | ExecutionException e) {
nit: add `final`
@@ -93,6 +97,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) throws IO
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 10000);
@ijuma @guozhangwang I am observing a transient ZK session timeout error in tests, so I updated the ZK session timeout to 10s. Let me know if there are any concerns.
kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:225)
at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:95)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1580)
at kafka.server.KafkaServer.kafka$server$KafkaServer$$createZkClient$1(KafkaServer.scala:348)
at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:372)
at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
at kafka.utils.TestUtils.createServer(TestUtils.scala)
at org.apache.kafka.streams.integration.utils.KafkaEmbedded.<init>(KafkaEmbedded.java:77)
I've seen this before as well, but can never observe it locally. @omkreddy are you certain it is due to a short timeout value? The default is 6 seconds, so it is already pretty large; or, to put it another way, increasing it from 6 to 10 may not safely avoid the problem if it is really a timeout race condition, right?
@mjsax @guozhangwang Thanks for the review. Updated the PR.
The PR lgtm, other than the comment above. @omkreddy could you investigate the root cause of the ZK timeout a bit more? 6 seconds seems pretty large to me, and if it is indeed still not sufficient, I'm fine with increasing it to an even larger number, like 30 seconds.
@guozhangwang The ZK timeout error is transient. My rough observation is that it has been more frequent since the recent ZK library upgrade, but I am not sure. Currently, the core tests configure 10 seconds as the timeout, so we can also use 10 seconds here.
@omkreddy Does it go away completely after the timeout bump?
retest this please
@ijuma Yes, I am not able to reproduce it with 10s; I ran it a few times. But I am getting the timeout error with 6 seconds. Also, the Streams tests in recent Jenkins builds are failing with the timeout error. As suggested by Guozhang, maybe we can increase the timeout to a larger number.
Thanks @omkreddy, I'm going to merge this PR as is, and will continue investigating the flaky test. If it is still due to the ZK timeout I will try to further bump it up in another PR.