Skip to content

Commit

Permalink
Add launcher for trace agent and dogstatsd
Browse files Browse the repository at this point in the history
  • Loading branch information
randomanderson committed Oct 29, 2021
1 parent a910af3 commit 6f1aeb4
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package datadog.communication.ddagent;

import datadog.trace.api.Config;
import datadog.trace.util.ProcessSupervisor;
import java.io.Closeable;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Launches the external trace agent and DogStatsD processes when the configuration reports Azure
 * App Services, and stops supervising them on {@link #close()}.
 *
 * <p>Each process is only launched when its executable path is configured; otherwise a warning is
 * logged and that process is skipped. When {@link Config#isAzureAppServices()} is false nothing is
 * launched and {@link #close()} is a no-op.
 */
public class ExternalAgentLauncher implements Closeable {
  private static final Logger log = LoggerFactory.getLogger(ExternalAgentLauncher.class);

  // Null when the corresponding process was not launched. Final so that close(), which may be
  // invoked from a different thread than the constructor, always sees the assigned value.
  private final ProcessSupervisor traceProcessSupervisor;
  private final ProcessSupervisor dogStatsDProcessSupervisor;

  /**
   * Starts a supervisor for each configured external process.
   *
   * @param config source of the Azure App Services flag and of the executable paths and arguments
   */
  public ExternalAgentLauncher(Config config) {
    if (config.isAzureAppServices()) {
      traceProcessSupervisor =
          startSupervisor(
              "Trace Agent",
              config.getTraceAgentPath(),
              config.getTraceAgentArgs(),
              "Trace agent path not set. Will not start trace agent process");
      dogStatsDProcessSupervisor =
          startSupervisor(
              "DogStatsD",
              config.getDogStatsDPath(),
              config.getDogStatsDArgs(),
              "DogStatsD path not set. Will not start DogStatsD process");
    } else {
      traceProcessSupervisor = null;
      dogStatsDProcessSupervisor = null;
    }
  }

  /**
   * Builds the command line {@code path args...} and hands it to a new {@link ProcessSupervisor}.
   *
   * @param name supervisor name, used for logging
   * @param path executable to run; when null or empty no process is started
   * @param args arguments appended after the executable
   * @param notSetWarning message logged at warn level when {@code path} is missing
   * @return the running supervisor, or null when {@code path} is missing
   */
  private static ProcessSupervisor startSupervisor(
      String name, String path, List<String> args, String notSetWarning) {
    // An empty path can never be executed: ProcessBuilder.start() would fail with an IOException
    // on every attempt, so treat it exactly like an unset path.
    if (path == null || path.isEmpty()) {
      log.warn(notSetWarning);
      return null;
    }

    // NOTE(review): the builder keeps ProcessBuilder's default stdio handling (pipes connected to
    // this JVM) and nothing visible here drains them - confirm the launched binaries cannot block
    // on a full pipe, or configure a redirect.
    ProcessBuilder processBuilder = new ProcessBuilder(path);
    processBuilder.command().addAll(args);

    return new ProcessSupervisor(name, processBuilder);
  }

  /** Closes every supervisor that was started; does nothing for processes that were skipped. */
  @Override
  public void close() {
    if (traceProcessSupervisor != null) {
      traceProcessSupervisor.close();
    }

    if (dogStatsDProcessSupervisor != null) {
      dogStatsDProcessSupervisor.close();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public final class GeneralConfig {
public static final String DOGSTATSD_START_DELAY = "dogstatsd.start-delay";
public static final String DOGSTATSD_HOST = "dogstatsd.host";
public static final String DOGSTATSD_PORT = "dogstatsd.port";
public static final String DOGSTATSD_PATH = "dogstatsd.path";
public static final String DOGSTATSD_ARGS = "dogstatsd.args";

public static final String RUNTIME_METRICS_ENABLED = "runtime.metrics.enabled";
public static final String RUNTIME_ID_ENABLED = "runtime-id.enabled";
Expand All @@ -45,6 +47,7 @@ public final class GeneralConfig {
public static final String TRACER_METRICS_IGNORED_RESOURCES =
"trace.tracer.metrics.ignored.resources";

public static final String AZURE_APP_SERVICES = "azure.app.services";
public static final String INTERNAL_EXIT_ON_FAILURE = "trace.internal.exit.on.failure";

// Private so the class cannot be instantiated from outside; the members visible here are all
// static constants.
private GeneralConfig() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public final class TracerConfig {
public static final String AGENT_NAMED_PIPE = "trace.pipe.name";
public static final String AGENT_TIMEOUT = "trace.agent.timeout";
public static final String PROXY_NO_PROXY = "proxy.no_proxy";
public static final String TRACE_AGENT_PATH = "trace.agent.path";
public static final String TRACE_AGENT_ARGS = "trace.agent.args";
public static final String PRIORITY_SAMPLING = "priority.sampling";
public static final String PRIORITY_SAMPLING_FORCE = "priority.sampling.force";
@Deprecated public static final String TRACE_RESOLVER_ENABLED = "trace.resolver.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;
import static datadog.trace.util.CollectionUtils.tryMakeImmutableMap;

import datadog.communication.ddagent.ExternalAgentLauncher;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.monitor.Monitoring;
import datadog.communication.monitor.Recording;
Expand Down Expand Up @@ -112,6 +113,7 @@ public static CoreTracerBuilder builder() {
private final IdGenerationStrategy idGenerationStrategy;
private final PendingTrace.Factory pendingTraceFactory;
private final SamplingCheckpointer checkpointer;
private final ExternalAgentLauncher externalAgentLauncher;

/**
* JVM shutdown callback, keeping a reference to it to remove this if DDTracer gets destroyed
Expand Down Expand Up @@ -420,6 +422,8 @@ private CoreTracer(
this.scopeManager = scopeManager;
}

this.externalAgentLauncher = new ExternalAgentLauncher(config);

if (sharedCommunicationObjects == null) {
sharedCommunicationObjects = new SharedCommunicationObjects();
}
Expand Down Expand Up @@ -765,6 +769,7 @@ public void close() {
writer.close();
statsDClient.close();
metricsAggregator.close();
externalAgentLauncher.close();
}

@Override
Expand Down
48 changes: 48 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@
import static datadog.trace.api.config.CwsConfig.CWS_TLS_REFRESH;
import static datadog.trace.api.config.GeneralConfig.API_KEY;
import static datadog.trace.api.config.GeneralConfig.API_KEY_FILE;
import static datadog.trace.api.config.GeneralConfig.AZURE_APP_SERVICES;
import static datadog.trace.api.config.GeneralConfig.CONFIGURATION_FILE;
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_ARGS;
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_HOST;
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_PATH;
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_PORT;
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_START_DELAY;
import static datadog.trace.api.config.GeneralConfig.ENV;
Expand Down Expand Up @@ -205,6 +208,8 @@
import static datadog.trace.api.config.TracerConfig.SERVICE_MAPPING;
import static datadog.trace.api.config.TracerConfig.SPAN_TAGS;
import static datadog.trace.api.config.TracerConfig.SPLIT_BY_TAGS;
import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_ARGS;
import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_PATH;
import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_PORT;
import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_URL;
import static datadog.trace.api.config.TracerConfig.TRACE_ANALYTICS_ENABLED;
Expand Down Expand Up @@ -453,6 +458,12 @@ public class Config {
private final boolean cwsEnabled;
private final int cwsTlsRefresh;

private final boolean azureAppServices;
private final String traceAgentPath;
private final List<String> traceAgentArgs;
private final String dogStatsDPath;
private final List<String> dogStatsDArgs;

private String env;
private String version;

Expand Down Expand Up @@ -905,6 +916,23 @@ && isJavaVersionAtLeast(8)
cwsEnabled = configProvider.getBoolean(CWS_ENABLED, DEFAULT_CWS_ENABLED);
cwsTlsRefresh = configProvider.getInteger(CWS_TLS_REFRESH, DEFAULT_CWS_TLS_REFRESH);

azureAppServices = configProvider.getBoolean(AZURE_APP_SERVICES, false);
traceAgentPath = configProvider.getString(TRACE_AGENT_PATH);
String traceAgentArgsString = configProvider.getString(TRACE_AGENT_ARGS);
if (traceAgentArgsString == null) {
traceAgentArgs = Collections.emptyList();
} else {
traceAgentArgs = Collections.unmodifiableList(Arrays.asList(traceAgentArgsString.split(" ")));
}

dogStatsDPath = configProvider.getString(DOGSTATSD_PATH);
String dogStatsDArgsString = configProvider.getString(DOGSTATSD_ARGS);
if (dogStatsDArgsString == null) {
dogStatsDArgs = Collections.emptyList();
} else {
dogStatsDArgs = Collections.unmodifiableList(Arrays.asList(dogStatsDArgsString.split(" ")));
}

// Setting this last because we have a few places where this can come from
apiKey = tmpApiKey;

Expand Down Expand Up @@ -1404,6 +1432,26 @@ public int getCwsTlsRefresh() {
return cwsTlsRefresh;
}

/**
 * Returns the Azure App Services flag, which the constructor reads from the {@code
 * AZURE_APP_SERVICES} setting with a default of {@code false}.
 */
public boolean isAzureAppServices() {
  return this.azureAppServices;
}

/**
 * Returns the value read from the {@code TRACE_AGENT_PATH} setting.
 *
 * <p>Presumably null when the setting is absent - callers should null-check.
 */
public String getTraceAgentPath() {
  return this.traceAgentPath;
}

/**
 * Returns the trace agent arguments parsed from the {@code TRACE_AGENT_ARGS} setting.
 *
 * <p>Never null: an empty list when the setting is absent, otherwise an unmodifiable list holding
 * the setting split on single space characters (so consecutive spaces yield empty-string
 * entries).
 */
public List<String> getTraceAgentArgs() {
  return this.traceAgentArgs;
}

/**
 * Returns the value read from the {@code DOGSTATSD_PATH} setting.
 *
 * <p>Presumably null when the setting is absent - callers should null-check.
 */
public String getDogStatsDPath() {
  return this.dogStatsDPath;
}

/**
 * Returns the DogStatsD arguments parsed from the {@code DOGSTATSD_ARGS} setting.
 *
 * <p>Never null: an empty list when the setting is absent, otherwise an unmodifiable list holding
 * the setting split on single space characters (so consecutive spaces yield empty-string
 * entries).
 */
public List<String> getDogStatsDArgs() {
  return this.dogStatsDArgs;
}

/** Returns the current value of the {@code configFile} field. */
public String getConfigFile() {
  return this.configFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public enum AgentThread {

FLEET_MANAGEMENT_POLLER("dd-fleet-management-poller"),

CWS_TLS("dd-cws-tls");
CWS_TLS("dd-cws-tls"),

PROCESS_SUPERVISOR("dd-process-supervisor");

public final String threadName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package datadog.trace.util;

import static datadog.trace.util.AgentThreadFactory.AgentThread.PROCESS_SUPERVISOR;

import java.io.Closeable;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts an external process and restarts the process if it dies.
 *
 * <p>The process is launched and watched from a dedicated thread that is started by the
 * constructor. Consecutive launch attempts are spaced at least {@code MIN_RESTART_INTERVAL_MS}
 * apart, whether the previous attempt failed to start or the process exited. {@link #close()}
 * stops the supervision and destroys the process that is currently running, if any.
 */
public class ProcessSupervisor implements Closeable {
  private static final Logger log = LoggerFactory.getLogger(ProcessSupervisor.class);
  private static final long MIN_RESTART_INTERVAL_MS = 30 * 1000;
  private static final long NANOS_PER_MS = 1000L * 1000L;
  private static final long MIN_RESTART_INTERVAL_NANOS = MIN_RESTART_INTERVAL_MS * NANOS_PER_MS;

  private final String name;
  private final ProcessBuilder processBuilder;
  private final Thread supervisorThread;

  // Both fields below are only read and written by the supervisor thread (nextRestartNanos is
  // initialised in the constructor before that thread is started).
  // Earliest System.nanoTime() value at which the next launch may happen.
  private long nextRestartNanos;
  private Process currentProcess;

  // Written by close(), read by the supervisor thread.
  private volatile boolean stopped = false;

  /**
   * Creates the supervisor and immediately starts its thread, which launches the process.
   *
   * @param name For logging purposes
   * @param processBuilder Builder to create the process
   */
  public ProcessSupervisor(String name, ProcessBuilder processBuilder) {
    this.name = name;
    this.processBuilder = processBuilder;
    // "Now" means the first launch is not delayed.
    this.nextRestartNanos = System.nanoTime();
    supervisorThread = AgentThreadFactory.newAgentThread(PROCESS_SUPERVISOR, new SupervisorLoop());
    supervisorThread.start();
  }

  /** Launch / wait / relaunch loop executed by the supervisor thread. */
  private class SupervisorLoop implements Runnable {
    @Override
    public void run() {
      try {
        while (!stopped) {
          try {
            if (currentProcess == null) {
              // Elapsed time is measured with the monotonic clock so that wall-clock adjustments
              // can neither stretch nor skip the back-off. Subtracting before comparing keeps
              // the check correct even if nanoTime() wraps.
              long restartDelayNanos = nextRestartNanos - System.nanoTime();
              if (restartDelayNanos > 0) {
                Thread.sleep(
                    restartDelayNanos / NANOS_PER_MS, (int) (restartDelayNanos % NANOS_PER_MS));
                continue;
              }

              log.debug("Starting process: {}", name);
              // Set before start() so a failed launch is also subject to the back-off.
              nextRestartNanos = System.nanoTime() + MIN_RESTART_INTERVAL_NANOS;
              currentProcess = processBuilder.start();
            }

            // Block until the process exits
            int code = currentProcess.waitFor();
            log.debug("Process [{}] has exited with code {}", name, code);

            // Process is dead, no longer needs to be tracked
            currentProcess = null;
          } catch (InterruptedException ignored) {
            // Deliberately not re-asserting the interrupt flag: close() sets `stopped` before
            // interrupting, so the loop condition ends the loop. Re-asserting it for any other
            // interrupt would make sleep()/waitFor() throw again immediately and spin.
          } catch (IOException e) {
            log.error("Exception starting process: {}", name, e);
          }
        }
      } finally {
        // Runs on normal shutdown and if the loop dies from an unexpected exception.
        if (currentProcess != null) {
          log.debug("Stopping process [{}]", name);
          currentProcess.destroy();
        }
      }
    }
  }

  /**
   * Asks the supervisor thread to stop and interrupts it. The thread destroys the running process,
   * if any, as it exits; this method returns without waiting for that to happen.
   */
  @Override
  public void close() {
    stopped = true;
    supervisorThread.interrupt();
  }
}

0 comments on commit 6f1aeb4

Please sign in to comment.