Skip to content

Commit

Permalink
Merge pull request treasure-data#294 from treasure-data/attempt-limit
Browse files Browse the repository at this point in the history
Limit number of active attempts
  • Loading branch information
frsyuki committed Sep 23, 2016
2 parents 49f7ae1 + efde015 commit 1d4da9c
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 9 deletions.
11 changes: 10 additions & 1 deletion digdag-core/src/main/java/io/digdag/core/Limits.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@ public class Limits
// Upper bound on the number of tasks, read once at class initialization from the
// "io.digdag.limits.maxWorkflowTasks" system property (default 1000).
// Long.parseLong yields a primitive directly, avoiding the box/unbox round trip
// of Long.valueOf; a non-numeric value still fails with NumberFormatException.
private static final long MAX_WORKFLOW_TASKS = Long.parseLong(
        System.getProperty("io.digdag.limits.maxWorkflowTasks", "1000"));

// Upper bound on the number of attempts, read once at class initialization from the
// "io.digdag.limits.maxAttempts" system property (default 100).
private static final long MAX_ATTEMPTS = Long.parseLong(
        System.getProperty("io.digdag.limits.maxAttempts", "100"));

// TODO (dano): this should be configurable by config file etc and not just system property

public static long maxWorkflowTasks() {
public static long maxWorkflowTasks()
{
return MAX_WORKFLOW_TASKS;
}

/**
 * Returns the configured limit on the number of attempts.
 *
 * @return the value of {@code MAX_ATTEMPTS}
 */
public static long maxAttempts()
{
return MAX_ATTEMPTS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,12 @@ public DatabaseTaskControlStore(Handle handle)
}

@Override
public long getTaskCount(long attemptId)
public long getTaskCountOfAttempt(long attemptId)
{
long count = handle.createQuery(
"select count(*) from tasks t" +
" where t.attempt_id = :attemptId"
)
)
.bind("attemptId", attemptId)
.mapTo(long.class)
.first();
Expand Down Expand Up @@ -944,6 +944,20 @@ public DatabaseSessionStore(int siteId)
this.siteId = siteId;
}

/**
 * Counts the rows of {@code session_attempts} that belong to this store's
 * site and whose {@code state_flags}, combined with
 * {@link AttemptStateFlags#DONE_CODE} through {@code bitAnd}, evaluate to 0.
 *
 * NOTE(review): this looks like the implementation of an interface method;
 * if so, an {@code @Override} annotation would be appropriate - confirm.
 *
 * @return the number of matching attempts
 */
public long getActiveAttemptCount()
{
    return autoCommit((handle, dao) -> {
        // Build the statement first so the query chain below stays readable.
        String doneMask = Integer.toString(AttemptStateFlags.DONE_CODE);
        String sql =
                "select count(*) from session_attempts" +
                " where site_id = :siteId" +
                " and " + bitAnd("state_flags", doneMask) + " = 0";
        return handle.createQuery(sql)
                .bind("siteId", siteId)
                .mapTo(long.class)
                .first();
    });
}

@Override
public <T> T putAndLockSession(Session session, SessionLockAction<T> func)
throws ResourceConflictException, ResourceNotFoundException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredWorkflowDefinitionWithProject;
import io.digdag.core.workflow.AttemptLimitExceededException;
import io.digdag.core.workflow.SessionAttemptConflictException;
import io.digdag.core.session.Session;
import io.digdag.core.session.AttemptStateFlags;
Expand Down Expand Up @@ -130,11 +131,18 @@ private boolean runSchedule(ScheduleControl lockedSched)
ScheduleTime nextTime = sr.nextScheduleTime(sched.getNextScheduleTime());
return lockedSched.tryUpdateNextScheduleTime(nextTime);
}
catch (AttemptLimitExceededException ex) {
logger.info("Number of attempts exceed limit. Pending this schedule for 10 minutes: {}", sched, ex);
ScheduleTime nextTime = ScheduleTime.of(
sched.getNextScheduleTime(),
ScheduleTime.alignedNow().plusSeconds(600));
return lockedSched.tryUpdateNextScheduleTime(nextTime);
}
catch (RuntimeException ex) {
logger.error("Error during scheduling. Pending this schedule for 1 hour: {}", sched, ex);
ScheduleTime nextTime = ScheduleTime.of(
sched.getNextScheduleTime(),
sched.getNextRunTime().plusSeconds(3600));
ScheduleTime.alignedNow().plusSeconds(3600));
return lockedSched.tryUpdateNextScheduleTime(nextTime);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ List<StoredSessionAttemptWithSession> getOtherAttempts(long attemptId)

List<ArchivedTask> getTasksOfAttempt(long attemptId);

/**
 * Returns the number of attempts that are currently active.
 *
 * NOTE(review): presumably "active" means not yet done and the count is
 * scoped to the site of this store - confirm against the implementation.
 */
long getActiveAttemptCount();

interface SessionLockAction <T>
{
T call(SessionControlStore store, StoredSession storedSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public interface TaskControlStore
{
long getTaskCount(long attemptId);
long getTaskCountOfAttempt(long attemptId);

long addSubtask(long attemptId, Task task);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.digdag.core.workflow;

/**
 * Thrown when adding another attempt to a site would exceed the attempt limit.
 *
 * The limit is not strictly enforced: the number of attempts is counted in a
 * slightly older transaction, so the actual number of attempts is not
 * guaranteed.
 */
public class AttemptLimitExceededException
        extends LimitExceededException
{
    // TODO (sada): should this be a checked exception?

    /**
     * Creates an exception carrying only a detail message.
     */
    public AttemptLimitExceededException(String message)
    {
        super(message);
    }

    /**
     * Creates an exception carrying both a detail message and a cause.
     */
    public AttemptLimitExceededException(String message, Throwable cause)
    {
        super(message, cause);
    }

    /**
     * Creates an exception that only wraps a cause.
     */
    public AttemptLimitExceededException(Throwable cause)
    {
        super(cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private static long addTasks(TaskControlStore store,

// Limit the total number of tasks in a session.
// Note: This is racy and should not be relied on to guarantee that the limit is not exceeded.
long taskCount = store.getTaskCount(attemptId);
long taskCount = store.getTaskCountOfAttempt(attemptId);
if (taskCount + tasks.size() > Limits.maxWorkflowTasks()) {
throw new TaskLimitExceededException("Too many tasks. Limit: " + Limits.maxWorkflowTasks() + ", Current: " + taskCount + ", Adding: " + tasks.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

/**
* An exception thrown when adding more tasks to a session attempt would cause the task limit to be exceeded.
*
* The limit is not strictly enforced: the number of tasks is counted in a
* slightly older transaction, so the actual number of tasks is not guaranteed.
*/
public class TaskLimitExceededException
extends LimitExceededException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.Limits;
import io.digdag.core.agent.AgentId;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceConflictException;
Expand All @@ -20,6 +21,7 @@
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionAttempt;
import io.digdag.core.session.SessionMonitor;
import io.digdag.core.session.SessionStore;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
Expand Down Expand Up @@ -248,8 +250,14 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar

StoredSessionAttemptWithSession stored;
try {
stored = sm
.getSessionStore(siteId)
SessionStore ss = sm.getSessionStore(siteId);

long activeAttempts = ss.getActiveAttemptCount();
if (activeAttempts + 1 > Limits.maxAttempts()) {
throw new AttemptLimitExceededException("Too many attempts running. Limit: " + Limits.maxAttempts() + ", Current: " + activeAttempts);
}

stored = ss
// putAndLockSession + insertAttempt might be able to be faster by combining them into one method and optimize using a single SQL with CTE
.putAndLockSession(session, (store, storedSession) -> {
StoredProject proj = rm.getProjectStore(siteId).getProjectById(projId);
Expand Down
135 changes: 135 additions & 0 deletions digdag-tests/src/test/java/acceptance/AttemptLimitIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package acceptance;

import com.google.common.base.Optional;
import io.digdag.client.DigdagClient;
import io.digdag.client.api.RestSchedule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import utils.CommandStatus;
import utils.TemporaryDigdagServer;

import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import static utils.TestUtils.copyResource;
import static utils.TestUtils.main;
import static utils.TestUtils.expect;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeThat;
import static java.time.temporal.ChronoUnit.HOURS;

/**
 * Acceptance tests for the limit on the number of attempts configured by the
 * {@code io.digdag.limits.maxAttempts} system property.
 *
 * The server under test runs as a separate process with the limit set to 3.
 * Both tests push the {@code hourly_sleep.dig} workflow resource as project
 * {@code foobar}.
 */
public class AttemptLimitIT
{
@Rule
public TemporaryFolder folder = new TemporaryFolder();

// Server with the attempt limit lowered to 3 so the tests can reach it quickly.
@Rule
public TemporaryDigdagServer server = TemporaryDigdagServer.builder()
.inProcess(false) // setting system property doesn't work with in-process mode
.systemProperty("io.digdag.limits.maxAttempts", "3")
.build();

// Path of a fresh temporary file passed to every CLI invocation with "-c".
private Path config;
// Project directory ("foobar" under the temporary folder) created by "init".
private Path projectDir;
// REST client connected to the temporary server.
private DigdagClient client;

/**
 * Prepares the project directory path, the client config file and a REST
 * client pointing at the temporary server.
 */
@Before
public void setUp()
throws Exception
{
projectDir = folder.getRoot().toPath().resolve("foobar");
config = folder.newFile().toPath();

client = DigdagClient.builder()
.host(server.host())
.port(server.port())
.build();
}

/**
 * Starts 3 sessions successfully, then verifies that starting a 4th one is
 * rejected with a "Too many attempts running" error and HTTP status 400.
 *
 * NOTE(review): this relies on the first 3 attempts still being active
 * when the 4th start command is issued - confirm the workflow runs long enough.
 */
@Test
public void startFailsWithTooManyAttempts()
throws Exception
{
// Create new project
CommandStatus initStatus = main("init",
"-c", config.toString(),
projectDir.toString());
assertThat(initStatus.code(), is(0));

copyResource("acceptance/attempt_limit/hourly_sleep.dig", projectDir.resolve("hourly_sleep.dig"));

// Push the project to the server
{
CommandStatus pushStatus = main("push",
"--project", projectDir.toString(),
"foobar",
"-c", config.toString(),
"-e", server.endpoint());
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
}

// Start 3 sessions (session times 2016-01-01 .. 2016-01-03); all must be accepted
for (int i = 0; i < 3; i++) {
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"foobar", "hourly_sleep",
"--session", "2016-01-0" + (i + 1));
assertThat(startStatus.errUtf8(), startStatus.code(), is(0));
}

// The 4th attempt exceeds the limit: the command exits with 1 and reports 400 Bad Request
{
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"foobar", "hourly_sleep",
"--session", "2016-01-04");
assertThat(startStatus.errUtf8(), startStatus.code(), is(1));
assertThat(startStatus.errUtf8(), containsString("Too many attempts running"));
assertThat(startStatus.errUtf8(), containsString("\"status\":400"));
}
}

/**
 * Pushes an hourly schedule that starts 10 hours in the past and verifies
 * that only 3 attempts get submitted and that the next schedule time has
 * advanced by exactly 3 hours from the schedule start.
 */
@Test
public void scheduleWithAttemptLimit()
throws Exception
{
// Create a new project
CommandStatus initStatus = main("init",
"-c", config.toString(),
projectDir.toString());
assertThat(initStatus.code(), is(0));

// Push hourly schedule from now minus 10 hours (truncated to the hour)
Instant startTime = Instant.now().minus(Duration.ofHours(10)).truncatedTo(HOURS);

copyResource("acceptance/attempt_limit/hourly_sleep.dig", projectDir.resolve("hourly_sleep.dig"));
{
CommandStatus pushStatus = main("push",
"--project", projectDir.toString(),
"foobar",
"-c", config.toString(),
"-e", server.endpoint(),
"--schedule-from", Long.toString(startTime.getEpochSecond()));
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
}

// Wait (up to 5 minutes) until the first schedule's next run time becomes later than now
expect(Duration.ofMinutes(5), () -> {
List<RestSchedule> scheds = client.getSchedules();
return scheds.size() > 0 && scheds.get(0).getNextRunTime().isAfter(Instant.now());
});

// Number of actually submitted sessions should be 3 = maxAttempts
assertThat(client.getSessionAttempts(Optional.absent()).size(), is(3));

// Although next run time > now, the next schedule time has advanced by only 3 hours from the start time
assertThat(client.getSchedules().get(0).getNextScheduleTime().toInstant(), is(startTime.plus(Duration.ofHours(3))));
}
}
13 changes: 12 additions & 1 deletion digdag-tests/src/test/java/utils/TemporaryDigdagServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class TemporaryDigdagServer

private final boolean inProcess;
private final Map<String, String> environment;
private final Properties properties;

private Path workdir;
private Process serverProcess;
Expand Down Expand Up @@ -123,6 +124,7 @@ public TemporaryDigdagServer(Builder builder)
this.extraArgs = ImmutableList.copyOf(Objects.requireNonNull(builder.args, "args"));
this.inProcess = builder.inProcess;
this.environment = ImmutableMap.copyOf(builder.environment);
this.properties = builder.properties;

this.executor = Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY);

Expand Down Expand Up @@ -310,6 +312,9 @@ public void start()
if (version.isPresent()) {
processArgs.add("-D" + Version.VERSION_PROPERTY + "=" + version.get());
}
for (String key : properties.stringPropertyNames()) {
processArgs.add("-D" + key + "=" + properties.getProperty(key));
}
if (!isNullOrEmpty(JACOCO_JVM_ARG)) {
processArgs.add(JACOCO_JVM_ARG);
}
Expand Down Expand Up @@ -669,7 +674,6 @@ public static TemporaryDigdagServer of(Version version)

public static class Builder
{

// Private: cannot be instantiated from outside the enclosing class.
// NOTE(review): presumably obtained through TemporaryDigdagServer.builder() - confirm.
private Builder()
{
}
Expand All @@ -678,6 +682,7 @@ private Builder()
private Optional<Version> version = Optional.absent();
private List<String> configuration = new ArrayList<>();
private Map<String, String> environment = new HashMap<>();
private Properties properties = new Properties();
private boolean inProcess = IN_PROCESS_DEFAULT;
private byte[] stdin = new byte[0];

Expand Down Expand Up @@ -726,6 +731,12 @@ public Builder inProcess(boolean inProcess)
return this;
}

/**
 * Records a system property in this builder's {@code properties}.
 *
 * NOTE(review): the server appears to pass these as {@code -Dkey=value}
 * arguments when it launches a separate process, so they presumably have no
 * effect in in-process mode - confirm.
 *
 * @param key name of the system property
 * @param value value of the system property
 * @return this builder, to allow call chaining
 */
public Builder systemProperty(String key, String value)
{
properties.setProperty(key, value);
return this;
}

public TemporaryDigdagServer build()
{
return new TemporaryDigdagServer(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
timezone: UTC

schedule:
hourly>: 00:00

+foo:
sh>: sleep 20

0 comments on commit 1d4da9c

Please sign in to comment.