Skip to content

Commit

Permalink
KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not…
Browse files Browse the repository at this point in the history
… exist (apache#5421)

1. As titled and as described in comments.
2. Modified unit test slightly to insert for new keys in committed data to expose this issue.

Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
guozhangwang authored Jul 27, 2018
1 parent a932520 commit 061885e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ public String toString() {
return toString("");
}

public boolean isEosEnabled() {
return eosEnabled;
}

/**
* Produces a string representation containing useful information about a Task starting with the given indent.
* This is useful in debugging scenarios.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public TopicPartition partition() {
return partition;
}

public String storeName() {
return storeName;
}

long checkpoint() {
return checkpoint == null ? NO_CHECKPOINT : checkpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void register(final StateRestorer restorer) {

public Collection<TopicPartition> restore(final RestoringTasks active) {
if (!needsInitializing.isEmpty()) {
initialize();
initialize(active);
}

if (needsRestoring.isEmpty()) {
Expand Down Expand Up @@ -111,7 +111,7 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
return completed();
}

private void initialize() {
private void initialize(final RestoringTasks active) {
if (!restoreConsumer.subscription().isEmpty()) {
throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");
}
Expand Down Expand Up @@ -165,11 +165,12 @@ private void initialize() {

// set up restorer for those initializable
if (!initializable.isEmpty()) {
startRestoration(initializable);
startRestoration(initializable, active);
}
}

private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
final RestoringTasks active) {
log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());

final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
Expand All @@ -186,6 +187,18 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
restorer.restoreStarted();
} else {
final StreamTask task = active.restoringTaskFor(restorer.partition());

// If checkpoint does not exist it means the task was not shutdown gracefully before;
// and in this case if EOS is turned on we should wipe out the state and re-initialize the task
if (task.isEosEnabled()) {
log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
"Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
} else {
log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition());
}

restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
needsPositionUpdate.add(restorer);
}
Expand Down Expand Up @@ -280,6 +293,9 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
if (!restoreRecords.isEmpty()) {
restorer.restore(restoreRecords);
restorer.restoreBatchCompleted(lastRestoredOffset, records.size());

log.trace("Restored from {} to {} with {} records, ending offset is {}, next starting position is {}",
restorer.partition(), restorer.storeName(), records.size(), lastRestoredOffset, nextPosition);
}

return nextPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,9 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
// the app is supposed to emit all 40 update records into the output topic
// the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
// in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store
//
// the failure gets inject after 20 committed and 30 uncommitted records got received
// the failure gets inject after 20 committed and 10 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state stores should contain the correct sums
// per key (even if some records got processed twice)
Expand All @@ -402,7 +403,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
streams.start();

final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);

final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
dataBeforeFailure.addAll(committedDataBeforeFailure);
Expand Down Expand Up @@ -610,10 +611,6 @@ public void init(final ProcessorContext context) {

@Override
public KeyValue<Long, Long> transform(final Long key, final Long value) {
if (errorInjected.compareAndSet(true, false)) {
// only tries to fail once on one of the task
throw new RuntimeException("Injected test exception.");
}
if (gcInjected.compareAndSet(true, false)) {
while (doGC) {
try {
Expand All @@ -631,16 +628,27 @@ public KeyValue<Long, Long> transform(final Long key, final Long value) {

if (state != null) {
Long sum = state.get(key);

if (sum == null) {
sum = value;
} else {
sum += value;
}
state.put(key, sum);
context.forward(key, sum);
return null;
state.flush();
}


if (errorInjected.compareAndSet(true, false)) {
// only tries to fail once on one of the task
throw new RuntimeException("Injected test exception.");
}

if (state != null) {
return new KeyValue<>(key, state.get(key));
} else {
return new KeyValue<>(key, value);
}
return new KeyValue<>(key, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
final int messages = 10;
setupConsumer(messages, topicPartition);
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

assertThat(callback.restored.size(), equalTo(messages));
}

Expand All @@ -136,8 +139,8 @@ public Set<TopicPartition> partitions() {
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
"storeName"));

EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
EasyMock.replay(active);
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
EasyMock.replay(active, task);

// first restore call "fails" but we should not die with an exception
assertEquals(0, changelogReader.restore(active).size());
Expand All @@ -164,7 +167,8 @@ public void shouldClearAssignmentAtEndOfRestore() {
setupConsumer(messages, topicPartition);
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
"storeName"));

expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
}
Expand All @@ -175,6 +179,8 @@ public void shouldRestoreToLimitWhenSupplied() {
final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
"storeName");
changelogReader.register(restorer);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(3));
assertThat(restorer.restoredOffset(), equalTo(3L));
Expand All @@ -192,14 +198,14 @@ public void shouldRestoreMultipleStores() {
setupConsumer(5, one);
setupConsumer(3, two);

changelogReader
.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));

expect(active.restoringTaskFor(one)).andReturn(null);
expect(active.restoringTaskFor(two)).andReturn(null);
replay(active);
expect(active.restoringTaskFor(one)).andStubReturn(task);
expect(active.restoringTaskFor(two)).andStubReturn(task);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

assertThat(callback.restored.size(), equalTo(10));
Expand All @@ -224,9 +230,13 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));

expect(active.restoringTaskFor(one)).andReturn(null);
expect(active.restoringTaskFor(two)).andReturn(null);
replay(active);
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

changelogReader.restore(active);

assertThat(callback.restored.size(), equalTo(10));
Expand All @@ -248,6 +258,8 @@ public void shouldOnlyReportTheLastRestoredOffset() {
setupConsumer(10, topicPartition);
changelogReader
.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

assertThat(callback.restored.size(), equalTo(5));
Expand Down Expand Up @@ -306,7 +318,10 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
public void shouldReturnRestoredOffsetsForPersistentStores() {
setupConsumer(10, topicPartition);
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
}
Expand All @@ -315,6 +330,8 @@ public void shouldReturnRestoredOffsetsForPersistentStores() {
public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
setupConsumer(10, topicPartition);
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
Expand All @@ -330,6 +347,8 @@ public void shouldIgnoreNullKeysWhenRestoring() {
consumer.assign(Collections.singletonList(topicPartition));
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
"storeName"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
Expand All @@ -340,6 +359,9 @@ public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
setupConsumer(0, topicPartition);
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);

final Collection<TopicPartition> restored = changelogReader.restore(active);
assertThat(restored, equalTo(expected));
}
Expand All @@ -354,10 +376,9 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));

final TopicPartition postInitialization = new TopicPartition("other", 0);
expect(active.restoringTaskFor(topicPartition)).andReturn(null);
expect(active.restoringTaskFor(topicPartition)).andReturn(null);
expect(active.restoringTaskFor(postInitialization)).andReturn(null);
replay(active);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
expect(active.restoringTaskFor(postInitialization)).andStubReturn(task);
replay(active, task);

assertTrue(changelogReader.restore(active).isEmpty());

Expand Down

0 comments on commit 061885e

Please sign in to comment.