Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Application Default Credential if gcp.credential is absent #1531

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a226c75
Set version to 0.11.0-SNAPSHOT
Dec 15, 2020
d9fe89e
Add v0_11 as the target of CI.
Dec 16, 2020
f078faf
Merge pull request #1504 from yoyama/v0_11_fix-version
yoyama Dec 16, 2020
d1e3578
Fix CI to release snapshot for v0_11 branch.
Dec 16, 2020
f8dda61
Merge pull request #1505 from yoyama/v0_11-fix-ci-release-snapshot
yoyama Dec 16, 2020
f5a1dc7
Exclude v0_10 branch from snapshot release
Dec 21, 2020
c978727
Merge pull request #1508 from yoyama/disable-snapshot-release-of-v0_1…
yoyama Dec 21, 2020
fc02db4
Set `started_at` in group tasks as well.
komamitsu Jan 8, 2021
9abf713
Add some comments
komamitsu Jan 12, 2021
9fae04a
Merge pull request #1518 from treasure-data/started_at-in-group-tasks
komamitsu Jan 12, 2021
9206279
Add 'wait' operator
komamitsu Jan 12, 2021
68c16fe
Add test for `wait` operator
komamitsu Jan 14, 2021
4d00284
Add description about `wait` operator
komamitsu Jan 14, 2021
56e349d
Refactoring
komamitsu Jan 14, 2021
fc87d47
Add `blocking` and `poll_interval` options
komamitsu Jan 15, 2021
e9343cb
Small refactoring
komamitsu Jan 15, 2021
bc268c1
Replace nullable value with Optional
komamitsu Jan 15, 2021
a7a01db
Make the key name clearer
komamitsu Jan 15, 2021
52c411a
Fix typo
komamitsu Jan 15, 2021
d61a7bd
Add missing test case
komamitsu Jan 15, 2021
162c686
Refactoring
komamitsu Jan 21, 2021
81e3b72
Use a safer expected value
komamitsu Jan 21, 2021
cc7260e
Merge pull request #1520 from treasure-data/wait-oprator
komamitsu Jan 21, 2021
b7562ff
use application default credential if gcp.credential is absent
rhase Feb 2, 2021
15d3da3
fixed indentation
rhase Feb 4, 2021
a053642
add test
rhase Feb 16, 2021
2c3f593
fix indentation
rhase Feb 16, 2021
5c9a981
bugfix
rhase Feb 16, 2021
0c2e9a1
consideration for windows
rhase Feb 16, 2021
532b6df
organize imports
rhase Feb 16, 2021
ee00c6d
Merge branch 'master' into feature/use-gcp-application-default-creden…
rhase May 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add blocking and poll_interval options
to `wait` operator
  • Loading branch information
komamitsu committed Jan 15, 2021
commit fc87d473faed1ab385e0bfcf2163ce07a3cbe3c2
22 changes: 21 additions & 1 deletion digdag-docs/src/operators/wait.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**wait>** operator waits a specific duration in the workflow.

This operator is similar to `sh>: sleep 5`, but this is a non-blocking operator and should be always available even in security-restricted environment.
This operator seems similar to `sh>: sleep 5`, but this works in both blocking and non-blocking modes and should be always available even in security-restricted environment.

+wait_10s:
wait>: 10s
Expand All @@ -13,3 +13,23 @@ This operator is similar to `sh>: sleep 5`, but this is a non-blocking operator

Duration to wait.

* **blocking**: BOOLEAN

Digdag agent internally executes this operator in blocking mode and the agent keeps waiting if this option is set to true (default: false)

Examples:

```
blocking: true
```

* **poll_interval**: DURATION

This option is used only with non-blocking mode. If it's set, digdag agent internally gets awake and checks at a specific interval if the duration has passed. If not set, digdag agent gets awake only when a specific duration passes.

Examples:

```
poll_interval: 5s
```

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.digdag.standards.operator;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigElement;
Expand All @@ -18,6 +19,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;


public class WaitOperatorFactory
Expand Down Expand Up @@ -57,21 +59,53 @@ private WaitOperator(OperatorContext context)
this.workspace = Workspace.ofTaskRequest(context.getProjectPath(), request);
}

public TaskResult run()
private Duration duration(Config config)
{
Duration duration;
try {
Config config = request.getConfig();
duration = Durations.parseDuration(config.get("_command", String.class));
if (duration == null) {
throw new ConfigException("Wait duration should be specified");
}
catch (RuntimeException re) {
throw new ConfigException("Invalid configuration", re);
}
logger.debug("wait duration: {}", duration);
return duration;
}

private boolean blocking(Config config)
{
boolean blocking = config.get("blocking", boolean.class, false);
logger.debug("wait blocking mode: {}", blocking);
return blocking;
}

private Duration pollInterval(Config config)
{
Duration pollInterval;
try {
Optional<String> pollIntervalStr = config.getOptional("poll_interval", String.class);
if (!pollIntervalStr.isPresent()) {
return null;
}
pollInterval = Durations.parseDuration(pollIntervalStr.get());
}
catch (RuntimeException re) {
throw new ConfigException("Invalid configuration", re);
}
logger.debug("wait poll_interval: {}", pollInterval);
return pollInterval;
}

logger.debug("wait duration: {}", duration);
public TaskResult run()
{
Config config = request.getConfig();

Duration duration = duration(config);
boolean blocking = blocking(config);
Duration pollInterval = pollInterval(config);
if (blocking && pollInterval != null) {
throw new ConfigException("poll_interval can't be specified with blocking:true");
}

Instant now = Instant.now();
Instant start = request.getLastStateParams()
Expand All @@ -81,19 +115,39 @@ public TaskResult run()

if (now.isAfter(start.plusMillis(duration.toMillis()))) {
logger.info("wait finished. start:{}", start);
return TaskResult.empty(request);
}

// Wait at least 1 second
long waitDurationSeconds = Math.max(
Duration.between(now, start.plusMillis(duration.toMillis())).getSeconds(),
1);

if (blocking) {
logger.debug("waiting for {}s", waitDurationSeconds);
try {
TimeUnit.SECONDS.sleep(waitDurationSeconds);
return TaskResult.empty(request);
}
catch (InterruptedException e) {
// The blocking wait will be restart from the beginning when interrupted.
//
// There is room to improve this by making the task resume from when interrupted.
// But this operator, especially blocking mode, is for development use,
// so we'll go with this simple implementation for now.
throw new RuntimeException("`wait` operator with blocking mode is interrupted and this will be restart from the beginning of the wait");
}
}
else {
// Wait at least 1 second
long nextPollSecs = Math.max(
Duration.between(now, start.plusMillis(duration.toMillis())).getSeconds(),
1);
logger.debug("polling after {}s", nextPollSecs);
if (pollInterval != null) {
waitDurationSeconds = pollInterval.getSeconds();
}
logger.debug("polling after {}s", waitDurationSeconds);
throw TaskExecutionException.ofNextPolling(
(int)nextPollSecs,
(int) waitDurationSeconds,
ConfigElement.copyOf(
request.getLastStateParams().set(WAIT_START_TIME_PARAM, start.toEpochMilli())));
}
return TaskResult.empty(request);
}
}
}
52 changes: 44 additions & 8 deletions digdag-tests/src/test/java/acceptance/WaitIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public ExecResult(CommandStatus commandStatus, Duration duration)
}
}


private ExecResult runAndMonitorDuration(Supplier<CommandStatus> task)
{
Instant start = Instant.now();
Expand All @@ -48,30 +47,67 @@ private ExecResult runAndMonitorDuration(Supplier<CommandStatus> task)
return new ExecResult(commandStatus, duration);
}

@Test
public void testRun()
private void testWorkflow(String workflowName, int expectedDuration)
throws Exception
{
String nowaitResourcePath = "acceptance/wait/nowait.dig";
String targetResourcePath = "acceptance/wait/" + workflowName;

Duration baselineDuration;
{
copyResource("acceptance/wait/nowait.dig", root().resolve("nowait.dig"));
copyResource(nowaitResourcePath, root().resolve("wait.dig"));
ExecResult result = runAndMonitorDuration(() ->
main("run", "-o", root().toString(), "--project", root().toString(), "nowait.dig"));
main("run", "-o", root().toString(), "--project", root().toString(), "wait.dig"));
CommandStatus status = result.commandStatus;
assertThat(status.errUtf8(), status.code(), is(0));
baselineDuration = result.duration;
}

{
copyResource("acceptance/wait/wait_10s.dig", root().resolve("wait_10s.dig"));
copyResource(targetResourcePath, root().resolve("wait.dig"));
ExecResult result = runAndMonitorDuration(() ->
main("run", "-o", root().toString(), "--project", root().toString(), "wait_10s.dig"));
main("run", "-o", root().toString(), "--project", root().toString(), "wait.dig"));
CommandStatus status = result.commandStatus;
assertThat(status.errUtf8(), status.code(), is(0));
assertThat(result.duration, greaterThan(baselineDuration));
assertThat(result.duration, lessThan(
// Actual wait duration can be longer than the specified 10 seconds for some reason
baselineDuration.plusSeconds((long) (10 * 1.5))));
baselineDuration.plusSeconds((long) (expectedDuration * 1.5))));
}
}

@Test
public void testSimpleVersion()
throws Exception
{
testWorkflow("wait.dig", 10);
}

@Test
public void testBlockingMode()
throws Exception
{
testWorkflow("wait_blocking.dig", 10);
}

@Test
public void testPollInterval()
throws Exception
{
testWorkflow("wait_poll_interval.dig", 10);
}

@Test
public void testInvalidConfig()
throws Exception
{
String targetResourcePath = "acceptance/wait/wait_invalid_config.dig";

copyResource(targetResourcePath, root().resolve("wait.dig"));
ExecResult result = runAndMonitorDuration(() ->
main("run", "-o", root().toString(), "--project", root().toString(), "wait.dig"));
CommandStatus status = result.commandStatus;
// The workflow contains a conflict configuration and it should fail.
assertThat(status.errUtf8(), status.code(), is(1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 10s
blocking: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
+wait:
wait>: 10s
blocking: true
poll_interval: 5s
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 10s
blocking: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 10s
poll_interval: 5s