Skip to content

Commit

Permalink
YARN-8774. Memory leak when CapacityScheduler allocates from reserved…
Browse files Browse the repository at this point in the history
… container with non-default label. Contributed by Tao Yang.

(cherry picked from commit d7d0e55)
  • Loading branch information
Eric E Payne committed Sep 28, 2018
1 parent 463d1c3 commit 54ef6e2
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,25 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
// When reserving container
RMContainer updatedContainer = reservedContainer;
if (updatedContainer == null) {
SchedulingPlacementSet<FiCaSchedulerNode> ps =
application.getAppSchedulingInfo()
.getSchedulingPlacementSet(schedulerKey);
if (null == ps) {
LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+ " for application=" + application.getApplicationId()
+ " schedulerRequestKey=" + schedulerKey);
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST,
ActivityState.REJECTED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
updatedContainer = new RMContainerImpl(container, schedulerKey,
application.getApplicationAttemptId(), node.getNodeID(),
application.getAppSchedulingInfo().getUser(), rmContext);
application.getAppSchedulingInfo().getUser(), rmContext,
ps.getPrimaryRequestedNodePartition());
}
allocationResult.updatedContainer = updatedContainer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,68 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close();
}

/**
 * Regression test for YARN-8774: a container that is first reserved and then
 * allocated from that reservation on a node with a non-default label ("x")
 * must not be left behind in
 * {@code LeafQueue#getIgnoreExclusivityRMContainers()} once its application
 * has been killed.
 *
 * @throws Exception if the mock cluster fails to start, or an application
 *         does not reach the expected state
 */
@Test (timeout = 120000)
public void testRMContainerLeakInLeafQueue() throws Exception {
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
      NodeId.newInstance("h2", 0), toSet("x")));

  // inject node label manager
  MockRM rm1 =
      new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
        @Override public RMNodeLabelsManager createNodeLabelManager() {
          return mgr;
        }
      };

  // Everything after construction runs inside try/finally so that the RM is
  // closed even when an assertion fails; otherwise a failing run would leave
  // a started MockRM behind for the remaining tests in this JVM.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
    rm1.registerNode("h2:1234", 8 * GB); // label = x

    // launch two apps to queue a1 (label = x); both AMs are launched on nm1
    RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1");
    MockRM.launchAndRegisterAM(app2, rm1, nm1);

    // app1 requests two 7 GB containers on any node ("*").
    am1.allocate("*", 7 * GB, 2, new ArrayList<ContainerId>());

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");

    // Do node heartbeats 1 time
    // scheduler will reserve a container for app1
    // NOTE(review): presumably the 7 GB request cannot fit on the 8 GB node
    // while both AMs are running there - confirm against MockRM defaults.
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

    // app1 should still have exactly one live container and now hold exactly
    // one reservation.
    FiCaSchedulerApp schedulerApp1 =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());

    // kill app2 then do node heartbeat 1 time
    // scheduler will allocate a container from the reserved container on nm1
    rm1.killApp(app2.getApplicationId());
    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());

    // After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers
    // should be clean, otherwise resource leak happened
    rm1.killApp(app1.getApplicationId());
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    Assert.assertEquals(0,
        leafQueue.getIgnoreExclusivityRMContainers().size());
  } finally {
    rm1.close();
  }
}

private void checkPendingResource(MockRM rm, int priority,
ApplicationAttemptId attemptId, int memory) {
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
Expand Down

0 comments on commit 54ef6e2

Please sign in to comment.