Skip to content

Commit

Permalink
KAFKA-6906: Fixed to commit transactions if data is produced via wall…
Browse files Browse the repository at this point in the history
… clock punctuation (apache#5105)

Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
jadireddi authored and mjsax committed Jun 11, 2018
1 parent cc4157d commit ee5cc97
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,17 +424,22 @@ private void commitOffsets(final boolean startNewTransaction) {

if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
producer.commitTransaction();
transactionInFlight = false;
if (startNewTransaction) {
producer.beginTransaction();
transactionInFlight = true;
}
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
}
commitOffsetNeeded = false;
} else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
}

if (eosEnabled) {
producer.commitTransaction();
transactionInFlight = false;
if (startNewTransaction) {
producer.beginTransaction();
transactionInFlight = true;
}
}

if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
producer.commitTransaction();
transactionInFlight = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
Expand Down Expand Up @@ -913,6 +914,7 @@ public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
@Test
public void shouldCloseProducerOnCloseWhenEosEnabled() {
task = createStatelessTask(createConfig(true));
task.initializeTopology();
task.close(true, false);
task = null;

Expand Down Expand Up @@ -1028,6 +1030,39 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
}

@Test
public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
task = createStatelessTask(createConfig(true));
try {
task.close(true, false);
fail("should have throw IllegalStateException");
} catch (final IllegalStateException expected) {
// pass
}
task = null;

assertTrue(producer.closed());
}

@Test
public void shouldAlwaysCommitIfEosEnabled() {
final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "StreamTask",
new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));

task = createStatelessTask(createConfig(true));
task.initializeStateStores();
task.initializeTopology();
task.punctuate(processorSystemTime, 5, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@Override
public void punctuate(final long timestamp) {
recordCollector.send("result-topic1", 3, 5, null, 0, time.milliseconds(),
new IntegerSerializer(), new IntegerSerializer());
}
});
task.commit();
assertEquals(1, producer.history().size());
}

private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
final ProcessorTopology topology = ProcessorTopology.with(
Utils.<ProcessorNode>mkList(source1, source2),
Expand Down Expand Up @@ -1144,5 +1179,4 @@ private ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicPartition to
recordValue
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh
new TestCondition() {
@Override
public boolean conditionMet() {
return producer.commitCount() == 1;
return producer.commitCount() == 2;
}
},
"StreamsThread did not commit transaction.");
Expand All @@ -681,7 +681,7 @@ public boolean conditionMet() {
},
"StreamsThread did not remove fenced zombie task.");

assertThat(producer.commitCount(), equalTo(1L));
assertThat(producer.commitCount(), equalTo(2L));
}

@Test
Expand Down

0 comments on commit ee5cc97

Please sign in to comment.