Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Application Default Credential if gcp.credential is absent #1531

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a226c75
Set version to 0.11.0-SNAPSHOT
Dec 15, 2020
d9fe89e
Add v0_11 as the target of CI.
Dec 16, 2020
f078faf
Merge pull request #1504 from yoyama/v0_11_fix-version
yoyama Dec 16, 2020
d1e3578
Fix CI to release snapshot for v0_11 branch.
Dec 16, 2020
f8dda61
Merge pull request #1505 from yoyama/v0_11-fix-ci-release-snapshot
yoyama Dec 16, 2020
f5a1dc7
Exclude v0_10 branch from snapshot release
Dec 21, 2020
c978727
Merge pull request #1508 from yoyama/disable-snapshot-release-of-v0_1…
yoyama Dec 21, 2020
fc02db4
Set `started_at` in group tasks as well.
komamitsu Jan 8, 2021
9abf713
Add some comments
komamitsu Jan 12, 2021
9fae04a
Merge pull request #1518 from treasure-data/started_at-in-group-tasks
komamitsu Jan 12, 2021
9206279
Add 'wait' operator
komamitsu Jan 12, 2021
68c16fe
Add test for `wait` operator
komamitsu Jan 14, 2021
4d00284
Add description about `wait` operator
komamitsu Jan 14, 2021
56e349d
Refactoring
komamitsu Jan 14, 2021
fc87d47
Add `blocking` and `poll_interval` options
komamitsu Jan 15, 2021
e9343cb
Small refactoring
komamitsu Jan 15, 2021
bc268c1
Replace nullable value with Optional
komamitsu Jan 15, 2021
a7a01db
Make the key name clearer
komamitsu Jan 15, 2021
52c411a
Fix typo
komamitsu Jan 15, 2021
d61a7bd
Add missing test case
komamitsu Jan 15, 2021
162c686
Refactoring
komamitsu Jan 21, 2021
81e3b72
Use a safer expected value
komamitsu Jan 21, 2021
cc7260e
Merge pull request #1520 from treasure-data/wait-oprator
komamitsu Jan 21, 2021
b7562ff
use application default credential if gcp.credential is absent
rhase Feb 2, 2021
15d3da3
fixed indentation
rhase Feb 4, 2021
a053642
add test
rhase Feb 16, 2021
2c3f593
fix indentation
rhase Feb 16, 2021
5c9a981
bugfix
rhase Feb 16, 2021
0c2e9a1
consideration for windows
rhase Feb 16, 2021
532b6df
organize imports
rhase Feb 16, 2021
ee00c6d
Merge branch 'master' into feature/use-gcp-application-default-creden…
rhase May 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Set started_at in group tasks as well.
But just doing that would change current task TTL timeout behavior.
So, this commit also takes care of it.
  • Loading branch information
komamitsu committed Jan 8, 2021
commit fc02db49c902af010e168ba3fee2a669b1fbcf8d
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,11 @@ public int trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled(long task
" when task_type = " + TaskType.GROUPING_ONLY + " then " + TaskStateCode.PLANNED_CODE +
" when " + bitAnd("state_flags", Integer.toString(TaskStateFlags.CANCEL_REQUESTED)) + " != 0 then " + TaskStateCode.CANCELED_CODE +
" else " + TaskStateCode.READY_CODE +
" end" +
" end, " +
" started_at = case" +
// Update `started_at` only when it's a group task and the column is null.
// Non group tasks' `started_at` are updated by TaskControlStore.setStartedState.
" when task_type = " + TaskType.GROUPING_ONLY + " then coalesce(started_at, now()) else started_at end" +
" where state = " + TaskStateCode.BLOCKED_CODE +
" and parent_id = :parentId" +
" and exists (" +
Expand Down Expand Up @@ -1550,6 +1554,9 @@ public Optional<StoredSessionAttempt> getLastAttemptIfExists(long sessionId)
public <T> T insertRootTask(long attemptId, Task task, SessionBuilderAction<T> func)
{
long taskId = dao.insertTask(attemptId, task.getParentId().orNull(), task.getTaskType().get(), task.getState().get(), task.getStateFlags().get()); // tasks table don't have unique index
// Root task's `started_at` isn't updated by trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled().
// So this line is just for setting `started_at` of root task although there may be room for optimization.
dao.setStartedState(taskId, task.getState().get(), task.getState().get());
dao.insertTaskDetails(taskId, task.getFullName(), task.getConfig().getLocal(), task.getConfig().getExport());
dao.insertEmptyTaskStateDetails(taskId);
return func.call(new DatabaseTaskControlStore(handle), taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -22,8 +23,11 @@
import io.digdag.client.config.ConfigUtils;
import io.digdag.core.DigdagEmbed;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.session.ArchivedTask;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.session.StoredTask;
import io.digdag.core.session.TaskStateCode;
import io.digdag.metrics.StdDigdagMetrics;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.util.RetryControl;
Expand Down Expand Up @@ -434,6 +438,61 @@ public void randomEnqueue()
}
}

@Test
public void verifySubmittedTasks()
throws Exception
{
Instant startTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
StoredSessionAttemptWithSession attempt = runWorkflow("nested", loadYamlResource("/io/digdag/core/workflow/nested.dig"));
embed.getLocalSite().runUntilDone(attempt.getId());
Instant finishTime = Instant.now().plusSeconds(1).truncatedTo(ChronoUnit.SECONDS);

List<ArchivedTask> tasks = tm.begin(() -> embed.getLocalSite().getSessionStore().getTasksOfAttempt(attempt.getId()));
assertEquals(3, tasks.size());

Instant rootTaskStartedAt;
{
// Root task
StoredTask task = tasks.get(0);
assertEquals("+nested", task.getFullName());
assertTrue(task.getTaskType().isGroupingOnly());
assertEquals(TaskStateCode.SUCCESS, task.getState());
assertThat(task.getUpdatedAt(), is(greaterThanOrEqualTo(startTime)));
assertThat(task.getUpdatedAt(), is(lessThanOrEqualTo(finishTime)));
assertThat(task.getStartedAt().get(), is(greaterThanOrEqualTo(startTime)));
assertThat(task.getStartedAt().get(), is(lessThanOrEqualTo(finishTime)));
rootTaskStartedAt = task.getStartedAt().get().truncatedTo(ChronoUnit.SECONDS);
}

Instant parentTaskStartedAt;
{
// Group task
StoredTask task = tasks.get(1);
assertEquals("+nested+parent", task.getFullName());
assertTrue(task.getTaskType().isGroupingOnly());
assertEquals(TaskStateCode.SUCCESS, task.getState());
assertThat(task.getUpdatedAt(), is(greaterThanOrEqualTo(startTime)));
assertThat(task.getUpdatedAt(), is(lessThanOrEqualTo(finishTime)));
assertThat(task.getStartedAt().get(), is(greaterThanOrEqualTo(startTime)));
assertThat(task.getStartedAt().get(), is(lessThanOrEqualTo(finishTime)));
assertThat(task.getStartedAt().get(), is(greaterThanOrEqualTo(rootTaskStartedAt)));
parentTaskStartedAt = task.getStartedAt().get().truncatedTo(ChronoUnit.SECONDS);
}

{
// Nested non-group task
StoredTask task = tasks.get(2);
assertEquals("+nested+parent+child", task.getFullName());
assertFalse(task.getTaskType().isGroupingOnly());
assertEquals(TaskStateCode.SUCCESS, task.getState());
assertThat(task.getUpdatedAt(), is(greaterThanOrEqualTo(startTime)));
assertThat(task.getUpdatedAt(), is(lessThanOrEqualTo(finishTime)));
assertThat(task.getStartedAt().get(), is(greaterThanOrEqualTo(startTime)));
assertThat(task.getStartedAt().get(), is(lessThanOrEqualTo(finishTime)));
assertThat(task.getStartedAt().get(), is(greaterThanOrEqualTo(parentTaskStartedAt)));
}
}

private Optional<String> getResult(String fileName, TemporaryFolder folder)
{
try {
Expand All @@ -445,15 +504,15 @@ private Optional<String> getResult(String fileName, TemporaryFolder folder)
}
}

private void runWorkflow(String workflowName, Config config)
private StoredSessionAttemptWithSession runWorkflow(String workflowName, Config config)
throws Exception
{
WorkflowTestingUtils.runWorkflow(embed, folder.getRoot().toPath(), workflowName, config);
return WorkflowTestingUtils.runWorkflow(embed, folder.getRoot().toPath(), workflowName, config);
}

private void runWorkflow(DigdagEmbed embed, String workflowName, Config config)
private StoredSessionAttemptWithSession runWorkflow(DigdagEmbed embed, String workflowName, Config config)
throws Exception
{
WorkflowTestingUtils.runWorkflow(embed, folder.getRoot().toPath(), workflowName, config);
return WorkflowTestingUtils.runWorkflow(embed, folder.getRoot().toPath(), workflowName, config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
+parent:

+child:
echo>: Hello
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ public class WorkflowExecutionTimeoutEnforcer
private static final Duration DEFAULT_REAPING_INTERVAL = Duration.ofSeconds(5);

// This is similar to TaskStateCode.notDoneStates() but BLOCKED, PLANNED, and READY are excluded.
// BLOCKED and PLANNED are excluded because there're other running tasks that should be enforced instead.
// READY is excluded the workflow itself is working correctly and number of threads is insufficient.
//
// - BLOCKED and PLANNED are excluded because there're other running tasks that should be enforced instead.
// - READY is excluded the workflow itself is working correctly and number of threads is insufficient.
// - GROUP_RETRY_WAITING is excluded because this status is supposed to be immediately changed to PLANNED
// and long running group tasks with this state could happen only due to WorkflowExecutor trouble
// not due to workflow definitions.
private static final TaskStateCode[] TASK_TTL_ENFORCED_STATE_CODES = new TaskStateCode[] {
TaskStateCode.RETRY_WAITING,
TaskStateCode.GROUP_RETRY_WAITING,
TaskStateCode.RUNNING,
};

Expand Down