Skip to content

Commit

Permalink
KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback (a…
Browse files Browse the repository at this point in the history
…pache#11114)

There were a few followup things to address from apache#10926, most importantly a number of updates to the javadocs. Also includes a few missing verification checks.

Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>, Israel Ekpo
  • Loading branch information
ableegoldman authored Jul 23, 2021
1 parent 04fd555 commit 7fbc6b7
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 95 deletions.
33 changes: 20 additions & 13 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -135,25 +135,32 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p>
<p>
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
This period determines how long after a window ends any out-of-order records will still be processed.
Records coming in after the grace period has elapsed will be dropped from those windows.
With a long grace period, though Kafka Streams can handle out-of-order data up to that amount of time, it will also incur a high and confusing latency for users,
e.g. suppression operators with the default won't emit results up for 24 hours, while in practice out-of-order data usually has a much smaller time-skewness.
Instead of abstracting this config from users with a long default value, we introduced new constructs such as <code>TimeWindows#ofSizeAndGrace</code> to let callers always set it upon constructing the windows;
the other setters such as <code>TimeWindows#grace</code> are deprecated and will be removed in the future.
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or
stream-stream joins. This period determines how long after a window ends any out-of-order records will still
be processed. Records coming in after the grace period has elapsed are considered late and will be dropped.
But in operators such as suppression, a large grace period has the drawback of incurring an equally large
output latency. The current API made it all too easy to miss the grace period config completely, leading you
to wonder why your application seems to produce no output -- it actually is, but not for 24 hours.
<p>
To prevent accidentally or unknowingly falling back to the default 24hr grace period, we deprecated all of the
existing static constructors for the <code>Windows</code> classes (such as <code>TimeWindows#of</code>). These
are replaced by new static constructors of two flavors: <code>#ofSizeAndGrace</code> and <code>#ofSizeWithNoGrace</code>
(these are for the <code>TimeWindows</code> class; analogous APIs exist for the <code>JoinWindows</code>,
<code>SessionWindows</code>, and SlidingWindows classes). With these new APIs you are forced to set the grace
period explicitly, or else consciously choose to opt out by selecting the <code>WithNoGrace</code> flavor which
sets it to 0 for situations where you really don't care about the grace period, for example during testing or
when playing around with Kafka Streams for the first time. Note that using the new APIs for the
<code>JoinWindows</code> class will also enable a fix for spurious left/outer join results, as described in
the following paragraph. For more details on the grace period and new static constructors, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>
</p>
<p>
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
In this release, we changed the behavior to avoid spurious results and left/outer join result are only emitted after the join window is closed, i.e., after the grace period elapsed.
To maintain backward compatibility, the old API <code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit behavior and only the new
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior.
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior. Check out
<a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a> for more information.
</p>
<ul>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>: Drop 24 hour default of grace period in Streams</li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a>: Avoid spurious left/outer join results in stream-stream join</li>
</ul>

<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,49 +102,61 @@ private JoinWindows(final long beforeMs,

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
* the duration specified by {@code afterWindowEnd} which means that out of order records arriving
* after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
* the timestamp of the record from the primary stream.
* <p>
* Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
* means that only out-of-order records arriving more than the grace period after the window end will be dropped.
* The window close, after which any incoming records are considered late and will be rejected, is defined as
* {@code windowEnd + afterWindowEnd}
*
* @param timeDifference join window interval
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @return A new JoinWindows object with the specified window definition and grace period
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
final String timeDifferenceMsgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);

final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);

return new JoinWindows(timeDifferenceMs, timeDifferenceMs, afterWindowEndMs, true);
}

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
* which means that out of order records arriving after the window end will be dropped.
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
* the timestamp of the record from the primary stream.
* <p>
* CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
* records arriving after the window ends are considered late and will be dropped.
*
* @param timeDifference join window interval
* @return a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
*/
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
return ofTimeDifferenceAndGrace(timeDifference, Duration.ofMillis(NO_GRACE_PERIOD));
}

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
* the timestamp of the record from the primary stream.
*
* @param timeDifference
* @param timeDifference join window interval
* @return a new JoinWindows object with the window definition with and grace period (default to 24 hours minus {@code timeDifference})
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead
* @deprecated since 3.0. Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead
*/
@Deprecated
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), false);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), false);
}

/**
Expand Down Expand Up @@ -203,16 +215,14 @@ public long size() {
*
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
@Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
//TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
import static java.time.Duration.ofMillis;

Expand Down Expand Up @@ -91,16 +91,16 @@ private SessionWindows(final long gapMs, final long graceMs) {

/**
* Creates a new window specification with the specified inactivity gap.
* Using the method implicitly sets the grace period to zero which
* means that out of order records arriving after the window end will be dropped
*
* <p>
* Note that new events may change the boundaries of session windows, so aggressive
* close times can lead to surprising results in which an out-of-order event is rejected and then
* a subsequent event moves the window boundary forward.
* <p>
* CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
* records arriving after the window ends are considered late and will be dropped.
*
* @param inactivityGap the gap of inactivity between sessions
* @return a window definition with the window size and no grace period. Note that this means out of order records arriving after the window end will be dropped
* @return a window definition with the window size and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapWithNoGrace(final Duration inactivityGap) {
Expand All @@ -109,38 +109,46 @@ public static SessionWindows ofInactivityGapWithNoGrace(final Duration inactivit

/**
* Creates a new window specification with the specified inactivity gap.
* Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which
* means that out of order records arriving after the window end will be dropped
*
* <p>
* Note that new events may change the boundaries of session windows, so aggressive
* close times can lead to surprising results in which an out-of-order event is rejected and then
* a subsequent event moves the window boundary forward.
* <p>
* Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
* means that only out-of-order records arriving more than the grace period after the window end will be dropped.
* The window close, after which any incoming records are considered late and will be rejected, is defined as
* {@code windowEnd + afterWindowEnd}
*
* @param inactivityGap the gap of inactivity between sessions
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return A SessionWindows object with the specified inactivity gap and grace period
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
* if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd) {
return new SessionWindows(inactivityGap.toMillis(), afterWindowEnd.toMillis());
}
final String inactivityGapMsgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs = validateMillisecondDuration(inactivityGap, inactivityGapMsgPrefix);

final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);

return new SessionWindows(inactivityGapMs, afterWindowEndMs);
}

/**
* Create a new window specification with the specified inactivity gap.
*
* @param inactivityGap the gap of inactivity between sessions
* @return a new window specification without specifying a grace period (default to 24 hours minus {@code inactivityGap})
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)} instead
* @deprecated since 3.0. Use {@link #ofInactivityGapWithNoGrace(Duration)} instead
*/
@Deprecated
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs = validateMillisecondDuration(inactivityGap, msgPrefix);

return new SessionWindows(inactivityGapMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
return new SessionWindows(inactivityGapMs, Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
}

/**
Expand All @@ -153,11 +161,12 @@ public static SessionWindows with(final Duration inactivityGap) {
*
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0. Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
*/
@Deprecated
public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
//TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);

Expand Down
Loading

0 comments on commit 7fbc6b7

Please sign in to comment.