Skip to content

Commit

Permalink
YARN-9809. Added node manager health status to resource manager registration call.
Browse files Browse the repository at this point in the history

           Contributed by Eric Badger via eyang
  • Loading branch information
macroadster committed Jun 30, 2020
1 parent 2a67e2b commit e8dc862
Show file tree
Hide file tree
Showing 31 changed files with 429 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2013,6 +2013,13 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "health-checker.interval-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000;

/**
 * Configuration key: whether or not to run the node health script before
 * the NodeManager (NM) starts up.
 */
public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
NM_PREFIX + "health-checker.run-before-startup";
/** Default for {@link #NM_HEALTH_CHECK_RUN_BEFORE_STARTUP}: disabled. */
public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
false;

/** Health check time out period for all scripts.*/
public static final String NM_HEALTH_CHECK_TIMEOUT_MS =
NM_PREFIX + "health-checker.timeout-ms";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,13 @@
<value>1200000</value>
</property>

<property>
<description>Whether or not to run the node health script
before the NM starts up.</description>
<name>yarn.nodemanager.health-checker.run-before-startup</name>
<value>false</value>
</property>

<property>
<description>Frequency of running node health scripts.</description>
<name>yarn.nodemanager.health-checker.interval-ms</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;

public abstract class RegisterNodeManagerRequest {
Expand Down Expand Up @@ -53,14 +54,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
Resource physicalResource) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, physicalResource,
null);
null, null);
}

public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource, Set<NodeAttribute> nodeAttributes) {
Resource physicalResource, Set<NodeAttribute> nodeAttributes,
NodeStatus nodeStatus) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
Expand All @@ -72,6 +74,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource);
request.setNodeAttributes(nodeAttributes);
request.setNodeStatus(nodeStatus);
return request;
}

Expand Down Expand Up @@ -133,4 +136,16 @@ public abstract void setLogAggregationReportsForApps(
public abstract Set<NodeAttribute> getNodeAttributes();

public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);

/**
 * Get the status of the node that is sent with this registration request.
 * @return The status of the node; may be {@code null} when the request
 *         was created without one (the overload of {@code newInstance}
 *         that takes no node status passes {@code null}).
 */
public abstract NodeStatus getNodeStatus();

/**
 * Set the status of the node to send with this registration request.
 * @param nodeStatus The status of the node; {@code newInstance} may pass
 *                   {@code null} here when no status is available.
 */
public abstract void setNodeStatus(NodeStatus nodeStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
Expand All @@ -51,7 +52,9 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;

import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;

public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
Expand All @@ -68,6 +71,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest

/** Physical resources in the node. */
private Resource physicalResource = null;
private NodeStatus nodeStatus;

public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
Expand Down Expand Up @@ -121,6 +125,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
}
if (this.nodeStatus != null) {
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
}
}

private void addLogAggregationStatusForAppsToProto() {
Expand Down Expand Up @@ -359,6 +366,28 @@ public synchronized void setPhysicalResource(Resource pPhysicalResource) {
this.physicalResource = pPhysicalResource;
}

/**
 * Returns the node status attached to this request.
 *
 * A locally cached value wins; otherwise the value is read from the
 * current proto (or builder), converted, cached and returned.
 *
 * @return the node status, or {@code null} if neither a cached value nor
 *         a proto field is present.
 */
@Override
public synchronized NodeStatus getNodeStatus() {
  if (this.nodeStatus == null) {
    // Read from whichever representation is currently authoritative.
    RegisterNodeManagerRequestProtoOrBuilder source =
        viaProto ? proto : builder;
    if (source.hasNodeStatus()) {
      this.nodeStatus = convertFromProtoFormat(source.getNodeStatus());
    }
  }
  return this.nodeStatus;
}

/**
 * Stores the node status locally; the local value is written into the
 * builder later when local fields are merged.
 *
 * @param pNodeStatus the new node status; passing {@code null} also
 *                    clears the field on the builder.
 */
@Override
public synchronized void setNodeStatus(NodeStatus pNodeStatus) {
  maybeInitBuilder();
  final boolean clearing = (pNodeStatus == null);
  if (clearing) {
    // Drop any previously serialized value so it does not resurface.
    builder.clearNodeStatus();
  }
  this.nodeStatus = pNodeStatus;
}

@Override
public int hashCode() {
return getProto().hashCode();
Expand Down Expand Up @@ -533,4 +562,12 @@ public synchronized void setLogAggregationReportsForApps(
}
this.logAggregationReportsForApps = logAggregationStatusForApps;
}

/**
 * Wraps a {@link NodeStatusProto} in its record implementation.
 *
 * @param statusProto the protobuf message to wrap.
 * @return a new {@link NodeStatusPBImpl} constructed from the message.
 */
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto statusProto) {
  final NodeStatusPBImpl wrapped = new NodeStatusPBImpl(statusProto);
  return wrapped;
}

/**
 * Extracts the protobuf message from a node status record.
 *
 * @param status the record to convert; it is cast to
 *               {@link NodeStatusPBImpl}, so any other implementation
 *               fails with a {@link ClassCastException}.
 * @return the value of {@code getProto()} on the cast record.
 */
private NodeStatusProto convertToProtoFormat(NodeStatus status) {
  final NodeStatusPBImpl impl = (NodeStatusPBImpl) status;
  return impl.getProto();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ message RegisterNodeManagerRequestProto {
optional ResourceProto physicalResource = 9;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
optional NodeAttributesProto nodeAttributes = 11;
optional NodeStatusProto nodeStatus = 12;
}

message RegisterNodeManagerResponseProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,11 @@ protected void registerWithRM()
// during RM recovery
synchronized (this.context) {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
NodeStatus nodeStatus = getNodeStatus(0);
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels, physicalResource, nodeAttributes);
nodeLabels, physicalResource, nodeAttributes, nodeStatus);

if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public class NodeHealthScriptRunner extends TimedHealthReporterService {
"Node health script timed out";

private NodeHealthScriptRunner(String scriptName, long checkInterval,
long timeout, String[] scriptArgs) {
super(NodeHealthScriptRunner.class.getName(), checkInterval);
long timeout, String[] scriptArgs, boolean runBeforeStartup) {
super(NodeHealthScriptRunner.class.getName(), checkInterval,
runBeforeStartup);
this.nodeHealthScript = scriptName;
this.scriptTimeout = timeout;
setTimerTask(new NodeHealthMonitorExecutor(scriptArgs));
Expand Down Expand Up @@ -91,6 +92,10 @@ public static NodeHealthScriptRunner newInstance(String scriptName,
"interval-ms can not be set to a negative number.");
}

boolean runBeforeStartup = conf.getBoolean(
YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP,
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP);

// Determine time out
String scriptTimeoutConfig = String.format(
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE,
Expand All @@ -113,7 +118,7 @@ public static NodeHealthScriptRunner newInstance(String scriptName,
String[] scriptArgs = conf.getStrings(scriptArgsConfig, new String[]{});

return new NodeHealthScriptRunner(nodeHealthScript,
checkIntervalMs, scriptTimeout, scriptArgs);
checkIntervalMs, scriptTimeout, scriptArgs, runBeforeStartup);
}

private enum HealthCheckerExitStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,25 @@ public abstract class TimedHealthReporterService extends AbstractService
private Timer timer;
private TimerTask task;
private long intervalMs;
private boolean runBeforeStartup;

/**
 * Creates a health reporter whose task is not run ahead of startup.
 *
 * Delegates to the three-argument constructor with
 * {@code runBeforeStartup = false}, so the initial state is defined in
 * exactly one place.
 *
 * @param name       the service name passed to the superclass.
 * @param intervalMs the interval, in milliseconds, between task runs.
 */
TimedHealthReporterService(String name, long intervalMs) {
  this(name, intervalMs, false);
}

/**
 * Creates a health reporter that starts out healthy with an empty report.
 *
 * @param name             the service name passed to the superclass.
 * @param intervalMs       the interval, in milliseconds, between task runs.
 * @param runBeforeStartup whether the task should be run ahead of the
 *                         periodic schedule when the service starts.
 */
TimedHealthReporterService(String name, long intervalMs,
    boolean runBeforeStartup) {
  super(name);
  // Scheduling configuration.
  this.intervalMs = intervalMs;
  this.runBeforeStartup = runBeforeStartup;
  // Initial health state: healthy, no report text, stamped "now".
  this.isHealthy = true;
  this.healthReport = "";
  this.lastReportedTime = System.currentTimeMillis();
}

@VisibleForTesting
Expand All @@ -73,7 +85,13 @@ public void serviceStart() throws Exception {
throw new Exception("Health reporting task hasn't been set!");
}
timer = new Timer("HealthReporterService-Timer", true);
timer.scheduleAtFixedRate(task, 0, intervalMs);
long delay = 0;
if (runBeforeStartup) {
delay = intervalMs;
task.run();
}

timer.scheduleAtFixedRate(task, delay, intervalMs);
super.serviceStart();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.yarn.server.nodemanager;

import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -134,6 +136,9 @@ public long getRMIdentifier() {
new DummyContainerManager(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler);
nodeStatusUpdater.init(conf);
NodeResourceMonitorImpl nodeResourceMonitor = mock(
NodeResourceMonitorImpl.class);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
((NMContext)context).setContainerManager(containerManager);
nodeStatusUpdater.start();
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doNothing;

import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -156,32 +157,20 @@ public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
protected NodeHealthCheckerService nodeHealthChecker;
protected LocalDirsHandlerService dirsHandler;
protected final long DUMMY_RM_IDENTIFIER = 1234;
private NodeResourceMonitorImpl nodeResourceMonitor = mock(
NodeResourceMonitorImpl.class);
private NodeHealthCheckerService nodeHealthCheckerService;
private NodeStatusUpdater nodeStatusUpdater;
protected ContainerManagerImpl containerManager = null;

protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics) {
@Override
protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};

@Override
protected void stopRMProxy() {
return;
}

@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
}

@Override
public long getRMIdentifier() {
// There is no real RM registration, simulate and set RMIdentifier
return DUMMY_RM_IDENTIFIER;
}
};
/**
 * @return the node status updater currently held by this test fixture.
 */
public NodeStatusUpdater getNodeStatusUpdater() {
  return this.nodeStatusUpdater;
}

protected ContainerManagerImpl containerManager = null;
/**
 * Replaces the node status updater held by this test fixture.
 *
 * @param nodeStatusUpdater the updater to store.
 */
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
  this.nodeStatusUpdater = nodeStatusUpdater;
}

protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor();
Expand Down Expand Up @@ -218,11 +207,36 @@ public void setup() throws IOException {
delSrvc.init(conf);

dirsHandler = new LocalDirsHandlerService();
nodeHealthChecker = new NodeHealthCheckerService(dirsHandler);
nodeHealthChecker.init(conf);
dirsHandler.init(conf);
nodeHealthCheckerService = new NodeHealthCheckerService(dirsHandler);
nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), nodeHealthCheckerService, metrics) {
@Override
protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};

@Override
protected void stopRMProxy() {
return;
}

@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
}

@Override
public long getRMIdentifier() {
// There is no real RM registration, simulate and set RMIdentifier
return DUMMY_RM_IDENTIFIER;
}
};

containerManager = createContainerManager(delSrvc);
((NMContext)context).setContainerManager(containerManager);
((NMContext)context).setContainerExecutor(exec);
((NMContext)context).setNodeResourceMonitor(nodeResourceMonitor);
nodeStatusUpdater.init(conf);
containerManager.init(conf);
nodeStatusUpdater.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ public int launchContainer(ContainerStartContext ctx)
@Override
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, dirsHandler) {
return new ContainerManagerImpl(context, exec, delSrvc,
getNodeStatusUpdater(), metrics, dirsHandler) {

@Override
protected UserGroupInformation getRemoteUgi() throws YarnException {
Expand Down Expand Up @@ -1704,7 +1704,7 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
@Test
public void testNullTokens() throws Exception {
ContainerManagerImpl cMgrImpl =
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
new ContainerManagerImpl(context, exec, delSrvc, getNodeStatusUpdater(),
metrics, dirsHandler);
String strExceptionMsg = "";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public void setUp() throws Exception {
@Override
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, dirsHandler) {
return new ContainerManagerImpl(context, exec, delSrvc,
getNodeStatusUpdater(), metrics, dirsHandler) {

@Override
public StartContainersResponse startContainers(
Expand Down
Loading

0 comments on commit e8dc862

Please sign in to comment.