From c8c3a7dc48dc1ec6220798294da123bb617a6cd7 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Fri, 27 Jul 2018 22:11:55 -0700
Subject: [PATCH] KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset (#5430)

1. When we reinitialize the state store because no CHECKPOINT file exists and EOS is turned on,
   update the checkpoint to the offset returned by consumer.seekToBeginning() / consumer.position()
   to avoid falling into endless restore iterations.
2. Fixed a few other logic bugs around needsInitializing and needsRestoring.

Reviewers: Jason Gustafson, Bill Bejeck
---
 .../processor/internals/StateRestorer.java   |  10 +-
 .../internals/StoreChangelogReader.java      | 117 ++++++++++--------
 .../processor/internals/StreamThread.java    |   2 +-
 .../internals/StoreChangelogReaderTest.java  |  17 ++-
 4 files changed, 87 insertions(+), 59 deletions(-)
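Note on the fix (an illustration, not part of the applied diff): when the state is wiped with EOS
enabled, the reader now seeks the restore consumer to the beginning of the changelog and records
consumer.position() as the restorer's new checkpoint, so the next reinitialization does not start
another full restore cycle from NO_CHECKPOINT. A minimal sketch of that seekToBeginning() /
position() pattern with a plain consumer follows; the bootstrap server and changelog topic are
placeholders.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SeekToBeginningSketch {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092");                       // placeholder broker
            config.put("key.deserializer", ByteArrayDeserializer.class.getName());
            config.put("value.deserializer", ByteArrayDeserializer.class.getName());

            final TopicPartition changelog = new TopicPartition("some-store-changelog", 0);  // placeholder changelog
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
                consumer.assign(Collections.singleton(changelog));

                // reset to the log start offset; position() then reports where restoration will
                // actually begin, which is the value the restorer keeps as its checkpoint
                consumer.seekToBeginning(Collections.singleton(changelog));
                final long resetOffset = consumer.position(changelog);
                System.out.println("restore would start at offset " + resetOffset);
            }
        }
    }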
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index c1a41cefc23c..3bbf42ead272 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@ public class StateRestorer {
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@ public class StateRestorer {
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -60,7 +60,11 @@ public String storeName() {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
     }
 
     void restoreStarted() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 1927b5a7af71..9185920f242f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final Set<TopicPartition> needsRestoring = new HashSet<>();
+    private final Set<TopicPartition> needsInitializing = new HashSet<>();
+    private final Set<TopicPartition> completedRestorers = new HashSet<>();
     private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
@@ -64,9 +65,14 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        if (!stateRestorers.containsKey(restorer.partition())) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+
+            log.trace("Added restorer for changelog {}", restorer.partition());
+        }
+
+        needsInitializing.add(restorer.partition());
     }
 
     public Collection<TopicPartition> restore(final RestoringTasks active) {
@@ -81,16 +87,15 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
 
         try {
             final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
-            final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
-            while (iterator.hasNext()) {
-                final TopicPartition partition = iterator.next();
+
+            for (final TopicPartition partition : needsRestoring) {
                 final StateRestorer restorer = stateRestorers.get(partition);
                 final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
                 restorer.setRestoredOffset(pos);
                 if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
                     restorer.restoreDone();
                     endOffsets.remove(partition);
-                    iterator.remove();
+                    completedRestorers.add(partition);
                 }
             }
         } catch (final InvalidOffsetException recoverableException) {
@@ -98,12 +103,18 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
             final Set<TopicPartition> partitions = recoverableException.partitions();
             for (final TopicPartition partition : partitions) {
                 final StreamTask task = active.restoringTaskFor(partition);
-                log.info("Reinitializing StreamTask {}", task);
+                log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
+
+                needsInitializing.remove(partition);
+                needsRestoring.remove(partition);
+
                 task.reinitializeStateStoresForPartitions(recoverableException.partitions());
             }
             restoreConsumer.seekToBeginning(partitions);
         }
 
+        needsRestoring.removeAll(completedRestorers);
+
         if (needsRestoring.isEmpty()) {
             restoreConsumer.unsubscribe();
         }
@@ -120,25 +131,24 @@ private void initialize(final RestoringTasks active) {
         // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
         refreshChangelogInfo();
 
-        final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
-            final TopicPartition topicPartition = entry.getKey();
+        final Set<TopicPartition> initializable = new HashSet<>();
+        for (final TopicPartition topicPartition : needsInitializing) {
             if (hasPartition(topicPartition)) {
-                initializable.put(entry.getKey(), entry.getValue());
+                initializable.add(topicPartition);
             }
         }
 
         // try to fetch end offsets for the initializable restorers and remove any partitions
         // where we already have all of the data
         try {
-            endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
+            endOffsets.putAll(restoreConsumer.endOffsets(initializable));
         } catch (final TimeoutException e) {
             // if timeout exception gets thrown we just give up this time and retry in the next run loop
             log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
             return;
         }
 
-        final Iterator<TopicPartition> iter = initializable.keySet().iterator();
+        final Iterator<TopicPartition> iter = initializable.iterator();
         while (iter.hasNext()) {
             final TopicPartition topicPartition = iter.next();
             final Long endOffset = endOffsets.get(topicPartition);
@@ -146,13 +156,15 @@ private void initialize(final RestoringTasks active) {
             // offset should not be null; but since the consumer API does not guarantee it
             // we add this check just in case
             if (endOffset != null) {
-                final StateRestorer restorer = needsInitializing.get(topicPartition);
+                final StateRestorer restorer = stateRestorers.get(topicPartition);
                 if (restorer.checkpoint() >= endOffset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
                     iter.remove();
+                    completedRestorers.add(topicPartition);
                 } else if (restorer.offsetLimit() == 0 || endOffset == 0) {
                     restorer.setRestoredOffset(0);
                     iter.remove();
+                    completedRestorers.add(topicPartition);
                 } else {
                     restorer.setEndingOffset(endOffset);
                 }
@@ -169,51 +181,59 @@ private void initialize(final RestoringTasks active) {
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+    private void startRestoration(final Set<TopicPartition> initialized,
                                   final RestoringTasks active) {
-        log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
+        log.debug("Start restoring state stores from changelog topics {}", initialized);
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
-        assignment.addAll(initialized.keySet());
+        assignment.addAll(initialized);
         restoreConsumer.assign(assignment);
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-        for (final StateRestorer restorer : initialized.values()) {
+
+        for (final TopicPartition partition : initialized) {
+            final StateRestorer restorer = stateRestorers.get(partition);
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
-                                  restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+                restoreConsumer.seek(partition, restorer.checkpoint());
+                logRestoreOffsets(partition,
+                                  restorer.checkpoint(),
+                                  endOffsets.get(partition));
+                restorer.setStartingOffset(restoreConsumer.position(partition));
                 restorer.restoreStarted();
             } else {
-                final StreamTask task = active.restoringTaskFor(restorer.partition());
-
-                // If checkpoint does not exist it means the task was not shutdown gracefully before;
-                // and in this case if EOS is turned on we should wipe out the state and re-initialize the task
-                if (task.isEosEnabled()) {
-                    log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
-                            "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
-                    task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
-                } else {
-                    log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition());
-                }
-
-                restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                restoreConsumer.seekToBeginning(Collections.singletonList(partition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = restoreConsumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
-            restorer.restoreStarted();
+            final TopicPartition partition = restorer.partition();
+
+            // If checkpoint does not exist it means the task was not shutdown gracefully before;
+            // and in this case if EOS is turned on we should wipe out the state and re-initialize the task
+            final StreamTask task = active.restoringTaskFor(partition);
+            if (task.isEosEnabled()) {
+                log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                        "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), partition);
+
+                needsInitializing.remove(partition);
+                initialized.remove(partition);
+                restorer.setCheckpointOffset(restoreConsumer.position(partition));
+
+                task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
+            } else {
+                log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), partition);
+
+                final long position = restoreConsumer.position(restorer.partition());
+                logRestoreOffsets(restorer.partition(),
+                                  position,
+                                  endOffsets.get(restorer.partition()));
+                restorer.setStartingOffset(position);
+                restorer.restoreStarted();
+            }
         }
 
-        needsRestoring.putAll(initialized);
+        needsRestoring.addAll(initialized);
     }
 
     private void logRestoreOffsets(final TopicPartition partition,
@@ -226,10 +246,7 @@ private void logRestoreOffsets(final TopicPartition partition,
     }
 
     private Collection<TopicPartition> completed() {
-        final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
-        completed.removeAll(needsRestoring.keySet());
-        log.trace("The set of restoration completed partitions so far: {}", completed);
-        return completed;
+        return completedRestorers;
     }
 
     private void refreshChangelogInfo() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31de839b1a1d..428aa1d938b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1129,7 +1129,7 @@ private void maybeUpdateStandbyTasks(final long now) {
                         throw new TaskMigratedException(task);
                     }
 
-                    log.info("Reinitializing StandbyTask {}", task);
+                    log.info("Reinitializing StandbyTask {} from changelogs {}", task, recoverableException.partitions());
                     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
                 }
                 restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1e74d47808e2..ae48f57db314 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -100,7 +100,11 @@ public Map<String, List<PartitionInfo>> listTopics() {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
         mockRestorer.setUserRestoreListener(stateRestoreListener);
-        expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).andReturn(new TopicPartition("sometopic", 0));
+        expect(mockRestorer.partition())
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0));
         EasyMock.replay(mockRestorer);
 
         changelogReader.register(mockRestorer);
@@ -144,6 +148,9 @@ public Set<TopicPartition> partitions() {
 
         // first restore call "fails" but we should not die with an exception
         assertEquals(0, changelogReader.restore(active).size());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+            "storeName"));
         // retry restore should succeed
         assertEquals(1, changelogReader.restore(active).size());
         assertThat(callback.restored.size(), equalTo(messages));
@@ -226,9 +233,9 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
         setupConsumer(3, two);
 
         changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
-        changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
-        changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
+            .register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1"));
+        changelogReader.register(new StateRestorer(one, restoreListener1, 0L, Long.MAX_VALUE, true, "storeName2"));
+        changelogReader.register(new StateRestorer(two, restoreListener2, 0L, Long.MAX_VALUE, true, "storeName3"));
 
         expect(active.restoringTaskFor(one)).andReturn(task);
         expect(active.restoringTaskFor(two)).andReturn(task);
@@ -257,7 +264,7 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
         changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+            .register(new StateRestorer(topicPartition, restoreListener, 0L, 5, true, "storeName1"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);