HIVE-8984: Enable job metrics/statistics gathering for remote spark context [Spark Branch] (Rui via Xuefu)

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/spark@1642251 13f79535-47bb-0310-9956-ffa450edef68
Xuefu Zhang committed Nov 28, 2014
1 parent b329c38 commit 6c8feff
Showing 22 changed files with 147 additions and 47 deletions.
@@ -30,7 +30,7 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
@@ -33,7 +33,7 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -136,7 +136,7 @@ public Serializable call(JobContext jc) throws Exception {
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
       JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-      jc.monitor(future);
+      jc.monitor(future, sparkCounters);
       return null;
     }
   });
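A side note on the comment in this hunk: the async action is used because Spark's JavaFutureAction exposes the ids of the jobs it spawns, which is what the monitoring machinery keys on. A minimal sketch of that property; jobIds() is part of Spark's public JavaFutureAction API, and the RDD here is illustrative:

    // Illustrative only: any async action returns a JavaFutureAction whose
    // jobIds() lists the underlying Spark job ids once they are submitted.
    JavaFutureAction<Void> action = someRdd.foreachAsync(x -> { });
    List<Integer> jobIds = action.jobIds();  // what the monitor/status code keys on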
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;

-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,7 +47,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
@@ -103,9 +103,10 @@ public int execute(DriverContext driverContext) {
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
-        sparkCounters = sparkJobStatus.getCounter();
         SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
         monitor.startMonitor();
+        // for RSC, we should get the counters after job has finished
+        sparkCounters = sparkJobStatus.getCounter();
         SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
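The reordering in this hunk is the behavioral core of the change on the Hive side: with the remote Spark context (RSC), counter values are aggregated remotely and only shipped back to the client with the job result (see the Protocol.JobResult change below), so getCounter() moves to after startMonitor() returns. Condensed, the flow is now:

    // Condensed from the hunk above; logging and error handling omitted.
    SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
    SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
    if (sparkJobStatus != null) {
      new SparkJobMonitor(sparkJobStatus).startMonitor();  // returns once the job is done
      sparkCounters = sparkJobStatus.getCounter();         // counters are complete only now
      SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
    }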
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.Statistic;

-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounter;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounterGroup;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounter;
+import org.apache.hive.spark.counter.SparkCounterGroup;
+import org.apache.hive.spark.counter.SparkCounters;

 import java.util.HashMap;
 import java.util.LinkedList;
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;

 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;

 import java.util.Map;
@@ -24,7 +24,7 @@
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.spark.JobExecutionStatus;
@@ -21,7 +21,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hive.spark.client.MetricsCollection;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.hive.spark.client.Job;
@@ -37,6 +41,7 @@

 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;

@@ -97,12 +102,27 @@ public Map<String, SparkStageProgress> getSparkStageProgress() {

   @Override
   public SparkCounters getCounter() {
-    return null;
+    return jobHandle.getSparkCounters();
   }

   @Override
   public SparkStatistics getSparkStatistics() {
-    return null;
+    SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+    // add Hive operator level statistics.
+    sparkStatisticsBuilder.add(getCounter());
+    // add spark job metrics.
+    String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
+    MetricsCollection metricsCollection = jobHandle.getMetrics();
+    if (metricsCollection == null) {
+      return null;
+    }
+
+    Map<String, Long> flatJobMetric = extractMetrics(metricsCollection);
+    for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+    }
+
+    return sparkStatisticsBuilder.build();
   }

   @Override
@@ -194,7 +214,7 @@ public JobExecutionStatus status() {
         }
       }
     }
-    if(jobInfo == null) {
+    if (jobInfo == null) {
       jobInfo = new SparkJobInfo() {
         @Override
         public int jobId() {
@@ -216,11 +236,11 @@ public JobExecutionStatus status() {
     }
   }

-  private static class GetStageInfoJob implements Job<HiveSparkStageInfo>{
+  private static class GetStageInfoJob implements Job<HiveSparkStageInfo> {
     private final int stageId;

-    GetStageInfoJob(int stageId){
-      this.stageId=stageId;
+    GetStageInfoJob(int stageId) {
+      this.stageId = stageId;
     }

     @Override
@@ -229,4 +249,36 @@ public HiveSparkStageInfo call(JobContext jc) throws Exception {
       return stageInfo != null ? new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo();
     }
   }
+
+  private Map<String, Long> extractMetrics(MetricsCollection metricsCollection) {
+    Map<String, Long> results = new LinkedHashMap<String, Long>();
+    Metrics allMetrics = metricsCollection.getAllMetrics();
+
+    results.put("ExecutorDeserializeTime", allMetrics.executorDeserializeTime);
+    results.put("ExecutorRunTime", allMetrics.executorRunTime);
+    results.put("ResultSize", allMetrics.resultSize);
+    results.put("JvmGCTime", allMetrics.jvmGCTime);
+    results.put("ResultSerializationTime", allMetrics.resultSerializationTime);
+    results.put("MemoryBytesSpilled", allMetrics.memoryBytesSpilled);
+    results.put("DiskBytesSpilled", allMetrics.diskBytesSpilled);
+    if (allMetrics.inputMetrics != null) {
+      results.put("BytesRead", allMetrics.inputMetrics.bytesRead);
+    }
+    if (allMetrics.shuffleReadMetrics != null) {
+      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
+      long rbf = shuffleReadMetrics.remoteBlocksFetched;
+      long lbf = shuffleReadMetrics.localBlocksFetched;
+      results.put("RemoteBlocksFetched", rbf);
+      results.put("LocalBlocksFetched", lbf);
+      results.put("TotalBlocksFetched", lbf + rbf);
+      results.put("FetchWaitTime", shuffleReadMetrics.fetchWaitTime);
+      results.put("RemoteBytesRead", shuffleReadMetrics.remoteBytesRead);
+    }
+    if (allMetrics.shuffleWriteMetrics != null) {
+      results.put("ShuffleBytesWritten", allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put("ShuffleWriteTime", allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+    }
+
+    return results;
+  }
 }
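getSparkStatistics() above folds two sources into one SparkStatistics object: the Hive-level SparkCounters and the flattened Spark job metrics from extractMetrics(). For reference, a hedged sketch of a consumer in the shape of SparkTask's statistics logging; the iterator-style accessors (getStatisticGroups(), getStatistics(), getName(), getValue()) are assumptions about the Statistic package and are not part of this diff:

    // Hedged sketch: accessor names on SparkStatistics/SparkStatisticGroup/
    // SparkStatistic are assumed; only the type names appear in this commit.
    SparkStatistics statistics = sparkJobStatus.getSparkStatistics();
    if (statistics != null) {
      Iterator<SparkStatisticGroup> groups = statistics.getStatisticGroups();
      while (groups.hasNext()) {
        Iterator<SparkStatistic> entries = groups.next().getStatistics();
        while (entries.hasNext()) {
          SparkStatistic entry = entries.next();
          LOG.info(entry.getName() + ": " + entry.getValue());
        }
      }
    }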
@@ -22,7 +22,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;

 public class CounterStatsAggregatorSpark
     implements StatsAggregator, StatsCollectionTaskIndependent {
@@ -17,6 +17,7 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;

@@ -43,7 +44,7 @@ public interface JobContext {
    *
    * @return The job (unmodified).
    */
-  <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job);
+  <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job, SparkCounters sparkCounters);

   /**
    * Return a map from client job Id to corresponding JavaFutureActions
@@ -17,6 +17,7 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;

@@ -43,8 +44,8 @@ public JavaSparkContext sc() {
   }

   @Override
-  public <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job) {
-    monitorCb.get().call(job);
+  public <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job, SparkCounters sparkCounters) {
+    monitorCb.get().call(job, sparkCounters);
     return job;
   }

@@ -22,6 +22,7 @@
 import java.util.concurrent.Future;

 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.counter.SparkCounters;

 /**
  * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
@@ -48,6 +49,11 @@ public interface JobHandle<T extends Serializable> extends Future<T> {
    */
   List<Integer> getSparkJobIds();

+  /**
+   * Get the SparkCounters for this job.
+   */
+  SparkCounters getSparkCounters();
+
   // TODO: expose job status?

 }
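Because JobHandle extends java.util.concurrent.Future, a client can block for completion and then read the counters. A minimal usage sketch; `client` (a SparkClient) and MyJob are assumptions for illustration, and only getSparkCounters() comes from this commit:

    // Minimal client-side sketch: submit, wait, then read the counters.
    JobHandle<Serializable> handle = client.submit(new MyJob());
    handle.get();                                        // Future#get: wait for the remote job
    SparkCounters counters = handle.getSparkCounters();  // null until the result has arrived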
@@ -17,6 +17,8 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -41,6 +43,7 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
   private Throwable error;

   private final List<Integer> sparkJobIds;
+  private SparkCounters sparkCounters;

   JobHandleImpl(SparkClientImpl client, String jobId) {
     this.client = client;
@@ -50,6 +53,7 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
     this.cancelled = new AtomicBoolean();
     this.completed = false;
     this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
+    sparkCounters = null;
   }

   /** Requests a running job to be cancelled. */
@@ -113,6 +117,11 @@ public List<Integer> getSparkJobIds() {
     return sparkJobIds;
   }

+  @Override
+  public SparkCounters getSparkCounters() {
+    return sparkCounters;
+  }
+
   private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
     long deadline = System.currentTimeMillis() + timeout;
     synchronized (monitor) {
@@ -150,4 +159,7 @@ void complete(Object result, Throwable error) {
     }
   }

+  public void setSparkCounters(SparkCounters sparkCounters) {
+    this.sparkCounters = sparkCounters;
+  }
 }
@@ -17,7 +17,6 @@

 package org.apache.hive.spark.client;

-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -17,10 +17,11 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.api.java.JavaFutureAction;

 interface MonitorCallback {

-  void call(JavaFutureAction<?> future);
+  void call(JavaFutureAction<?> future, SparkCounters sparkCounters);

 }
@@ -20,6 +20,7 @@
 import java.io.Serializable;

 import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.counter.SparkCounters;

 final class Protocol {

@@ -112,15 +113,17 @@ static class JobResult<T extends Serializable> implements Serializable {
     final String id;
     final T result;
     final Throwable error;
+    final SparkCounters sparkCounters;

-    JobResult(String id, T result, Throwable error) {
+    JobResult(String id, T result, Throwable error, SparkCounters sparkCounters) {
       this.id = id;
       this.result = result;
       this.error = error;
+      this.sparkCounters = sparkCounters;
     }

     JobResult() {
-      this(null, null, null);
+      this(null, null, null, null);
     }

   }
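With the extra JobResult field, the counters travel back to the client alongside the result, which is what makes JobHandleImpl.setSparkCounters() useful. A hedged sketch of the client-side handoff; the dispatcher method and the pendingJobs map are illustrative, while setSparkCounters() and complete() appear in the JobHandleImpl hunk above:

    // Illustrative dispatcher fragment, not from this commit.
    void onJobResult(Protocol.JobResult<Serializable> msg) {
      JobHandleImpl<Serializable> handle = pendingJobs.remove(msg.id);  // hypothetical map
      if (handle != null) {
        handle.setSparkCounters(msg.sparkCounters);  // field added in this commit
        handle.complete(msg.result, msg.error);      // completes the Future
      }
    }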