HIVE-9063: NPE in RemoteSparkJobStatus.getSparkStatistics [Spark Branch] (Rui via Xuefu)

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/spark@1644518 13f79535-47bb-0310-9956-ffa450edef68
Xuefu Zhang committed Dec 10, 2014
1 parent cb733fc commit ed67351
Showing 3 changed files with 37 additions and 20 deletions.
RemoteSparkJobStatus.java
@@ -107,15 +107,15 @@ public SparkCounters getCounter() {
 
   @Override
   public SparkStatistics getSparkStatistics() {
+    MetricsCollection metricsCollection = jobHandle.getMetrics();
+    if (metricsCollection == null || getCounter() == null) {
+      return null;
+    }
     SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(getCounter());
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
-    MetricsCollection metricsCollection = jobHandle.getMetrics();
-    if (metricsCollection == null) {
-      return null;
-    }
 
     Map<String, Long> flatJobMetric = extractMetrics(metricsCollection);
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
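Why this fixes the NPE: the old code handed getCounter() to SparkStatisticsBuilder.add() before any null check, and the counters can still be null when the remote job failed or has not reported back yet; the metrics null check sat below that dereference, so it never helped. The patch hoists both checks to the top of the method and bails out early. A minimal, self-contained sketch of the guard pattern, where Counters and Metrics are hypothetical stand-ins for SparkCounters and MetricsCollection (this is not Hive code):

public class GuardSketch {
  static class Counters { int rows = 42; }
  static class Metrics { long bytes = 1024L; }

  static Counters counter;  // may still be null if the remote job failed early
  static Metrics metrics;   // likewise

  // Returns null instead of throwing when either source is missing,
  // mirroring the reordered checks in getSparkStatistics().
  static String buildStatistics() {
    if (metrics == null || counter == null) {
      return null;  // bail out before either object is dereferenced
    }
    return "rows=" + counter.rows + ", bytes=" + metrics.bytes;
  }

  public static void main(String[] args) {
    System.out.println(buildStatistics());  // prints: null (no NPE)
    counter = new Counters();
    metrics = new Metrics();
    System.out.println(buildStatistics());  // prints: rows=42, bytes=1024
  }
}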
JobHandleImpl.java
@@ -40,7 +40,7 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
   private final MetricsCollection metrics;
   private final Promise<T> promise;
   private final List<Integer> sparkJobIds;
-  private SparkCounters sparkCounters;
+  private volatile SparkCounters sparkCounters;
 
   JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
     this.cancelled = new AtomicBoolean();
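The volatile qualifier matters here because the field is written by the thread that handles the remote job's result and read by whichever thread polls the JobHandle for statistics; without it the Java memory model gives the reader no visibility guarantee, and it may keep observing a stale null. A minimal sketch of that publication pattern (assumed rationale, not stated in the commit):

public class VolatileSketch {
  // Without volatile, the reader thread below could legally spin forever.
  private static volatile Object sparkCounters;

  public static void main(String[] args) throws InterruptedException {
    Thread reader = new Thread(() -> {
      while (sparkCounters == null) { /* busy-wait for publication */ }
      System.out.println("saw counters: " + sparkCounters);
    });
    reader.start();
    Thread.sleep(100);             // let the reader start polling
    sparkCounters = new Object();  // publish from the writer thread
    reader.join();
  }
}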
RemoteDriver.java
@@ -17,38 +17,54 @@
 
 package org.apache.hive.spark.client;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.nio.NioEventLoopGroup;
-
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
-import org.apache.spark.scheduler.*;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Tuple2;
 
-import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hive.spark.client.metrics.Metrics;
-import org.apache.hive.spark.client.rpc.Rpc;
-import org.apache.hive.spark.client.rpc.RpcConfiguration;
-import org.apache.hive.spark.counter.SparkCounters;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Driver code for the Spark client library.
@@ -289,7 +305,8 @@ public void call(JavaFutureAction<?> future, SparkCounters sparkCounters) {
           // re-thrown so that the executor can destroy the affected thread (or the JVM can
           // die or whatever would happen if the throwable bubbled up).
           LOG.info("Failed to run job " + req.id, t);
-          protocol.jobFinished(req.id, null, t, null);
+          protocol.jobFinished(req.id, null, t,
+              sparkCounters != null ? sparkCounters.snapshot() : null);
          throw new ExecutionException(t);
         } finally {
           jc.setMonitorCb(null);
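On the failure path the driver previously reported null counters even when some had been collected before the job died; the change ships an immutable snapshot when counters exist, guarded against null so the error path itself cannot throw. A standalone sketch of that idiom, with a hypothetical Counters class in place of SparkCounters (snapshot() here is assumed to return a copy safe to send over the RPC boundary):

class Counters {
  private int value;
  void inc() { value++; }
  int value() { return value; }
  Counters snapshot() { Counters c = new Counters(); c.value = value; return c; }
}

public class SnapshotSketch {
  // Stand-in for protocol.jobFinished(jobId, result, error, counters).
  static void jobFinished(String id, Object result, Throwable error, Counters counters) {
    System.out.println(id + " failed; counters = "
        + (counters == null ? "none" : counters.value()));
  }

  public static void main(String[] args) {
    Counters live = null;  // job died before any counters were created
    jobFinished("job-1", null, new RuntimeException("boom"),
        live != null ? live.snapshot() : null);  // no NPE on the failure path
    live = new Counters();
    live.inc();
    jobFinished("job-2", null, new RuntimeException("boom"),
        live != null ? live.snapshot() : null);  // partial counters still reported
  }
}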
