Skip to content

Commit

Permalink
HIVE-9179: Add listeners on JobHandle so job status change can be notified to the client [Spark Branch] (Marcelo via Xuefu)
Browse files Browse the repository at this point in the history

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/spark@1653341 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Xuefu Zhang committed Jan 20, 2015
1 parent 86d7d06 commit cf90e67
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 23 deletions.
5 changes: 5 additions & 0 deletions spark-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ protected static class JobResult<T extends Serializable> implements Serializable

}

/**
 * Message sent over RPC when a client job begins executing on the remote
 * driver; carries the client-side job ID.
 */
protected static class JobStarted implements Serializable {

  final String id;

  JobStarted(String jobId) {
    this.id = jobId;
  }

  // No-arg constructor so the message can be deserialized.
  JobStarted() {
    this(null);
  }

}

/**
* Inform the client that a new spark job has been submitted for the client job.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,53 @@ public interface JobHandle<T extends Serializable> extends Future<T> {
*/
SparkCounters getSparkCounters();

/**
 * Return the current state of the job. Handles start out in the SENT state;
 * see {@link State} for the full set of states.
 */
State getState();

/**
 * Add a listener to the job handle. If the job's state is not SENT, a callback for the
 * corresponding state will be invoked immediately. Spark job IDs already reported for this
 * job are also replayed to the new listener via onSparkJobStarted().
 *
 * @param l The listener to add.
 */
void addListener(Listener<T> l);

/**
 * The current state of the submitted job.
 *
 * Ordinals are significant: the handle implementation compares them to order
 * transitions, and CANCELLED, FAILED and SUCCEEDED must remain the highest
 * values (they are the terminal states).
 */
enum State {
  SENT,
  QUEUED,
  STARTED,
  CANCELLED,
  FAILED,
  SUCCEEDED;
}

/**
 * A listener for monitoring the state of the job in the remote context. Callbacks are called
 * when the corresponding state change occurs.
 *
 * @param <T> Type of the job's result, matching the monitored {@link JobHandle}.
 */
interface Listener<T extends Serializable> {

  void onJobQueued(JobHandle<T> job);

  void onJobStarted(JobHandle<T> job);

  void onJobCancelled(JobHandle<T> job);

  void onJobFailed(JobHandle<T> job, Throwable cause);

  void onJobSucceeded(JobHandle<T> job, T result);

  /**
   * Called when a monitored Spark job is started on the remote context. This callback
   * does not indicate a state change in the client job's status.
   */
  void onSparkJobStarted(JobHandle<T> job, int sparkJobId);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.hive.spark.client;

import io.netty.util.concurrent.Promise;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.Promise;

import org.apache.hive.spark.counter.SparkCounters;

Expand All @@ -34,28 +35,30 @@
*/
class JobHandleImpl<T extends Serializable> implements JobHandle<T> {

// Owning client; used to send cancellation requests for this job.
private final SparkClientImpl client;
// Unique client-side ID of the submitted job.
private final String jobId;
private final MetricsCollection metrics;
// Completed or failed when the remote job finishes.
private final Promise<T> promise;
// IDs of Spark jobs spawned on behalf of this client job, in reported order.
private final List<Integer> sparkJobIds;
// Registered listeners; also serves as the lock guarding state transitions.
private final List<Listener> listeners;
// Current state of the job; transitions are guarded by 'listeners'.
private volatile State state;
private volatile SparkCounters sparkCounters;

JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
  this.client = client;
  this.jobId = jobId;
  this.promise = promise;
  this.listeners = Lists.newLinkedList();
  this.metrics = new MetricsCollection();
  this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
  this.state = State.SENT;
  this.sparkCounters = null;
}

/** Requests a running job to be cancelled. */
@Override
public boolean cancel(boolean mayInterrupt) {
if (cancelled.compareAndSet(false, true)) {
if (changeState(State.CANCELLED)) {
client.cancel(jobId);
promise.cancel(mayInterrupt);
return true;
Expand Down Expand Up @@ -114,20 +117,116 @@ public SparkCounters getSparkCounters() {
return sparkCounters;
}

/** Returns the job's current state (volatile read; may change immediately after). */
@Override
public State getState() {
return state;
}

/**
 * Registers a listener and synchronously replays the job's history to it:
 * the current state (unless still SENT) and any Spark job IDs reported so far.
 *
 * @param l The listener to add; parameter is typed Listener&lt;T&gt; to match
 *          the JobHandle interface (was a raw Listener).
 */
@Override
public void addListener(Listener<T> l) {
  synchronized (listeners) {
    listeners.add(l);
    // If current state is a final state (CANCELLED or higher ordinal), notify
    // of Spark job IDs before notifying about the state transition, so the
    // listener sees them before the terminal callback.
    if (state.ordinal() >= State.CANCELLED.ordinal()) {
      for (Integer i : sparkJobIds) {
        l.onSparkJobStarted(this, i);
      }
    }

    fireStateChange(state, l);

    // Otherwise, notify about Spark jobs after the state notification.
    if (state.ordinal() < State.CANCELLED.ordinal()) {
      for (Integer i : sparkJobIds) {
        l.onSparkJobStarted(this, i);
      }
    }
  }
}

/** Records the counters reported for the job; exposed via getSparkCounters(). */
public void setSparkCounters(SparkCounters sparkCounters) {
this.sparkCounters = sparkCounters;
}

/**
 * Completes the promise with the job's result and moves the handle to the
 * SUCCEEDED state, notifying registered listeners.
 *
 * @param result The job's result; cast to T (set by the remote side).
 */
@SuppressWarnings("unchecked")
void setSuccess(Object result) {
  // The synchronization here is not necessary, but tests depend on it.
  synchronized (listeners) {
    promise.setSuccess((T) result);
    changeState(State.SUCCEEDED);
  }
}

/**
 * Fails the promise with the given error and moves the handle to the FAILED
 * state, notifying registered listeners.
 *
 * @param error Cause of the job's failure; passed to onJobFailed via the promise.
 */
void setFailure(Throwable error) {
  // The synchronization here is not necessary, but tests depend on it.
  synchronized (listeners) {
    promise.setFailure(error);
    changeState(State.FAILED);
  }
}

/**
 * Changes the state of this job handle, making sure that illegal state transitions are ignored.
 * Fires events appropriately.
 *
 * As a rule, a state transition can only occur if the new state is "higher" than the current
 * state (i.e., has a higher ordinal number) and the current state is not a "final" state.
 * "Final" states are CANCELLED, FAILED and SUCCEEDED, defined here in the code as having an
 * ordinal number equal to or higher than the CANCELLED enum constant.
 *
 * @param newState The state to transition to.
 * @return true if the transition was performed (and listeners notified), false if ignored.
 */
boolean changeState(State newState) {
synchronized (listeners) {
if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
state = newState;
for (Listener l : listeners) {
fireStateChange(newState, l);
}
return true;
}
return false;
}
}

/**
 * Records a Spark job ID reported for this client job and notifies all registered
 * listeners. Synchronized on 'listeners' so the notification order stays consistent
 * with the replay logic in addListener().
 */
void addSparkJobId(int sparkJobId) {
synchronized (listeners) {
sparkJobIds.add(sparkJobId);
for (Listener l : listeners) {
l.onSparkJobStarted(this, sparkJobId);
}
}
}

/**
 * Invokes the listener callback that corresponds to the given state. SENT has
 * no callback; any state outside the enum's known values is a programming error.
 */
private void fireStateChange(State s, Listener l) {
  if (s == State.QUEUED) {
    l.onJobQueued(this);
  } else if (s == State.STARTED) {
    l.onJobStarted(this);
  } else if (s == State.CANCELLED) {
    l.onJobCancelled(this);
  } else if (s == State.FAILED) {
    l.onJobFailed(this, promise.cause());
  } else if (s == State.SUCCEEDED) {
    try {
      l.onJobSucceeded(this, promise.get());
    } catch (Exception e) {
      // Shouldn't really happen: the promise is already complete by now.
      throw new IllegalStateException(e);
    }
  } else if (s != State.SENT) {
    throw new IllegalStateException();
  }
}

/** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
@Override
protected void finalize() {
if (!isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ <T extends Serializable> void jobFinished(String jobId, T result,
clientRpc.call(new JobResult(jobId, result, error, counters));
}

/** Sends a JobStarted message for the given client job ID over the client RPC channel. */
void jobStarted(String jobId) {
clientRpc.call(new JobStarted(jobId));
}

void jobSubmitted(String jobId, int sparkJobId) {
LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
clientRpc.call(new JobSubmitted(jobId, sparkJobId));
Expand Down Expand Up @@ -325,6 +329,8 @@ private class JobWrapper<T extends Serializable> implements Callable<Void> {

@Override
public Void call() throws Exception {
protocol.jobStarted(req.id);

try {
jc.setMonitorCb(new MonitorCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private class ClientProtocol extends BaseProtocol {
<T extends Serializable> JobHandleImpl<T> submit(Job<T> job) {
final String jobId = UUID.randomUUID().toString();
final Promise<T> promise = driverRpc.createPromise();
JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
final JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
jobs.put(jobId, handle);

final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
Expand All @@ -393,7 +393,9 @@ <T extends Serializable> JobHandleImpl<T> submit(Job<T> job) {
rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
if (!f.isSuccess() && !promise.isDone()) {
if (f.isSuccess()) {
handle.changeState(JobHandle.State.QUEUED);
} else if (!promise.isDone()) {
promise.setFailure(f.cause());
}
}
Expand Down Expand Up @@ -456,11 +458,20 @@ private void handle(ChannelHandlerContext ctx, JobResult msg) {
}
}

/**
 * Handles a JobStarted message from the remote driver by moving the matching
 * client-side handle into the STARTED state.
 */
private void handle(ChannelHandlerContext ctx, JobStarted msg) {
  JobHandleImpl<?> handle = jobs.get(msg.id);
  if (handle == null) {
    LOG.warn("Received event for unknown job {}", msg.id);
    return;
  }
  handle.changeState(JobHandle.State.STARTED);
}

/**
 * Handles a JobSubmitted message: records the Spark job ID on the matching
 * handle via addSparkJobId(), which also notifies the handle's listeners.
 */
private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
  JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
  if (handle != null) {
    LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
    // Single recording path: addSparkJobId() both stores the ID and fires
    // onSparkJobStarted; the duplicated direct getSparkJobIds().add(...) line
    // would have added the ID twice without notifying listeners.
    handle.addSparkJobId(msg.sparkJobId);
  } else {
    LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
  }
Expand Down
Loading

0 comments on commit cf90e67

Please sign in to comment.