
KAFKA-5697: Use nonblocking poll in Streams (apache#5107)
Make use of the new Consumer#poll(Duration) to avoid getting stuck in poll when the broker is unavailable.

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>, Bill Bejeck <[email protected]>
vvcephei authored and guozhangwang committed Jun 8, 2018
1 parent d2b2fbd commit 74bdafe
Showing 19 changed files with 188 additions and 151 deletions.
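For reference, a minimal sketch (not part of this diff) of the API change the commit adopts throughout Streams: the old Consumer#poll(long) only bounds the wait for records and can block indefinitely while fetching metadata from an unreachable broker, whereas the poll(Duration) overload introduced by KIP-266 bounds the entire call.

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class PollSketch {

    // Old style: the long timeout only limits waiting for records, so the
    // call may still hang on metadata fetches while the broker is down.
    @SuppressWarnings("deprecation")
    static ConsumerRecords<byte[], byte[]> blockingPoll(final Consumer<byte[], byte[]> consumer) {
        return consumer.poll(100L);
    }

    // New style: the Duration caps the whole call, including metadata
    // fetches, so the thread regains control even with no broker reachable.
    static ConsumerRecords<byte[], byte[]> nonblockingPoll(final Consumer<byte[], byte[]> consumer) {
        return consumer.poll(Duration.ofMillis(100));
    }
}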
@@ -38,6 +38,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
private InternalProcessorContext processorContext;
private final int retries;
private final long retryBackoffMs;
private final Duration pollTime;

public GlobalStateManagerImpl(final LogContext logContext,
final ProcessorTopology topology,
@@ -76,6 +78,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
this.stateRestoreListener = stateRestoreListener;
this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
}

@Override
@@ -262,7 +265,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,

while (offset < highWatermark) {
try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
if (record.key() != null) {
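A condensed, hypothetical sketch of the restore loop this hunk changes (offset handling simplified; the real code starts from a checkpoint and reads the consumer position): termination is governed by the changelog's high watermark, so the nonblocking poll only bounds how long each iteration may stall, not when the loop ends.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.streams.KeyValue;

final class RestoreLoopSketch {

    // Each poll now returns within pollTime even if the broker is gone;
    // the loop simply polls again until the high watermark is reached.
    static long restoreTo(final Consumer<byte[], byte[]> globalConsumer,
                          final Duration pollTime,
                          final long highWatermark) {
        long offset = 0L; // simplified: real code starts from the checkpointed offset
        while (offset < highWatermark) {
            final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
            final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                if (record.key() != null) {
                    restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                }
                offset = record.offset() + 1; // simplified: real code uses consumer.position()
            }
            // ... hand restoreRecords to the StateRestoreCallback here ...
        }
        return offset;
    }
}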
@@ -36,6 +36,7 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -200,7 +201,7 @@ static class StateConsumer {
private final Consumer<byte[], byte[]> globalConsumer;
private final GlobalStateMaintainer stateMaintainer;
private final Time time;
private final long pollMs;
private final Duration pollTime;
private final long flushInterval;
private final Logger log;

@@ -210,13 +211,13 @@ static class StateConsumer {
final Consumer<byte[], byte[]> globalConsumer,
final GlobalStateMaintainer stateMaintainer,
final Time time,
final long pollMs,
final Duration pollTime,
final long flushInterval) {
this.log = logContext.logger(getClass());
this.globalConsumer = globalConsumer;
this.stateMaintainer = stateMaintainer;
this.time = time;
this.pollMs = pollMs;
this.pollTime = pollTime;
this.flushInterval = flushInterval;
}

@@ -235,7 +236,7 @@ void initialize() {

void pollAndUpdate() {
try {
final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
for (final ConsumerRecord<byte[], byte[]> record : received) {
stateMaintainer.update(record);
}
@@ -338,8 +339,9 @@ private StateConsumer initialize() {
logContext
),
time,
config.getLong(StreamsConfig.POLL_MS_CONFIG),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
);
stateConsumer.initialize();

return stateConsumer;
@@ -29,6 +29,7 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -49,11 +50,14 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
private final Duration pollTime;

public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final Duration pollTime,
final StateRestoreListener userStateRestoreListener,
final LogContext logContext) {
this.restoreConsumer = restoreConsumer;
this.pollTime = pollTime;
this.log = logContext.logger(getClass());
this.userStateRestoreListener = userStateRestoreListener;
}
@@ -76,7 +80,7 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
}

try {
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
while (iterator.hasNext()) {
final TopicPartition partition = iterator.next();
@@ -295,6 +299,7 @@ private boolean hasPartition(final TopicPartition topicPartition) {
return true;
}
}

return false;
}
}
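Because the restore consumer now honors poll.ms instead of a hard-coded 10 ms timeout, the blocking bound is user-tunable. A minimal, hypothetical configuration sketch (application id and bootstrap servers are placeholder values):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

final class ConfigSketch {

    static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.POLL_MS_CONFIG, 100L);  // now also bounds each changelog-restore poll
        return props;
    }
}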
@@ -50,6 +50,7 @@
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -212,7 +213,7 @@ State setState(final State newState) {
if (newState == State.RUNNING) {
updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
} else {
updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
}
}

@@ -555,7 +556,7 @@ static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
}

private final Time time;
private final long pollTimeMs;
private final Duration pollTime;
private final long commitTimeMs;
private final Object stateLock;
private final Logger log;
@@ -602,7 +603,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext);

Producer<byte[], byte[]> threadProducer = null;
final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
@@ -710,10 +712,10 @@ public StreamThread(final Time time,
this.originalReset = originalReset;
this.versionProbingFlag = versionProbingFlag;

this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);

updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
}

/**
@@ -801,14 +803,14 @@ long runOnce(final long recordsProcessedBeforeCommit) {
if (state == State.PARTITIONS_ASSIGNED) {
// try to fetch some records with zero poll millis
// to unblock the restoration as soon as possible
records = pollRequests(0L);
records = pollRequests(Duration.ZERO);

if (taskManager.updateNewAndRestoringTasks()) {
setState(State.RUNNING);
}
} else {
// try to fetch some records if necessary
records = pollRequests(pollTimeMs);
records = pollRequests(pollTime);

// if state changed after the poll call,
// try to initialize the assigned tasks again
@@ -843,15 +845,15 @@ long runOnce(final long recordsProcessedBeforeCommit) {
/**
* Get the next batch of records by polling.
*
* @param pollTimeMs poll time millis parameter for the consumer poll
* @param pollTime how long to block in Consumer#poll
* @return Next batch of records or null if no records available.
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
ConsumerRecords<byte[], byte[]> records = null;

try {
records = consumer.poll(pollTimeMs);
records = consumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
resetInvalidOffsets(e);
}
@@ -1078,7 +1080,11 @@ private void maybeUpdateStandbyTasks(final long now) {
}

try {
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
// poll(ZERO): this runs during normal processing, not during restoration,
// so we can afford a slower restore (we do not wait inside poll for results).
// Instead, we want to proceed to the next iteration and call the main
// consumer#poll() as soon as possible, so the thread is not kicked out of the group.
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(Duration.ZERO);

if (!records.isEmpty()) {
for (final TopicPartition partition : records.partitions()) {
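To illustrate the new comment above, a minimal sketch of the zero-timeout semantics being relied on (behavior per the Consumer contract; not code from this commit):

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class ZeroPollSketch {

    // With Duration.ZERO the consumer returns only what its fetcher has
    // already buffered and does not wait on the network, so the standby
    // update stays cheap and the loop gets back to the main consumer's
    // poll() before the group session times out.
    static ConsumerRecords<byte[], byte[]> drainBuffered(final Consumer<byte[], byte[]> restoreConsumer) {
        return restoreConsumer.poll(Duration.ZERO);
    }
}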
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -25,6 +27,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +45,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@@ -234,26 +236,35 @@ public boolean conditionMet() {
assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
}

@Ignore // this test cannot pass as long as GST blocks KS.start()
@Test
public void testGlobalThreadCloseWithoutConnectingToBroker() {
public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);

// We want a small request.timeout.ms so that start() fails fast, but the
// consumer requires it to be larger than fetch.max.wait.ms and
// session.timeout.ms, so those are lowered as well.
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201);
props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202);

final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
streams.close();
try {
streams.start();
fail("expected start() to time out and throw an exception.");
} catch (final StreamsException expected) {
// This is a result of not being able to connect to the broker.
}
// Nothing further to assert: we are testing that start() actually completes (by throwing) instead of blocking forever.
}

@Ignore // this test cannot pass until we implement KIP-266
@Test
public void testLocalThreadCloseWithoutConnectingToBroker() {
final Properties props = new Properties();
@@ -44,6 +44,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -464,7 +465,7 @@ private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
while (totalPollTimeMs < waitTime &&
continueConsuming(consumerRecords.size(), maxMessages)) {
totalPollTimeMs += pollIntervalMs;
final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(pollIntervalMs));

for (final ConsumerRecord<K, V> record : records) {
consumerRecords.add(record);
@@ -54,6 +54,7 @@
import org.apache.kafka.streams.state.WindowStore;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -334,7 +335,7 @@ private void consumeAndProduce(final String topic) {
consumer.seekToBeginning(partitions);

while (true) {
final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
@@ -372,7 +373,7 @@ private void consume(final String topic) {
consumer.seekToBeginning(partitions);

while (true) {
final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
@@ -42,6 +42,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -233,7 +234,7 @@ private AbstractTask createTask(final Consumer consumer,
storeTopicPartitions,
ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
consumer,
new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
new StoreChangelogReader(consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
false,
stateDirectory,
config) {
@@ -50,6 +50,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -122,7 +123,12 @@ private StreamsConfig createConfig(final File baseDir) throws IOException {

private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
private final StoreChangelogReader changelogReader = new StoreChangelogReader(
restoreStateConsumer,
Duration.ZERO,
stateRestoreListener,
new LogContext("standby-task-test ")
);

private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -188,7 +194,7 @@ public void testUpdate() throws IOException {
}

restoreStateConsumer.seekToBeginning(partition);
task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));

StandbyContextImpl context = (StandbyContextImpl) task.context();
MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -245,7 +251,7 @@ public void testUpdateKTable() throws IOException {
}

// The commit offset is at 0L. Records should not be processed
List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
assertEquals(5, remaining.size());

committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
(Diffs for the remaining changed files are not shown.)
