Skip to content

Commit

Permalink
YARN-4390. Do surgical preemption based on reserved container in Capa…
Browse files Browse the repository at this point in the history
…cityScheduler. Contributed by Wangda Tan
  • Loading branch information
jian-he committed May 5, 2016
1 parent 35cf503 commit bb62e05
Show file tree
Hide file tree
Showing 31 changed files with 2,565 additions and 906 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;

Expand Down Expand Up @@ -84,10 +85,17 @@ private class PreemptionChecker implements Runnable {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
//invoke the preemption policy at a regular pace
//the policy will generate preemption or kill events
//managed by the dispatcher
invokePolicy();
try {
//invoke the preemption policy at a regular pace
//the policy will generate preemption or kill events
//managed by the dispatcher
invokePolicy();
} catch (YarnRuntimeException e) {
LOG.error("YarnRuntimeException raised while executing preemption"
+ " checker, skip this run..., exception=", e);
}

// Wait before next run
try {
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

Expand All @@ -40,9 +42,9 @@ public static Map<String, Resource> getResToObtainByPartitionForLeafQueue(

// Only add resToObtainByPartition when actuallyToBePreempted resource > 0
if (Resources.greaterThan(context.getResourceCalculator(),
clusterResource, qT.actuallyToBePreempted, Resources.none())) {
clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
resToObtainByPartition.put(qT.partition,
Resources.clone(qT.actuallyToBePreempted));
Resources.clone(qT.getActuallyToBePreempted()));
}
}

Expand All @@ -62,4 +64,33 @@ public static boolean isContainerAlreadySelected(RMContainer container,
}
return containers.contains(container);
}

/**
 * Deducts the resources of already-selected containers from the
 * to-be-preempted amount tracked for each container's queue and partition.
 *
 * <p>For every container in {@code selectedCandidates}, the node it is
 * allocated on is looked up through the scheduler; containers whose node
 * cannot be found are skipped. The amount deducted is the container's
 * reserved resource when that is non-null, otherwise its allocated
 * resource. When both are null, nothing is deducted for that container.
 *
 * @param context preemption context used to reach the scheduler, the
 *          per-partition queue lookup and the resource calculator
 * @param selectedCandidates containers that have already been selected,
 *          grouped by application attempt
 */
public static void deductPreemptableResourcesBasedSelectedCandidates(
    CapacitySchedulerPreemptionContext context,
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
  for (Set<RMContainer> selectedOfAttempt : selectedCandidates.values()) {
    for (RMContainer container : selectedOfAttempt) {
      SchedulerNode node = context.getScheduler().getSchedulerNode(
          container.getAllocatedNode());

      // Only containers whose node is still known contribute a deduction.
      if (node != null) {
        String nodePartition = node.getPartition();
        String queueName = container.getQueueName();
        // NOTE(review): tempQueue is dereferenced below without a null
        // check -- confirm getQueueByPartition never returns null here.
        TempQueuePerPartition tempQueue =
            context.getQueueByPartition(queueName, nodePartition);

        // Prefer the reserved resource; fall back to the allocated one.
        Resource reserved = container.getReservedResource();
        Resource toDeduct =
            (reserved != null) ? reserved : container.getAllocatedResource();

        if (toDeduct != null) {
          tempQueue.deductActuallyToBePreempted(
              context.getResourceCalculator(),
              tempQueue.totalPartitionResource, toDeduct);
        }
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -55,7 +54,7 @@ public class FifoCandidatesSelector
super(preemptionContext);

preemptableAmountCalculator = new PreemptableResourceCalculator(
preemptionContext);
preemptionContext, false);
}

@Override
Expand All @@ -66,8 +65,13 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
totalPreemptionAllowed);

Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
new HashMap<>();
// Previous selectors (with higher priority) could have already
// selected containers. We need to deduct preemptable resources
// based on already selected candidates.
CapacitySchedulerPreemptionUtils
.deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
selectedCandidates);

List<RMContainer> skippedAMContainerlist = new ArrayList<>();

// Loop all leaf queues
Expand Down Expand Up @@ -109,7 +113,7 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
continue;
}
boolean preempted = tryPreemptContainerAndDeductResToObtain(
resToObtainByPartition, c, clusterResource, preemptMap,
resToObtainByPartition, c, clusterResource, selectedCandidates,
totalPreemptionAllowed);
if (!preempted) {
continue;
Expand All @@ -132,7 +136,7 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
}

preemptFrom(fc, clusterResource, resToObtainByPartition,
skippedAMContainerlist, skippedAMSize, preemptMap,
skippedAMContainerlist, skippedAMSize, selectedCandidates,
totalPreemptionAllowed);
}

Expand All @@ -144,13 +148,13 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
leafQueue.getAbsoluteCapacity()),
leafQueue.getMaxAMResourcePerQueuePercent());

preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
totalPreemptionAllowed);
}
}

return preemptMap;
return selectedCandidates;
}

/**
Expand Down Expand Up @@ -236,9 +240,9 @@ private boolean tryPreemptContainerAndDeductResToObtain(
resourceToObtainByPartitions.remove(nodePartition);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Marked container=" + rmContainer.getContainerId()
+ " in partition=" + nodePartition
+ " to be preemption candidates");
LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
.getContainerId() + " from partition=" + nodePartition + " queue="
+ rmContainer.getQueueName() + " to be preemption candidates");
}
// Add to preemptMap
addToPreemptMap(preemptMap, attemptId, rmContainer);
Expand Down
Loading

0 comments on commit bb62e05

Please sign in to comment.