KAFKA-3514: Remove min timestamp tracker (apache#5382)
1. Remove MinTimestampTracker and its TimestampTracker interface.
2. In RecordQueue, keep track of the (deserialized) head record while putting the remaining raw-bytes records in the FIFO queue; the head record, as well as the partition timestamp, is updated accordingly. A sketch of the idea follows the file summary below.

Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
guozhangwang authored Jul 19, 2018
1 parent 10a0b8a commit 2f6240a
Showing 9 changed files with 96 additions and 342 deletions.
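
The head-record idea, reduced to a minimal sketch. This is an illustration, not the committed code: the Deserializer and TimestampExtractor interfaces below are simplified stand-ins for the real Kafka Streams types, and error handling and metrics are omitted.

import java.util.ArrayDeque;

// Minimal sketch: only one record is ever held in deserialized form (the head);
// everything behind it stays as cheap raw bytes until it becomes the head.
class HeadRecordQueueSketch {
    static final long NOT_KNOWN = -1L;

    // Simplified stand-ins for the real deserializer / timestamp extractor.
    interface Deserializer { Object deserialize(byte[] raw); }                        // may return null to skip
    interface TimestampExtractor { long extract(Object record, long partitionTime); }

    private final ArrayDeque<byte[]> fifoQueue = new ArrayDeque<>();
    private final Deserializer deserializer;
    private final TimestampExtractor extractor;

    private Object headRecord = null;
    private long partitionTime = NOT_KNOWN;

    HeadRecordQueueSketch(final Deserializer deserializer, final TimestampExtractor extractor) {
        this.deserializer = deserializer;
        this.extractor = extractor;
    }

    void addRawRecords(final Iterable<byte[]> rawRecords) {
        for (final byte[] raw : rawRecords) {
            fifoQueue.addLast(raw);          // no deserialization on ingest
        }
        maybeUpdateTimestamp();              // materialize a head if there is none yet
    }

    Object poll() {
        final Object recordToReturn = headRecord;
        headRecord = null;
        maybeUpdateTimestamp();              // pull the next record forward as the new head
        return recordToReturn;
    }

    int size() {
        return fifoQueue.size() + (headRecord == null ? 0 : 1);
    }

    long timestamp() {
        return partitionTime;                // monotonically advancing partition time
    }

    // Deserialize from the raw queue until a valid head record is found,
    // advancing the partition timestamp as a side effect.
    private void maybeUpdateTimestamp() {
        while (headRecord == null && !fifoQueue.isEmpty()) {
            final Object deserialized = deserializer.deserialize(fifoQueue.pollFirst());
            if (deserialized == null) {
                continue;                    // deserializer chose to skip this record
            }
            final long timestamp = extractor.extract(deserialized, partitionTime);
            if (timestamp < 0) {
                continue;                    // drop records with invalid (negative) timestamps
            }
            headRecord = deserialized;
            if (timestamp > partitionTime) {
                partitionTime = timestamp;   // never moves backwards
            }
        }
    }
}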

MinTimestampTracker.java — This file was deleted.

PartitionGroup.java
@@ -58,7 +58,7 @@ RecordQueue queue() {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
         totalBuffered = 0;
-        streamTime = -1;
+        streamTime = RecordQueue.NOT_KNOWN;
     }
 
     /**
RecordQueue.java
@@ -35,28 +35,30 @@
  * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
  */
 public class RecordQueue {
 
+    static final long NOT_KNOWN = -1L;
+
+    private final Logger log;
     private final SourceNode source;
-    private final TimestampExtractor timestampExtractor;
     private final TopicPartition partition;
-    private final ArrayDeque<StampedRecord> fifoQueue;
-    private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-    private final RecordDeserializer recordDeserializer;
     private final ProcessorContext processorContext;
-    private final Logger log;
+    private final TimestampExtractor timestampExtractor;
+    private final RecordDeserializer recordDeserializer;
+    private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
-    private long partitionTime = TimestampTracker.NOT_KNOWN;
+    private long partitionTime = NOT_KNOWN;
+    private StampedRecord headRecord = null;
 
     RecordQueue(final TopicPartition partition,
                 final SourceNode source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
                 final InternalProcessorContext processorContext,
                 final LogContext logContext) {
-        this.partition = partition;
         this.source = source;
-        this.timestampExtractor = timestampExtractor;
+        this.partition = partition;
         this.fifoQueue = new ArrayDeque<>();
-        this.timeTracker = new MinTimestampTracker<>();
+        this.timestampExtractor = timestampExtractor;
         this.recordDeserializer = new RecordDeserializer(
             source,
             deserializationExceptionHandler,

@@ -93,48 +95,10 @@ public TopicPartition partition() {
      */
     int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-
-            final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
-            if (record == null) {
-                // this only happens if the deserializer decides to skip. It has already logged the reason.
-                continue;
-            }
-
-            final long timestamp;
-            try {
-                timestamp = timestampExtractor.extract(record, timeTracker.get());
-            } catch (final StreamsException internalFatalExtractorException) {
-                throw internalFatalExtractorException;
-            } catch (final Exception fatalUserException) {
-                throw new StreamsException(
-                    String.format("Fatal user code error in TimestampExtractor callback for record %s.", record),
-                    fatalUserException);
-            }
-            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
-
-            // drop message if TS is invalid, i.e., negative
-            if (timestamp < 0) {
-                log.warn(
-                    "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
-                    record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
-                );
-                ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
-                continue;
-            }
-
-            final StampedRecord stampedRecord = new StampedRecord(record, timestamp);
-            fifoQueue.addLast(stampedRecord);
-            timeTracker.addElement(stampedRecord);
+            fifoQueue.addLast(rawRecord);
         }
 
-        // update the partition timestamp if its currently
-        // tracked min timestamp has exceeded its value; this will
-        // usually only take effect for the first added batch
-        final long timestamp = timeTracker.get();
-
-        if (timestamp > partitionTime) {
-            partitionTime = timestamp;
-        }
+        maybeUpdateTimestamp();
 
         return size();
     }

@@ -145,23 +109,12 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
      * @return StampedRecord
      */
     public StampedRecord poll() {
-        final StampedRecord elem = fifoQueue.pollFirst();
-
-        if (elem == null) {
-            return null;
-        }
-
-        timeTracker.removeElement(elem);
+        final StampedRecord recordToReturn = headRecord;
+        headRecord = null;
 
-        // only advance the partition timestamp if its currently
-        // tracked min timestamp has exceeded its value
-        final long timestamp = timeTracker.get();
+        maybeUpdateTimestamp();
 
-        if (timestamp > partitionTime) {
-            partitionTime = timestamp;
-        }
-
-        return elem;
+        return recordToReturn;
     }

@@ -170,7 +123,8 @@ public StampedRecord poll() {
      * @return the number of records
      */
     public int size() {
-        return fifoQueue.size();
+        // plus one deserialized head record for timestamp tracking
+        return fifoQueue.size() + (headRecord == null ? 0 : 1);
     }

@@ -179,7 +133,7 @@ public int size() {
      * @return true if the queue is empty, otherwise false
      */
     public boolean isEmpty() {
-        return fifoQueue.isEmpty();
+        return fifoQueue.isEmpty() && headRecord == null;
     }

@@ -196,16 +150,48 @@ public long timestamp() {
      */
     public void clear() {
         fifoQueue.clear();
-        timeTracker.clear();
-        partitionTime = TimestampTracker.NOT_KNOWN;
+        headRecord = null;
+        partitionTime = NOT_KNOWN;
     }
 
-    /*
-     * Returns the timestamp tracker of the record queue
-     *
-     * This is only used for testing
-     */
-    TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
-        return timeTracker;
+    private void maybeUpdateTimestamp() {
+        while (headRecord == null && !fifoQueue.isEmpty()) {
+            final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
+            final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw);
+
+            if (deserialized == null) {
+                // this only happens if the deserializer decides to skip. It has already logged the reason.
+                continue;
+            }
+
+            final long timestamp;
+            try {
+                timestamp = timestampExtractor.extract(deserialized, partitionTime);
+            } catch (final StreamsException internalFatalExtractorException) {
+                throw internalFatalExtractorException;
+            } catch (final Exception fatalUserException) {
+                throw new StreamsException(
+                    String.format("Fatal user code error in TimestampExtractor callback for record %s.", deserialized),
+                    fatalUserException);
+            }
+            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, deserialized);
+
+            // drop message if TS is invalid, i.e., negative
+            if (timestamp < 0) {
+                log.warn(
+                    "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
+                    deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
+                );
+                ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
+                continue;
+            }
+
+            headRecord = new StampedRecord(deserialized, timestamp);
+
+            // update the partition timestamp if the current head record's timestamp has exceeded it
+            if (timestamp > partitionTime) {
+                partitionTime = timestamp;
+            }
+        }
    }
 }
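
The observable effect of the rewrite, as a hypothetical usage fragment (rawA, rawB, asList, and the assertions are illustrative, in the style of a RecordQueueTest case, not code from this commit):

queue.addRawRecords(asList(rawA, rawB));   // only one record is deserialized, becoming the head
assert queue.size() == 2;                  // one raw record plus the deserialized head
final StampedRecord a = queue.poll();      // returns the head; rawB is deserialized as the new head
assert queue.timestamp() >= a.timestamp;   // partition time only ever advances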
StandbyContextImpl.java
@@ -69,7 +69,8 @@ public Map<TopicPartition, Long> offsets() {
             return Collections.emptyMap();
         }
     };
-    private long streamTime = TimestampTracker.NOT_KNOWN;
+
+    private long streamTime = RecordQueue.NOT_KNOWN;
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
StreamTask.java
@@ -716,7 +716,7 @@ public boolean maybePunctuateStreamTime() {
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
         // to reason about the stream partition time, then skip.
-        if (timestamp == TimestampTracker.NOT_KNOWN) {
+        if (timestamp == RecordQueue.NOT_KNOWN) {
             return false;
         } else {
             return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);

TimestampTracker.java — This file was deleted.

