Skip to content

Commit

Permalink
SAMZA-1155; Validate users configure window.ms when using the fluent API
Browse files Browse the repository at this point in the history
We will compute triggering duration as follows:
- If the user configures `task.window.ms`, we will honor it as the triggering duration.
- If not, we will use `GCD(windowTriggerDurations, joinTTLs)` as the triggering duration.

Changes in this PR:
- Common Interface for all time based triggers
- Additional APIs in `StreamGraphImpl` to recursively traverse all `OperatorSpec`s
- Recursive computation of `triggerInterval` for each `WindowOperatorSpec`
- Tests for all the above

Author: vjagadish1989 <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>, Jacob Maes <[email protected]>, Xinyu Liu <[email protected]>

Closes apache#160 from vjagadish1989/samza-1155
  • Loading branch information
jagadish-northguard committed May 6, 2017
1 parent d8534b5 commit e6cc3b7
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* A {@link Trigger} that repeats its underlying trigger forever.
*/
class RepeatingTrigger<M> implements Trigger<M> {
public class RepeatingTrigger<M> implements Trigger<M> {

private final Trigger<M> trigger;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators.triggers;

import org.apache.samza.annotation.InterfaceStability;

import java.time.Duration;

/**
* A {@link Trigger} whose firing logic is determined by a time duration.
*
* <p> Use the {@link Triggers} APIs to create a {@link Trigger}.
*
* @param <M> the type of the incoming message
*/

@InterfaceStability.Unstable
public interface TimeBasedTrigger<M> extends Trigger<M> {
/**
* Returns the time duration that determines the firing logic of this trigger.
*
* @return the duration associated with this trigger
*/
Duration getDuration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
* the window pane.
*/
public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> {
public class TimeSinceFirstMessageTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> {

private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
Expand All @@ -35,6 +35,7 @@ public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> {
this.duration = duration;
}

/**
* Returns the duration held by this trigger, i.e. the time that has to pass since the first
* message in the window pane before the trigger fires.
*
* @return the duration this trigger was created with
*/
@Override
public Duration getDuration() {
return duration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
* @param <M> the type of the incoming {@link MessageEnvelope}
*/
public class TimeSinceLastMessageTrigger<M> implements Trigger<M> {
public class TimeSinceLastMessageTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> {

private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
Expand All @@ -33,6 +33,7 @@ public class TimeSinceLastMessageTrigger<M> implements Trigger<M> {
this.duration = duration;
}

/**
* Returns the duration held by this trigger, i.e. the time without new messages in the
* window pane after which the trigger fires.
*
* @return the duration this trigger was created with
*/
@Override
public Duration getDuration() {
return duration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* A {@link Trigger} that fires after the specified duration in processing time.
*/
public class TimeTrigger<M> implements Trigger<M> {
public class TimeTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> {

private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
Expand All @@ -32,6 +32,7 @@ public TimeTrigger(Duration duration) {
this.duration = duration;
}

/**
* Returns the processing-time duration after which this trigger fires.
*
* @return the duration this trigger was created with
*/
@Override
public Duration getDuration() {
return duration;
}
Expand Down
44 changes: 44 additions & 0 deletions samza-core/src/main/java/org/apache/samza/execution/JobNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -30,6 +31,10 @@
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,6 +109,17 @@ public JobConfig generateConfig(String executionPlanJson) {

List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));

// set triggering interval if a window or join is defined
if (streamGraph.hasWindowOrJoins()) {
if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
long triggerInterval = computeTriggerInterval();
log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName);

configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
}
}

log.info("Job {} has generated configs {}", jobName, configs);

configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
Expand All @@ -113,6 +129,34 @@ public JobConfig generateConfig(String executionPlanJson) {
return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix)));
}

/**
 * Derives the triggering interval for this {@link JobNode} from the operators in its stream graph.
 *
 * <p>The interval is the greatest common divisor of the default trigger interval of every window
 * operator and the TTL of every join operator found in the graph.
 *
 * @return the GCD of all window trigger intervals and join TTLs
 */
private long computeTriggerInterval() {
  final Collection<OperatorSpec> allSpecs = streamGraph.getAllOperatorSpecs();

  // First pass: default trigger interval of each window operator.
  final List<Long> windowIntervals = new ArrayList<>();
  for (OperatorSpec spec : allSpecs) {
    if (spec.getOpCode() == OperatorSpec.OpCode.WINDOW) {
      windowIntervals.add(((WindowOperatorSpec) spec).getDefaultTriggerMs());
    }
  }

  // Second pass: TTL of each join operator.
  final List<Long> joinTtls = new ArrayList<>();
  for (OperatorSpec spec : allSpecs) {
    if (spec.getOpCode() == OperatorSpec.OpCode.JOIN) {
      joinTtls.add(((PartialJoinOperatorSpec) spec).getTtlMs());
    }
  }

  // Join TTLs go first, followed by the window intervals; the GCD is taken over the whole list.
  final List<Long> candidates = new ArrayList<>(joinTtls);
  candidates.addAll(windowIntervals);

  return MathUtils.gcd(candidates);
}

/**
* This function extract the subset of configs from the full config, and use it to override the generated configs
* from the job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.stream.InputStreamInternal;
import org.apache.samza.operators.stream.InputStreamInternalImpl;
import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
Expand All @@ -28,11 +29,15 @@
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
Expand Down Expand Up @@ -156,4 +161,47 @@ public ContextManager getContextManager() {
/* package private */ int getNextOpId() {
return this.opId++;
}

/**
 * Collects every {@link OperatorSpec} that can be reached from the input streams of this
 * {@link StreamGraphImpl}.
 *
 * @return a set of all {@link OperatorSpec}s reachable from the input streams
 */
public Collection<OperatorSpec> getAllOperatorSpecs() {
  final Set<OperatorSpec> reachableSpecs = new HashSet<>();

  // Every input stream is a starting point; walk downstream from each and accumulate into one set.
  inStreams.values().forEach(
      inputStream -> doGetOperatorSpecs((MessageStreamImpl) inputStream, reachableSpecs));

  return reachableSpecs;
}

/**
 * Adds every {@link OperatorSpec} reachable from the given stream to {@code specs}.
 *
 * <p>A spec that is already present in {@code specs} is not descended into again. This keeps the
 * traversal from repeatedly walking a downstream sub-graph that is reachable through more than
 * one path, and guarantees termination even if the graph were to contain a cycle.
 *
 * @param stream the stream whose registered operators (and everything downstream) are collected
 * @param specs the accumulator that receives the discovered {@link OperatorSpec}s
 */
private void doGetOperatorSpecs(MessageStreamImpl stream, Set<OperatorSpec> specs) {
  Collection<OperatorSpec> registeredOperatorSpecs = stream.getRegisteredOperatorSpecs();
  for (OperatorSpec spec : registeredOperatorSpecs) {
    // Set#add returns false when the spec was already collected; its downstream has then been
    // (or is currently being) traversed, so there is nothing new to discover below it.
    if (!specs.add(spec)) {
      continue;
    }
    MessageStreamImpl nextStream = spec.getNextStream();
    if (nextStream != null) {
      // Recursively traverse and obtain all reachable operators
      doGetOperatorSpecs(nextStream, specs);
    }
  }
}

/**
 * Returns {@code true} iff this {@link StreamGraphImpl} contains a join or a window operator.
 *
 * @return {@code true} iff at least one {@link OperatorSpec} in this graph has the op code
 *         {@code WINDOW} or {@code JOIN}
 */
public boolean hasWindowOrJoins() {
  // anyMatch stops at the first hit; there is no need to collect the matches just to test emptiness.
  return getAllOperatorSpecs().stream().anyMatch(spec -> {
    OperatorSpec.OpCode opCode = spec.getOpCode();
    return opCode == OperatorSpec.OpCode.WINDOW || opCode == OperatorSpec.OpCode.JOIN;
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,20 @@
package org.apache.samza.operators.spec;

import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.triggers.AnyTrigger;
import org.apache.samza.operators.triggers.RepeatingTrigger;
import org.apache.samza.operators.triggers.TimeBasedTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.operators.util.OperatorJsonUtils;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;


/**
Expand All @@ -34,6 +45,7 @@
*/
public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {

private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorSpec.class);
private final WindowInternal<M, WK, WV> window;
private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
private final int opId;
Expand Down Expand Up @@ -76,4 +88,53 @@ public int getOpId() {
public String getSourceLocation() {
return sourceLocation;
}

/**
 * Computes the default triggering interval, in milliseconds, for this {@link WindowOperatorSpec}.
 *
 * <p>The interval is the GCD of the durations of all {@link TimeBasedTrigger}s found in the
 * window's default, early and late triggers (triggers that are not set are ignored).
 *
 * @return the GCD of the durations of all time-based triggers, in milliseconds
 * @throws IllegalArgumentException if no time-based trigger is found, because
 *         {@link MathUtils#gcd(List)} rejects an empty list
 */
public long getDefaultTriggerMs() {
  final List<TimeBasedTrigger> collected = new ArrayList<>();

  final Trigger defaultTrigger = window.getDefaultTrigger();
  if (defaultTrigger != null) {
    collected.addAll(getTimeBasedTriggers(defaultTrigger));
  }

  final Trigger earlyTrigger = window.getEarlyTrigger();
  if (earlyTrigger != null) {
    collected.addAll(getTimeBasedTriggers(earlyTrigger));
  }

  final Trigger lateTrigger = window.getLateTrigger();
  if (lateTrigger != null) {
    collected.addAll(getTimeBasedTriggers(lateTrigger));
  }

  LOG.info("Got {} timer triggers", collected.size());

  // Convert each trigger's duration to milliseconds before taking the GCD.
  final List<Long> durationsMs = new ArrayList<>(collected.size());
  for (TimeBasedTrigger timeBased : collected) {
    durationsMs.add(timeBased.getDuration().toMillis());
  }

  return MathUtils.gcd(durationsMs);
}

/**
 * Gathers all {@link TimeBasedTrigger}s contained in the trigger tree rooted at the given trigger.
 *
 * <p>A {@link TimeBasedTrigger} is returned as-is, a {@link RepeatingTrigger} is unwrapped and an
 * {@link AnyTrigger} is searched through each of its sub-triggers. Any other kind of trigger
 * contributes nothing.
 *
 * @param rootTrigger the trigger at which the traversal starts
 * @return the time-based triggers found, in traversal order; empty if there are none
 */
private List<TimeBasedTrigger> getTimeBasedTriggers(Trigger rootTrigger) {
  final List<TimeBasedTrigger> found = new ArrayList<>();

  if (rootTrigger instanceof TimeBasedTrigger) {
    found.add((TimeBasedTrigger) rootTrigger);
    return found;
  }

  if (rootTrigger instanceof RepeatingTrigger) {
    // Look inside the trigger that is being repeated.
    final Trigger repeated = ((RepeatingTrigger) rootTrigger).getTrigger();
    found.addAll(getTimeBasedTriggers(repeated));
    return found;
  }

  if (rootTrigger instanceof AnyTrigger) {
    // Look inside each alternative of the composite trigger.
    final List<Trigger> alternatives = ((AnyTrigger) rootTrigger).getTriggers();
    alternatives.forEach(alternative -> found.addAll(getTimeBasedTriggers(alternative)));
  }

  return found;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators.util;

import java.util.List;

/**
 * Static helpers for computing greatest common divisors.
 */
public class MathUtils {

  /**
   * Computes the greatest common divisor of two numbers using Euclid's algorithm.
   *
   * <p>The sign of the operands is ignored, so {@code gcd(-12, 8) == gcd(12, -8) == 4}.
   * {@code gcd(x, 0)} and {@code gcd(0, x)} are {@code |x|}, and {@code gcd(0, 0)} is {@code 0}.
   * The result is non-negative except when its magnitude is 2^63, which does not fit in a
   * {@code long} (only possible when an operand is {@link Long#MIN_VALUE}).
   *
   * @param a the first number
   * @param b the second number
   * @return the greatest common divisor of {@code a} and {@code b}
   */
  public static long gcd(long a, long b) {
    // Loop until the remainder is exactly zero. Java's % takes the sign of the dividend, so the
    // remainder may be negative; testing "b > 0" would stop too early for negative operands.
    while (b != 0) {
      long remainder = a % b;
      a = b;
      b = remainder;
    }
    return Math.abs(a);
  }

  /**
   * Computes the greatest common divisor of all numbers in the list.
   *
   * @param numbers the numbers to process; must be non-null, non-empty and free of null elements
   * @return the greatest common divisor of all elements
   * @throws IllegalArgumentException if the list is null, empty, or contains a null element
   */
  public static long gcd(List<Long> numbers) {
    if (numbers == null) {
      throw new IllegalArgumentException("Null list provided");
    }
    if (numbers.isEmpty()) {
      throw new IllegalArgumentException("List of size 0 provided");
    }

    // gcd(0, x) == |x|, so 0 is the neutral starting value for the fold.
    long result = 0;
    for (Long number : numbers) {
      if (number == null) {
        throw new IllegalArgumentException("Null element provided");
      }
      result = gcd(result, number);
    }
    return result;
  }
}
Loading

0 comments on commit e6cc3b7

Please sign in to comment.