KAFKA-5697: Use nonblocking poll in Streams #5107

Merged
merged 1 commit into from
Jun 8, 2018

Conversation

vvcephei
Contributor

Make use of the new Consumer#poll(Duration) to avoid getting stuck in poll when the broker is unavailable.
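As a rough illustration of why a bounded poll matters, here is a JDK-only sketch rather than Kafka code. The queue stands in for an unavailable broker, and `BlockingQueue.poll(timeout, unit)` stands in for `Consumer#poll(Duration)`. The class and method names (`BoundedPollSketch`, `pollOnce`) are hypothetical and not part of this PR.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer: the queue plays the role of the broker.
final class BoundedPollSketch {

    // Waits at most `timeout` for a record and returns null if none arrived,
    // so the caller regains control even when nothing is ever produced.
    static String pollOnce(final BlockingQueue<String> source, final Duration timeout) {
        try {
            return source.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(final String[] args) {
        final BlockingQueue<String> unavailableBroker = new LinkedBlockingQueue<>();
        // unavailableBroker.take() would block indefinitely here; the bounded poll returns.
        final String record = pollOnce(unavailableBroker, Duration.ofMillis(100));
        System.out.println(record == null ? "no record, still responsive" : record);
    }
}
```

The caller decides what to do after an empty poll (retry, check a shutdown flag, give up), which is the property the PR is after.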

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -81,7 +82,7 @@ public void register(final StateRestorer restorer) {

final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
try {
-final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
+final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(Duration.ofMillis(10));
Contributor Author

10ms isn't enough time to do any real work... It seems like it should either be 0ms (async) or >= 100ms like the rest.

Contributor

I'd suggest using 100ms rather than 10ms: this call sits inside a larger while loop in StreamThread.runOnce, so I'm concerned that poll(0) would unnecessarily increase the restoration latency.

Member

Shouldn't we use config poll.ms? Or, if present, restore.consumer.poll.ms ?

Contributor Author

Yeah, I was wondering why we sometimes use hardcoded values and other times use configured ones...

@vvcephei
Contributor Author

@bbejeck @guozhangwang @mjsax

Please review this when you get a chance.

Contributor

@bbejeck bbejeck left a comment

Overall looks good to me. I just have a minor comment about using configuration for the timeouts in some spots.
Also left a comment about running system tests.

@@ -198,7 +199,7 @@ public void register(final StateStore store,
int attempts = 0;
while (true) {
try {
-partitionInfos = globalConsumer.partitionsFor(sourceTopic);
+partitionInfos = globalConsumer.partitionsFor(sourceTopic, Duration.ofSeconds(30));
Contributor

Do we want to consider using a configuration for this and other spots where we have hard-coded values? With KIP-276 merged users can specify different timeouts per consumer type.

Contributor Author

I agree in general, so I'll make that change in the calls to poll, but I don't think we have an appropriate config for this call to partitionsFor. I just looked up the old call, and it does time out according to a config; it just waits 305s instead of 30s.

I think I'll revert this line and update the test to set that config lower instead.

@@ -262,7 +263,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,

while (offset < highWatermark) {
try {
-final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
+final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(Duration.ofMillis(100));
Contributor

Same as above and others as well.

@@ -334,7 +335,7 @@ private void consumeAndProduce(final String topic) {
consumer.seekToBeginning(partitions);

while (true) {
-final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
Contributor

Just for sanity checking, maybe we should run SimpleBenchmark system tests?

Contributor Author

Yeah, I definitely think we should run all the tests we can before the 2.0 release.

@@ -254,7 +255,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
topics.add("repartition");
}
consumer.subscribe(topics);
-consumer.poll(0);
+consumer.poll(Duration.ofMillis(100));
Contributor

Same comment as for SimpleBenchmark, but now I'm thinking we should maybe just run all the system tests.

Contributor

This call is only there to make sure we have updated the metadata. So I'm wondering if we can just get rid of it and instead have the partitionsFor call on line 262 use the request timeout as its duration, which should be sufficient?

Contributor Author

Oh, I overlooked this. We should never call poll and ignore the result. I'll look into @guozhangwang's suggestion.

Contributor Author

I'm just removing this poll call. As I understand it, the partitionsFor call (and all other interactions with Consumer) should be sufficient on their own. If they start timing out, we should just increase the timeout, IMO.

Contributor

@guozhangwang guozhangwang left a comment

Should we remove the two @Ignore annotations on the KafkaStreamsTest class? cc @vvcephei

@@ -1078,7 +1079,7 @@ private void maybeUpdateStandbyTasks(final long now) {
}

try {
-final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(Duration.ofMillis(100));
Contributor

Here I'd suggest the opposite: poll(0), since this is during normal processing, not during restoration, so we can afford to get no records back in a few iterations. Instead, we want to proceed to the next iteration and call the normal consumer's poll sooner, so that we are not kicked out of the group.

Member

@mjsax mjsax left a comment

Thanks for the PR!

streams.close();
try {
streams.start();
} catch (final StreamsException expected) {
Member

If we expect this exception to happen, shouldn't we add a fail after streams.start()?

If we are afraid it might hang rather than time out, should we put a timeout annotation on the test?

Contributor Author

@vvcephei vvcephei Jun 1, 2018

Ah, I was unaware of that annotation. I thought they all just time out and fail after 30s.

I didn't add the fail because it's not important to verify that we get the exception. We are only verifying that the call doesn't hang.

I do think that we'll get the exception every time, so I could add the fail, I just chose to test the behavior we want (non-hanging), not the tightest bound on the behavior we observe (the exception). In other words, we could make a change in the future that doesn't throw the exception but instead exits normally, and that would be fine with this test. WDYT?

Member

I think we should test for the exception. If we change the behavior intentionally, we should remove the fail as well as the try-catch instead of allowing both behaviors to pass (it's an either-or from my point of view). IMHO, tests should cover as narrow a code path as possible: if we change behavior and a test fails, we are forced to reflect on the change, which is good, as it guards against undesired behavior changes.
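The two positions in this exchange can be sketched with the JDK alone. The helper below runs a call under a time limit and classifies the result; the "loose" check accepts anything that does not hang, while the "narrow" check accepts only the expected exception. All names here are hypothetical. In the real test this role would presumably be played by JUnit's fail and a timeout on the test, as discussed above.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper contrasting "must not hang" with "must throw".
final class NarrowTestSketch {

    enum Outcome { THREW, RETURNED_NORMALLY, HUNG }

    // Runs `call` on a helper thread and classifies what happened within `limit`.
    static Outcome run(final Runnable call, final Duration limit) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> future = executor.submit(call);
            future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
            return Outcome.RETURNED_NORMALLY;
        } catch (final ExecutionException e) {
            return Outcome.THREW;          // the call itself threw
        } catch (final TimeoutException e) {
            return Outcome.HUNG;           // the call did not finish in time
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.HUNG;
        } finally {
            executor.shutdownNow();        // interrupt the helper thread if still running
        }
    }

    // Loose check: only verifies that the call does not hang.
    static boolean looseCheckPasses(final Outcome outcome) {
        return outcome != Outcome.HUNG;
    }

    // Narrow check: the expected exception is the only acceptable outcome.
    static boolean narrowCheckPasses(final Outcome outcome) {
        return outcome == Outcome.THREW;
    }
}
```

With the narrow check, a later change that makes the call return normally fails the test and forces a deliberate update, which is the argument made in the comment above.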

@guozhangwang
Contributor

The code change LGTM. @vvcephei have you tried to run the system tests? If yes could you paste its link here? If the system tests passed consistently and Jenkins works fine we can merge then.

@vvcephei
Contributor Author

vvcephei commented Jun 4, 2018

@guozhangwang I'm doing it now. I'll let you know when it's done.

Member

@mjsax mjsax left a comment

Overall looks good. Couple of comments. Maybe @guozhangwang and @bbejeck want to chime in, too.

@@ -76,6 +78,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
this.stateRestoreListener = stateRestoreListener;
this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Member

This will return the value for "poll.ms". Should we check for "global.consumer.poll.ms" and use this value instead if present?

Contributor

Hmm... poll.ms is not a ConsumerConfig but a StreamsConfig, so users are not expected to prefix it with the consumer prefix.

Member

I see what you mean. Still, I am wondering if a single config for all consumers is smart. Also, the config is only used to pass into Consumer#poll(), so even if it's not a config parameter, it's effectively still very similar to a "consumer config". I agree that it would require explicit documentation.

Contributor

I agree it is generally better to allow different values for this; on the other hand, I feel it is too low-level a detail for people to really get right. For example, today the global consumer has two callers of poll(), the restore consumer has two as well, and the normal consumer has one. We could argue further that all five of these calls may prefer different values for the reasons we discussed in this PR, such as whether the call is in the main loop or not.

Besides, if we want to allow a prefix for this config, we can rush it into 2.0, but it may be considered a public API change as well.

Member

Ack. I just convinced myself that what you say makes sense. As you pointed out, we call poll() on different consumers for different purposes:

normal processing:

  • main-consumer
  • global-consumer (regular g-table update)

updating standby tasks:

  • restore-consumer

restoring:

  • main-consumer (that is still doing regular work)
  • restore-consumer
  • global-consumer

Applying the same poll.ms value to the main and global consumers in the normal processing case makes sense to me. For updating standby tasks, we already hard-code zero. The same goes for the main consumer during the restore phase, to speed up restoring. Both are reasonable choices. The last case is the restore case for the restore consumer and global consumer: it might make sense to apply a different poll time for this case, but I also think that using the same one as for the normal processing case is fine.

Thus, long story short, iff we ever introduce a prefix for poll.ms parameters, it should be for the "normal processing phase" and the "restore phase", and should not be tied to the consumer instance actually used. Thus, using the consumer prefix does not make sense.
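The breakdown above can be written down as a small lookup. This is only a sketch of the reasoning in this comment, not code from the PR: the enum names and the `pollTime` method are hypothetical, and `configuredPollTime` stands for whatever `poll.ms` resolves to.

```java
import java.time.Duration;

// Hypothetical summary of which poll time each phase/consumer pair would use.
final class PollTimeByPhaseSketch {

    enum Phase { NORMAL_PROCESSING, STANDBY_UPDATE, RESTORING }

    enum ConsumerRole { MAIN, GLOBAL, RESTORE }

    static Duration pollTime(final Phase phase,
                             final ConsumerRole role,
                             final Duration configuredPollTime) {
        switch (phase) {
            case NORMAL_PROCESSING:
                // main consumer and global consumer (regular table update): use poll.ms
                return configuredPollTime;
            case STANDBY_UPDATE:
                // restore consumer updating standby tasks: hard-coded zero
                return Duration.ZERO;
            case RESTORING:
                // the main consumer polls with zero to speed up restoring;
                // restore and global consumers reuse the configured value
                return role == ConsumerRole.MAIN ? Duration.ZERO : configuredPollTime;
            default:
                throw new IllegalArgumentException("unknown phase: " + phase);
        }
    }
}
```

The lookup depends mostly on the phase, and on the consumer only inside the restore phase, which matches the conclusion that a per-consumer prefix would be the wrong axis.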

@@ -212,7 +213,7 @@ State setState(final State newState) {
if (newState == State.RUNNING) {
updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
} else {
-updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
+updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
Member

@mjsax mjsax Jun 5, 2018

Thank you, Java8! :)

@@ -602,7 +603,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Member

As above: should we check for "restore.consumer.poll.ms" ?

Contributor Author

I guess not, following Guozhang's comment.

@@ -710,10 +712,10 @@ public StreamThread(final Time time,
this.originalReset = originalReset;
this.versionProbingFlag = versionProbingFlag;

-this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Member

as above: "main.consumer.poll.ms" ?

Contributor Author

I guess not, following Guozhang's comment.

final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
// poll(0): Since this is during the normal processing, not during restoration.
// We can afford to have slower restore (because we don't wait inside poll for results).
// Instead, we want to proceed to the next iteration to call the normal-consumer.poll
Member

nit: the normal-consumer.poll -> main consumer#poll()

// poll(0): Since this is during the normal processing, not during restoration.
// We can afford to have slower restore (because we don't wait inside poll for results).
// Instead, we want to proceed to the next iteration to call the normal-consumer.poll
// sooner to not be kicked out of the group.
Member

nit: sooner -> as soon as possible

@@ -234,7 +235,6 @@ public boolean conditionMet() {
assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
}

-@Ignore // this test cannot pass as long as GST blocks KS.start()
@Test
public void testGlobalThreadCloseWithoutConnectingToBroker() {
Member

rename: globalThreadShouldTimeoutAndThrowExceptionWhenBrokerConnectionCannotBeEstablished

@@ -243,17 +243,22 @@ public void testGlobalThreadCloseWithoutConnectingToBroker() {
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
props.put("session.timeout.ms", 3001);
props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 3002);
Member

why 3001 and 3002 ?

Contributor Author

There are dependencies among the timing configs, session timeout must be greater than some other timeouts. But this test doesn't need to take 3s.

I'm pushing a change to be clearer about what's going on here and also reduce the time to 200ms.
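To make the dependency between the timing configs concrete, here is a small JDK-only sketch. The ordering it checks (heartbeat interval below session timeout, and request timeout above both session timeout and fetch max wait) is an assumption inferred from the values used in this thread; the exact rules enforced depend on the client version. The values 200, 200 and 201 appear in this PR, while the `request.timeout.ms` value of 202 and all class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical check of the ordering assumed between the consumer timing configs.
final class TimingConfigSketch {

    // Short timeouts so that a test against an unreachable broker finishes quickly.
    static Map<String, Integer> shortTimeouts() {
        final Map<String, Integer> configs = new LinkedHashMap<>();
        configs.put("fetch.max.wait.ms", 200);
        configs.put("heartbeat.interval.ms", 200);
        configs.put("session.timeout.ms", 201);   // just above the heartbeat interval
        configs.put("request.timeout.ms", 202);   // assumed value, just above the session timeout
        return configs;
    }

    // Returns true when the assumed ordering between the timeouts holds.
    static boolean isConsistent(final Map<String, Integer> configs) {
        final int fetchMaxWait = configs.get("fetch.max.wait.ms");
        final int heartbeat = configs.get("heartbeat.interval.ms");
        final int session = configs.get("session.timeout.ms");
        final int request = configs.get("request.timeout.ms");
        return heartbeat < session && session < request && fetchMaxWait < request;
    }
}
```

This explains the odd-looking 3001 and 3002: each value only needs to be one millisecond above the one it depends on.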

@@ -254,7 +256,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
topics.add("repartition");
}
consumer.subscribe(topics);
-consumer.poll(0);
+firstPoll = consumer.poll(Duration.ofMillis(100));
Member

Nice fix!

@vvcephei
Contributor Author

vvcephei commented Jun 6, 2018

Thanks for the review, @mjsax.
I need to wait until #5142 is merged, then I'll rebase this (and resolve the comment), and then we should be done with this PR.

@mjsax
Member

mjsax commented Jun 6, 2018

Still want to discuss the prefix thing. Also, this PR needs to be rebased because of #5013.

@guozhangwang
Contributor

I'm +1 assuming that the PR is rebased.

@vvcephei
Contributor Author

vvcephei commented Jun 6, 2018

Note to self: This will need a cherry-pick PR to 2.0 branch.

@vvcephei
Contributor Author

vvcephei commented Jun 7, 2018

Fixed and rebased. The fix I needed was to switch to AdminClient to fetch the committed offsets in EosTestDriver. It would be worth re-reviewing that part.

New round of system tests in progress. I'll paste a link if/when they pass.

Also, refreshing the reminder to myself to cherry-pick this once it's merged.

Contributor

@guozhangwang guozhangwang left a comment

LGTM, assuming system test passed.

Member

@mjsax mjsax left a comment

Just some nits. No reason to re-run the system tests.

For my own education: why did the EosTestDriver break? Unclear to me atm why using the consumer does not work any longer to get the committed offsets?

@@ -843,15 +845,15 @@ long runOnce(final long recordsProcessedBeforeCommit) {
/**
* Get the next batch of records by polling.
*
-* @param pollTimeMs poll time millis parameter for the consumer poll
+* @param pollTime poll time millis parameter for the consumer poll
Member

nit: update parameter description; still says "millis"

// few other configs.
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
props.put("session.timeout.ms", 201);
Member

nit: use ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG

Contributor Author

oh, weird, I was looking for that config, and the only one I found was in an internal class.

for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
try (final org.apache.kafka.clients.admin.AdminClient adminClient = KafkaAdminClient.create(props)) {
Member

nit: use class import to avoid long name here

Contributor Author

The scala AdminClient was already in scope. I've refactored to just use the Java one.

final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID);
try {
topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
} catch (final InterruptedException | ExecutionException e) {
Member

Why this nested try-catch? Can't we use a single one?
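One way the single-block shape could look, sketched with JDK types only: a try-with-resources whose catch clause also covers the exceptions thrown by `Future#get`. `FakeAdmin` is a hypothetical stand-in for the admin client, and its `committedOffsets` method is invented for illustration; neither is the real Kafka API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical illustration of folding a nested try-catch into one try-with-resources.
final class SingleTrySketch {

    // Stand-in for a closeable client that returns its result as a future.
    static final class FakeAdmin implements AutoCloseable {
        boolean closed = false;

        Future<Map<String, Long>> committedOffsets() {
            return CompletableFuture.completedFuture(Collections.singletonMap("topic-0", 42L));
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static Map<String, Long> fetchCommittedOffsets(final FakeAdmin admin) {
        // One block: the resource is closed automatically, and the same catch
        // handles both failure modes of Future#get.
        try (final FakeAdmin client = admin) {
            return client.committedOffsets().get();
        } catch (final InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            }
            throw new RuntimeException("could not fetch committed offsets", e);
        }
    }
}
```

The catch clause of a try-with-resources runs after the resource has been closed, so no inner block is needed just to guarantee cleanup.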

@vvcephei
Contributor Author

vvcephei commented Jun 8, 2018

With this latest change, I'm going to run the EOS system tests again.

@vvcephei
Contributor Author

vvcephei commented Jun 8, 2018

The EOS system tests passed for me locally. Here's the in-progress run of the EOS tests x5: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1780

@vvcephei
Contributor Author

vvcephei commented Jun 8, 2018

In response to your question, @mjsax,

The call to Consumer#position requires that the consumer instance has already joined the consumer group. The only call in Consumer that will cause it to join the group is poll. Therefore, calling poll is a precondition to calling position. We used to call poll(0), but that's deprecated now, for good reasons. We cannot call poll(Duration) because the intent right here is just to find the committed offsets, specifically not to poll any messages.

We have considered adding a separate method to Consumer to joinGroupIfNeeded or something, and I actually could have created it protected to support internal usage of it. Another option was to make position join the group directly.

Upon reflection, though, both of those are silly for this usage. Why should we create a new Consumer instance and join the group when we intend not to poll; only to find out what the committed offsets are? That was the only operation we were doing on the client before closing it. This operation is more appropriately done via the AdminClient, which is what we're doing now.

And this is ultimately why we opted not to add that extra Consumer method right away. Although it wouldn't be wrong to have it, it's not clear that it's needed. At first (and now second) glance, the only reason you should join the group is to poll messages, so we avoid misleadingly implying otherwise by omitting the (presumably useless) method.

@vvcephei
Contributor Author

vvcephei commented Jun 8, 2018

@guozhangwang guozhangwang merged commit 74bdafe into apache:trunk Jun 8, 2018
guozhangwang pushed a commit that referenced this pull request Jun 8, 2018
Make use of the new Consumer#poll(Duration) to avoid getting stuck in poll when the broker is unavailable.

Reviewers: Matthias J. Sax, Guozhang Wang, Bill Bejeck
Contributor

@guozhangwang guozhangwang left a comment

Cherry-picked to 2.0 since it is an important bug fix to improve operational experience.

@vvcephei
Contributor Author

Thanks, @guozhangwang !

@vvcephei vvcephei deleted the KAFKA-5697-streams-nonblocking-poll branch June 10, 2018 00:20
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
Make use of the new Consumer#poll(Duration) to avoid getting stuck in poll when the broker is unavailable.

Reviewers: Matthias J. Sax, Guozhang Wang, Bill Bejeck
4 participants