Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Remove deprecated ZkUtils usage from EmbeddedKafkaCluster #5324

Merged
merged 5 commits into from
Jul 19, 2018

Conversation

omkreddy
Copy link
Contributor

@omkreddy omkreddy commented Jul 3, 2018

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@omkreddy
Copy link
Contributor Author

omkreddy commented Jul 3, 2018

@mjsax Please take a look at this minor change.

@mjsax mjsax added the streams label Jul 3, 2018
@ijuma
Copy link
Contributor

ijuma commented Jul 5, 2018

Can we use AdminClient instead?

@omkreddy
Copy link
Contributor Author

omkreddy commented Jul 5, 2018

Yes, We can use AdminClient. Currently Streams tests uses ZKUtils/KafkaZkClient/AdminZkClient classes. If there are no concerns, I can migrate these calls to AdminClient.

@ijuma
Copy link
Contributor

ijuma commented Jul 5, 2018

Yes, I think it would be best to avoid using KafkaZkClient outside of core. We should update the checkstyle rules to enforce this once usages have been migrated.

@omkreddy
Copy link
Contributor Author

omkreddy commented Jul 6, 2018

retest this please

@omkreddy
Copy link
Contributor Author

omkreddy commented Jul 6, 2018

@ijuma @mjsax Updated the PR. Please take a look when you get a chance.

@omkreddy
Copy link
Contributor Author

@mjsax @guozhangwang Call for review.

@guozhangwang
Copy link
Contributor

cc @cmccabe @rajinisivaram

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Couple of comments. Mostly nits.

for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
if (topicConfig.getKey().equals(changelog)) {
return topicConfig.getValue();
try (AdminClient adminClient = createAdminClient()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

We apply final where ever possible in Streams code base -- for all variables, parameters etc. -- I won't comment on the other missing final, please add them thought the whole PR. There are 5 missing final in the next lines for example. Thx.

try {
Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
Properties properties = new Properties();
for (ConfigEntry configEntry: config.entries()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add space configEntry : -- wondering why this is not detected by checkstyle...

Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
Properties properties = new Properties();
for (ConfigEntry configEntry: config.entries()) {
if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add { } -- we use curly braces for all blocks

nit: remove double space after ==

try (AdminClient adminClient = createAdminClient()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog);
try {
Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove double space after =

final Set<String> allTopics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
return !allTopics.removeAll(deletedTopics);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is re-throwing the best approach? We could also return false and time-out?

final Set<String> allTopics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
return allTopics.equals(remainingTopics);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

newTopic.configs((Map) topicConfig);

int tries = 0;
int maxTries = 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we configure AdminClient with internal retries instead?

private KafkaZkClient createZkClient() {
return KafkaZkClient.apply(zookeeperConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup", "testMetricType");
private AdminClient createAdminClient() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be redundant with the method in EmbeddedKafkaCluster.java -- might be better to make this method public and call within EmbeddedKafkaCluster.java ?

@omkreddy
Copy link
Contributor Author

retest this please

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. Some more nits.

try (final AdminClient adminClient = createAdminClient()) {
final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog);
try {
final Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double space after =

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, adminClient.describeConfigs does not read from ZK but from broker cache, and hence maybe subject to race conditions.

Copy link
Contributor Author

@omkreddy omkreddy Jul 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.. ran multiple times with out any issues. not sure how to handle this? Can we add small Thread.sleep to method? or some kind of waitForCondition implementation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this a bit more, using adminClient.describeConfigs here would be okay since we never dynamically change the config, we just want to test that the configs specified when the topic was created is expected. So if the topic creation is not yet propagated, it will fail and re-try on admin client anyways. So nvm.

}
return properties;
} catch (InterruptedException | ExecutionException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
*/
public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
Set<String> topics = brokers[0].listTopics();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned using listTopics than using JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava()) as only the latter can get the source-of-truth on ZK while the former may be subject to race conditions (e.g. if you create the topic and then call listTopics, it may not be included if the metadata was not propagated yet).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a one-node cluster, though, right?

/**

  • Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
    */
    public class EmbeddedKafkaCluster extends ExternalResource {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The javadoc is out dated: we actually create a KafkaEmbedded[] brokers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are race conditions, we need to fix them. Remember that Streams itself has to use AdminClient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Since I was using deleteAllTopicsAndWait only in AbstractJoinIntegrationTest.cleanup(), So I thought of using listTopics(). But yes, its is error prone in delete-check-create scenarios. I changed the the implementation back to using KafkaZkClient.


try (final AdminClient adminClient = createAdminClient()) {
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

}

public void deleteTopic(final String topic) {
log.debug("Deleting topic { name: {} }", topic);
try (final AdminClient adminClient = createAdminClient()) {
adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
} catch (InterruptedException | ExecutionException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

try (final AdminClient adminClient = createAdminClient()) {
adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to add a catch-clause for this case instead?

try {
...
} catch (final UnknownTopicOrPartitionException ignoreAndSwallow) {
} catch (final InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnknownTopicOrPartitionException is the cause of the actual exception e, so we cannot just catch it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

public Set<String> listTopics() {
try (final AdminClient adminClient = createAdminClient()) {
return adminClient.listTopics(new ListTopicsOptions()).names().get();
} catch (InterruptedException | ExecutionException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

@@ -93,6 +97,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) throws IO
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 10000);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma @guozhangwang I am observing transient zk session timeout error in tests. So I updated the zk session timeout to 10s. let me know if any concern.

kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
	at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:225)
	at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
	at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
	at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
	at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:95)
	at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1580)
	at kafka.server.KafkaServer.kafka$server$KafkaServer$$createZkClient$1(KafkaServer.scala:348)
	at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:372)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
	at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
	at kafka.utils.TestUtils.createServer(TestUtils.scala)
	at org.apache.kafka.streams.integration.utils.KafkaEmbedded.<init>(KafkaEmbedded.java:77)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen this before as well, but can never observe it locally. @omkreddy are you certain it is due to short timeout value (default is 6 seconds so it is already pretty large, or put it another way increasing to 10 from 6 may not safely avoid if it is really a timeout race condition right?)

@omkreddy
Copy link
Contributor Author

@mjsax @guozhangwang Thanks for the review. Updated the PR.

@guozhangwang
Copy link
Contributor

The PR lgtm, other than the comment above. @omkreddy could you investigate a bit more on the root cause of ZK timeout? 6 seconds seems pretty large to me, and if it is indeed still not sufficient, I'm fine with increasing it to even larger number, like 30 seconds.

@omkreddy
Copy link
Contributor Author

omkreddy commented Jul 19, 2018

@guozhangwang ZK timeout error is transient. My random observation is, Its frequent after recent zk library upgrade. But I am not sure.

Currently, In core tests, we configured 10seconds as timeout, So we can also use 10 seconds here.
I am fine with increasing timeout to 30 seconds.

@ijuma
Copy link
Contributor

ijuma commented Jul 19, 2018

@omkreddy Does it go away completely after the timeout bump?

@omkreddy
Copy link
Contributor Author

retest this please

@omkreddy
Copy link
Contributor Author

omkreddy commented Jul 19, 2018

@ijuma yes. I am not able to reproduce with 10sec. I ran few times. But I am getting timeout error with 6seconds. also recent jenkins builds streams tests are failing with timeout error.

As suggested by Guozhang, maybe we can increase timeout to a larger number

@guozhangwang
Copy link
Contributor

Thanks @omkreddy , I'm going to merge this PR as is, and will continue investigating the flaky test. If it is still due to the ZK timeout I will try to further bump it up in another PR.

@guozhangwang guozhangwang merged commit 96c53e9 into apache:trunk Jul 19, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants