Skip to content

Commit

Permalink
Update JobMonitor to use exponential back-off to check on job completion
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed May 14, 2018
1 parent 48201bb commit 79c3d06
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.genie.core.properties;

import lombok.Getter;
import lombok.Setter;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.Min;

/**
 * Configuration holder for the timing parameters of an ExponentialBackOffTrigger.
 *
 * <p>Lombok generates a getter and a setter for each of the three fields below.
 *
 * <p>NOTE(review): only {@code minInterval} declares a validation constraint in this class;
 * {@code maxInterval} and {@code factor} are accepted as-is here. Confirm whether the
 * consumer of these properties validates them.
 *
 * @author mprimi
 * @since 3.3.9
 */
@Validated
@Setter
@Getter
public class ExponentialBackOffTriggerProperties {

    /**
     * Smallest interval between trigger firings (presumably milliseconds -- confirm against
     * the trigger implementation). Must be at least 1; defaults to 100.
     */
    @Min(1)
    private long minInterval = 100L;

    /**
     * Largest interval between trigger firings, in the same unit as {@code minInterval}.
     * Defaults to 10000.
     */
    private long maxInterval = 10_000L;

    /**
     * Multiplier used to grow the interval. Defaults to 1.2.
     */
    private float factor = 1.2F;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ public class JobsProperties {

@NotNull
private JobsUsersProperties users = new JobsUsersProperties();

@NotNull
private ExponentialBackOffTriggerProperties completionCheckBackOff = new ExponentialBackOffTriggerProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.netflix.genie.core.properties;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests covering the defaults and the accessors of ExponentialBackOffTriggerProperties.
 *
 * @author mprimi
 * @since 3.3.9
 */
public class ExponentialBackOffTriggerPropertiesUnitTest {

    private ExponentialBackOffTriggerProperties properties;

    /**
     * Create a fresh properties instance before every test.
     */
    @Before
    public void setUp() {
        this.properties = new ExponentialBackOffTriggerProperties();
    }

    /**
     * A newly constructed instance must expose the default values.
     */
    @Test
    public void canConstruct() {
        Assert.assertEquals(100, this.properties.getMinInterval());
        Assert.assertEquals(10_000, this.properties.getMaxInterval());
        Assert.assertEquals(1.2, this.properties.getFactor(), 0.001);
    }

    /**
     * The minInterval setter must be reflected by its getter.
     */
    @Test
    public void canSetMinInterval() {
        final int newMinInterval = 1234;
        this.properties.setMinInterval(newMinInterval);
        Assert.assertEquals(newMinInterval, this.properties.getMinInterval());
    }

    /**
     * The maxInterval setter must be reflected by its getter.
     */
    @Test
    public void canSetMaxInterval() {
        final int newMaxInterval = 1234;
        this.properties.setMaxInterval(newMaxInterval);
        Assert.assertEquals(newMaxInterval, this.properties.getMaxInterval());
    }

    /**
     * The factor setter must be reflected by its getter (compared within a small tolerance).
     */
    @Test
    public void canSetFactor() {
        final float newFactor = 2.4f;
        this.properties.setFactor(newFactor);
        Assert.assertEquals(newFactor, this.properties.getFactor(), 0.001);
    }
}
12 changes: 12 additions & 0 deletions genie-docs/src/docs/asciidoc/_properties.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ to work.
|The maximum number of active jobs a user is allowed to have. Once a user hits this limit, jobs submitted are rejected. This property is ignored unless `genie.jobs.users.activeLimit.enabled` is set to true.
|100

|genie.jobs.completionCheckBackOff.minInterval
|The minimum time between checks for job completion in milliseconds. Must be greater than zero.
|100

|genie.jobs.completionCheckBackOff.maxInterval
|The maximum time between checks for job completion in milliseconds. This is a fallback value; in most cases the value used is the one specified as part of the `Command` entity for a particular job.
|10000

|genie.jobs.completionCheckBackOff.factor
|Multiplication factor that grows the delay between checks for job completions. Must be greater than 1.
|1.2

|genie.leader.enabled
|Whether this node should be the leader of the cluster or not. Should only be used if leadership is not being
determined by Zookeeper or other mechanism via Spring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.genie.core.events.JobFinishedReason;
import com.netflix.genie.core.events.KillJobEvent;
import com.netflix.genie.core.properties.JobsProperties;
import com.netflix.genie.core.util.ExponentialBackOffTrigger;
import com.netflix.genie.core.util.ProcessChecker;
import com.netflix.genie.core.util.UnixProcessChecker;
import com.netflix.genie.web.tasks.GenieTaskScheduleType;
Expand All @@ -36,6 +37,7 @@
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.Executor;
import org.apache.commons.lang3.SystemUtils;
import org.springframework.scheduling.Trigger;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class JobMonitor extends NodeTask {
private final File stdErr;
private final long maxStdOutLength;
private final long maxStdErrLength;
private final Trigger trigger;

// Metrics
private final Counter successfulCheckRate;
Expand Down Expand Up @@ -112,6 +115,13 @@ public class JobMonitor extends NodeTask {
this.maxStdOutLength = jobsProperties.getMax().getStdOutSize();
this.maxStdErrLength = jobsProperties.getMax().getStdErrSize();

this.trigger = new ExponentialBackOffTrigger(
ExponentialBackOffTrigger.DelayType.FROM_PREVIOUS_SCHEDULING,
jobsProperties.getCompletionCheckBackOff().getMinInterval(),
execution.getCheckDelay().orElse(jobsProperties.getCompletionCheckBackOff().getMaxInterval()),
jobsProperties.getCompletionCheckBackOff().getFactor()
);

this.successfulCheckRate = registry.counter("genie.jobs.successfulStatusCheck.rate");
this.timeoutRate = registry.counter("genie.jobs.timeout.rate");
this.finishedRate = registry.counter("genie.jobs.finished.rate");
Expand Down Expand Up @@ -205,14 +215,14 @@ public void run() {
*/
@Override
public GenieTaskScheduleType getScheduleType() {
return GenieTaskScheduleType.FIXED_DELAY;
return GenieTaskScheduleType.TRIGGER;
}

/**
* {@inheritDoc}
*/
@Override
public long getFixedDelay() {
return this.execution.getCheckDelay().orElseThrow(IllegalArgumentException::new);
public Trigger getTrigger() {
return trigger;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public class JobRestControllerIntegrationTests extends RestControllerIntegration
private static final String JOB_CLUSTER_LINK_PATH = "$._links.cluster.href";
private static final String JOB_APPLICATIONS_LINK_PATH = "$._links.applications.href";

private static final long CHECK_DELAY = 1L;
private static final long CHECK_DELAY = 500L;

private static final String BASE_DIR
= "com/netflix/genie/web/controllers/JobRestControllerIntegrationTests/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,30 +347,30 @@ public void cantGetStatusIfErrorOnUnixLikeSystem() throws IOException {
*/
@Test
public void canGetScheduleType() {
Assert.assertThat(this.monitor.getScheduleType(), Matchers.is(GenieTaskScheduleType.FIXED_DELAY));
Assert.assertThat(this.monitor.getScheduleType(), Matchers.is(GenieTaskScheduleType.TRIGGER));
}

/**
* Make sure asking for a trigger isn't allowed.
* Make sure asking for a trigger returns one.
*/
@Test(expected = UnsupportedOperationException.class)
public void cantGetTrigger() {
this.monitor.getTrigger();
@Test
public void canGetTrigger() {
Assert.assertNotNull(this.monitor.getTrigger());
}

/**
* Make sure asking for a trigger isn't allowed.
* Make sure asking for a fixed rate isn't allowed.
*/
@Test(expected = UnsupportedOperationException.class)
public void cantGetFixedRate() {
this.monitor.getFixedRate();
}

/**
* Make sure the fixed delay value is what we expect.
* Make sure asking for a fixed delay isn't allowed.
*/
@Test
public void canGetFixedDelay() {
@Test(expected = UnsupportedOperationException.class)
public void cantGetFixedDelay() {
Assert.assertThat(DELAY, Matchers.is(this.monitor.getFixedDelay()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -156,7 +157,7 @@ public void canAttachToRunningJobs() throws GenieException {
this.coordinator.onJobStarted(event1);
Mockito
.verify(this.scheduler, Mockito.times(1))
.scheduleWithFixedDelay(Mockito.any(JobMonitor.class), Mockito.eq(DELAY));
.schedule(Mockito.any(JobMonitor.class), Mockito.any(Trigger.class));

final Job j1 = Mockito.mock(Job.class);
Mockito.when(j1.getId()).thenReturn(Optional.of(job1Id));
Expand Down Expand Up @@ -186,7 +187,7 @@ public void canAttachToRunningJobs() throws GenieException {
.publishAsynchronousEvent(Mockito.any(JobFinishedEvent.class));
Mockito
.verify(this.scheduler, Mockito.times(3))
.scheduleWithFixedDelay(Mockito.any(JobMonitor.class), Mockito.eq(DELAY));
.schedule(Mockito.any(JobMonitor.class), Mockito.any(Trigger.class));
Assert.assertThat(this.coordinator.getNumActiveJobs(), Matchers.is(3));
Assert.assertThat(this.coordinator.getUsedMemory(), Matchers.is(3 * 1024));
}
Expand Down Expand Up @@ -234,7 +235,7 @@ public void canStartJobMonitor() {
final ScheduledFuture future = Mockito.mock(ScheduledFuture.class);

Mockito.when(
this.scheduler.scheduleWithFixedDelay(Mockito.any(JobMonitor.class), Mockito.eq(DELAY))
this.scheduler.schedule(Mockito.any(JobMonitor.class), Mockito.any(Trigger.class))
).thenReturn(future);

Assert.assertThat(this.coordinator.getNumActiveJobs(), Matchers.is(4));
Expand All @@ -257,7 +258,7 @@ public void canStartJobMonitor() {

Mockito
.verify(this.scheduler, Mockito.times(5))
.scheduleWithFixedDelay(Mockito.any(JobMonitor.class), Mockito.eq(DELAY));
.schedule(Mockito.any(JobMonitor.class), Mockito.any(Trigger.class));
}

/**
Expand Down Expand Up @@ -293,7 +294,7 @@ public void canStopJobMonitor() throws GenieException {
Mockito.when(future2.cancel(true)).thenReturn(false);

Mockito.when(
this.scheduler.scheduleWithFixedDelay(Mockito.any(JobMonitor.class), Mockito.eq(DELAY))
this.scheduler.schedule(Mockito.any(JobMonitor.class), Mockito.any(Trigger.class))
).thenReturn(future1, future2);

Assert.assertThat(this.coordinator.getNumActiveJobs(), Matchers.is(0));
Expand All @@ -311,7 +312,7 @@ public void canStopJobMonitor() throws GenieException {

Mockito
.verify(this.scheduler, Mockito.times(2))
.scheduleWithFixedDelay(Mockito.any(JobMonitor.class), Mockito.eq(DELAY));
.schedule(Mockito.any(JobMonitor.class), Mockito.any(Trigger.class));

Assert.assertThat(this.coordinator.getNumActiveJobs(), Matchers.is(2));
Assert.assertThat(this.coordinator.getUsedMemory(), Matchers.is(2048));
Expand Down

0 comments on commit 79c3d06

Please sign in to comment.