Skip to content

Commit

Permalink
YARN-5716. Add global scheduler interface definition and update Capac…
Browse files Browse the repository at this point in the history
…ityScheduler to use it. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Nov 7, 2016
1 parent acd509d commit de3b4aa
Show file tree
Hide file tree
Showing 49 changed files with 4,212 additions and 1,334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,13 @@
    </Or>
<Bug pattern="VO_VOLATILE_INCREMENT" />
</Match>


<!-- Ignore false alert for UL_UNRELEASED_LOCK_EXCEPTION_PATH -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler$ResourceCommitterService"/>
<Method name="run" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>

</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
Expand All @@ -46,6 +47,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {

ContainerId getContainerId();

void setContainerId(ContainerId containerId);

ApplicationAttemptId getApplicationAttemptId();

RMContainerState getState();
Expand Down Expand Up @@ -105,4 +108,14 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
* @return If the container was allocated remotely.
*/
boolean isRemotelyAllocated();

/*
* Return reserved resource for reserved containers, return allocated resource
* for other container
*/
Resource getAllocatedOrReservedResource();

boolean completed();

NodeId getNodeId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
RMContainerEvent> stateMachine;
private final ReadLock readLock;
private final WriteLock writeLock;
private final ContainerId containerId;
private final ApplicationAttemptId appAttemptId;
private final NodeId nodeId;
private final Container container;
Expand Down Expand Up @@ -224,7 +223,6 @@ public RMContainerImpl(Container container,
RMContext rmContext, long creationTime, String nodeLabelExpression,
boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
this.container = container;
this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container);
Expand Down Expand Up @@ -255,15 +253,15 @@ public RMContainerImpl(Container container,
// containers. If false, and if this container is marked as the AM, metrics
// will still be published for this container, but that calculation happens
// later.
if (saveNonAMContainerMetaInfo) {
if (saveNonAMContainerMetaInfo && null != container.getId()) {
rmContext.getSystemMetricsPublisher().containerCreated(
this, this.creationTime);
}
}

@Override
public ContainerId getContainerId() {
return this.containerId;
return this.container.getId();
}

@Override
Expand Down Expand Up @@ -356,8 +354,8 @@ public long getFinishTime() {
public String getDiagnosticsInfo() {
try {
readLock.lock();
if (getFinishedStatus() != null) {
return getFinishedStatus().getDiagnostics();
if (finishedStatus != null) {
return finishedStatus.getDiagnostics();
} else {
return null;
}
Expand All @@ -374,7 +372,7 @@ public String getLogURL() {
logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext
.getYarnConfiguration()));
logURL.append(WebAppUtils.getRunningLogURL(
container.getNodeHttpAddress(), containerId.toString(),
container.getNodeHttpAddress(), getContainerId().toString(),
user));
return logURL.toString();
} finally {
Expand All @@ -386,8 +384,8 @@ public String getLogURL() {
public int getContainerExitStatus() {
try {
readLock.lock();
if (getFinishedStatus() != null) {
return getFinishedStatus().getExitStatus();
if (finishedStatus != null) {
return finishedStatus.getExitStatus();
} else {
return 0;
}
Expand All @@ -400,8 +398,8 @@ public int getContainerExitStatus() {
public ContainerState getContainerState() {
try {
readLock.lock();
if (getFinishedStatus() != null) {
return getFinishedStatus().getState();
if (finishedStatus != null) {
return finishedStatus.getState();
} else {
return ContainerState.RUNNING;
}
Expand Down Expand Up @@ -431,7 +429,7 @@ public void setResourceRequests(List<ResourceRequest> requests) {

@Override
public String toString() {
return containerId.toString();
return getContainerId().toString();
}

@Override
Expand Down Expand Up @@ -476,7 +474,7 @@ public void handle(RMContainerEvent event) {
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Invalid event " + event.getType() +
" on container " + this.containerId);
" on container " + this.getContainerId());
}
if (oldState != getState()) {
LOG.info(event.getContainerId() + " Container Transitioned from "
Expand All @@ -489,10 +487,15 @@ public void handle(RMContainerEvent event) {
}
}

public ContainerStatus getFinishedStatus() {
return finishedStatus;
public boolean completed() {
return finishedStatus != null;
}


@Override
public NodeId getNodeId() {
return nodeId;
}

private static class BaseTransition implements
SingleArcTransition<RMContainerImpl, RMContainerEvent> {

Expand All @@ -517,7 +520,7 @@ public RMContainerState transition(RMContainerImpl container,
report.getContainerExitStatus());

new FinishedTransition().transition(container,
new RMContainerFinishedEvent(container.containerId, status,
new RMContainerFinishedEvent(container.getContainerId(), status,
RMContainerEventType.FINISHED));
return RMContainerState.COMPLETED;
} else if (report.getContainerState().equals(ContainerState.RUNNING)) {
Expand Down Expand Up @@ -654,11 +657,11 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
} else {
// Something wrong happened, kill the container
LOG.warn("Something wrong happened, container size reported by NM"
+ " is not expected, ContainerID=" + container.containerId
+ " is not expected, ContainerID=" + container.getContainerId()
+ " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
+ nmContainerResource);
container.eventHandler.handle(new RMNodeCleanContainerEvent(
container.nodeId, container.containerId));
container.nodeId, container.getContainerId()));

}
}
Expand Down Expand Up @@ -761,7 +764,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {

// Inform node
container.eventHandler.handle(new RMNodeCleanContainerEvent(
container.nodeId, container.containerId));
container.nodeId, container.getContainerId()));

// Inform appAttempt
super.transition(container, event);
Expand Down Expand Up @@ -831,8 +834,8 @@ public int hashCode() {

@Override
public int compareTo(RMContainer o) {
if (containerId != null && o.getContainerId() != null) {
return containerId.compareTo(o.getContainerId());
if (getContainerId() != null && o.getContainerId() != null) {
return getContainerId().compareTo(o.getContainerId());
}
return -1;
}
Expand Down Expand Up @@ -865,4 +868,35 @@ public ExecutionType getExecutionType() {
public boolean isRemotelyAllocated() {
return isExternallyAllocated;
}

@Override
public Resource getAllocatedOrReservedResource() {
try {
readLock.lock();
if (getState().equals(RMContainerState.RESERVED)) {
return getReservedResource();
} else {
return getAllocatedResource();
}
} finally {
readLock.unlock();
}
}

@Override
public void setContainerId(ContainerId containerId) {
// In some cases, for example, global scheduling. It is possible that
// container created without container-id assigned, so we will publish
// container creation event to timeline service when id assigned.
container.setId(containerId);

// If saveNonAMContainerMetaInfo is true, store system metrics for all
// containers. If false, and if this container is marked as the AM, metrics
// will still be published for this container, but that calculation happens
// later.
if (saveNonAMContainerMetaInfo && null != container.getId()) {
rmContext.getSystemMetricsPublisher().containerCreated(
this, this.creationTime);
}
}
}
Loading

0 comments on commit de3b4aa

Please sign in to comment.