Skip to content

Commit

Permalink
Merge pull request eoinsha#8 from bryanjos/master
Browse files Browse the repository at this point in the history
Renamed `after` to `timeout`. Added timeout parameter to Push constructor
  • Loading branch information
eoinsha committed Jan 12, 2016
2 parents f19f207 + d034eee commit 669d5d7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 55 deletions.
22 changes: 14 additions & 8 deletions src/main/java/org/phoenixframework/channels/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ public class Channel {

private boolean joinedOnce = false;
private ChannelState state = ChannelState.CLOSED;
public static long DEFAULT_TIMEOUT = 5000;

private ConcurrentLinkedDeque<Push> pushBuffer = new ConcurrentLinkedDeque<>();

public Channel(final String topic, final JsonNode payload, final Socket socket) {
this.topic = topic;
this.payload = payload;
this.socket = socket;
this.joinPush = new Push(this, ChannelEvent.JOIN.getPhxEvent(), payload);
this.joinPush = new Push(this, ChannelEvent.JOIN.getPhxEvent(), payload, DEFAULT_TIMEOUT);
this.channelTimer = new Timer("Phx Rejoin timer for " + topic);

this.joinPush.receive("ok", new IMessageCallback() {
Expand Down Expand Up @@ -178,17 +179,18 @@ public boolean canPush() {
*
* @param event The event name
* @param payload The message payload
* @param timeout The number of milliseconds to wait before triggering a timeout
*
* @return The Push instance used to send the message
*
* @throws IOException Thrown if the payload cannot be pushed
* @throws IllegalStateException Thrown if the channel has not yet been joined
*/
public Push push(final String event, final JsonNode payload) throws IOException, IllegalStateException {
public Push push(final String event, final JsonNode payload, final long timeout) throws IOException, IllegalStateException {
if(!this.joinedOnce) {
throw new IllegalStateException("Unable to push event before channel has been joined");
}
final Push pushEvent = new Push(this, event, payload);
final Push pushEvent = new Push(this, event, payload, timeout);
if(this.canPush()) {
pushEvent.send();
}
Expand All @@ -198,6 +200,10 @@ public Push push(final String event, final JsonNode payload) throws IOException,
return pushEvent;
}

public Push push(final String event, final JsonNode payload) throws IOException {
return push(event, payload, DEFAULT_TIMEOUT);
}

public Push push(final String event) throws IOException {
return push(event, null);
}
Expand Down Expand Up @@ -234,12 +240,12 @@ private void sendJoin() throws IOException {
@Override
public String toString() {
return "Channel{" +
"topic='" + topic + '\'' +
", message=" + payload +
", bindings=" + bindings +
'}';
"topic='" + topic + '\'' +
", message=" + payload +
", bindings=" + bindings +
'}';
}

private void scheduleRejoinTimer() {
final TimerTask rejoinTimerTask = new TimerTask() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.phoenixframework.channels;

public interface ITimeoutCallback {
void onTimeout();
}
102 changes: 57 additions & 45 deletions src/main/java/org/phoenixframework/channels/Push.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ public class Push {
private Envelope receivedEnvelope = null;
private Map<String, List<IMessageCallback>> recHooks = new HashMap<>();
private boolean sent = false;
private AfterHook afterHook;
private TimeoutHook timeoutHook;

Push(final Channel channel, final String event, final JsonNode payload) {
Push(final Channel channel, final String event, final JsonNode payload, final long timeout) {
this.channel = channel;
this.event = event;
this.payload = payload;
this.timeoutHook = new TimeoutHook(timeout);
}

/**
Expand Down Expand Up @@ -52,22 +53,23 @@ public Push receive(final String status, final IMessageCallback callback) {
return this;
}

public Push after(final long ms, final Runnable callback) {
if(this.afterHook != null)
/**
* Registers for notification of message response timeout
*
* @param callback The callback handler called when timeout is reached
*
* @return This instance's self
*/
public Push timeout(final ITimeoutCallback callback) {
if(this.timeoutHook.hasCallback())
throw new IllegalStateException("Only a single after hook can be applied to a Push");

TimerTask timerTask = null;
this.timeoutHook.setCallback(callback);

if(this.sent) {
timerTask = new TimerTask() {
@Override
public void run() {
callback.run();
}
};
final Timer timer = new Timer("Phx after hook", true);
this.channel.scheduleTask(timerTask, ms);
startTimeout();
}
this.afterHook = new AfterHook(ms, callback, timerTask);

return this;
}

Expand All @@ -83,45 +85,48 @@ void send() throws IOException {
@Override
public void onMessage(final Envelope envelope) {
Push.this.receivedEnvelope = envelope;
Push.this.matchReceive(receivedEnvelope.getResponseStatus(), envelope, ref);
Push.this.matchReceive(receivedEnvelope.getResponseStatus(), envelope);
Push.this.cancelRefEvent();
Push.this.cancelAfter();
Push.this.cancelTimeout();
}
});

this.startAfter();
this.startTimeout();
this.sent = true;
final Envelope envelope = new Envelope(this.channel.getTopic(), this.event, this.payload, ref);
this.channel.getSocket().push(envelope);
}

private void cancelAfter() {
if(this.afterHook != null) {
this.afterHook.getTimerTask().cancel();
this.afterHook.setTimerTask(null);
}
private void cancelTimeout() {
this.timeoutHook.getTimerTask().cancel();
this.timeoutHook.setTimerTask(null);
}

private void startAfter() {
if(this.afterHook != null) {
final Runnable callback = new Runnable() {
@Override
public void run() {
Push.this.cancelRefEvent();
Push.this.afterHook.getCallback().run();
}
};
this.afterHook.setTimerTask(new TimerTask() {
@Override
public void run() {
callback.run();
private void startTimeout() {
this.timeoutHook.setTimerTask(createTimerTask());
this.channel.scheduleTask(this.timeoutHook.getTimerTask(), this.timeoutHook.getMs());
}

private TimerTask createTimerTask(){
final Runnable callback = new Runnable() {
@Override
public void run() {
Push.this.cancelRefEvent();
if(Push.this.timeoutHook.hasCallback()) {
Push.this.timeoutHook.getCallback().onTimeout();
}
});
this.channel.scheduleTask(this.afterHook.getTimerTask(), this.afterHook.getMs());
}
}
};

return new TimerTask() {
@Override
public void run() {
callback.run();
}
};
}

private void matchReceive(final String status, final Envelope envelope, final String ref) {
private void matchReceive(final String status, final Envelope envelope) {
synchronized (recHooks) {
final List<IMessageCallback> statusCallbacks = this.recHooks.get(status);
if(statusCallbacks != null) {
Expand Down Expand Up @@ -160,30 +165,37 @@ private void cancelRefEvent() {
this.channel.off(this.refEvent);
}

private class AfterHook {
private class TimeoutHook {
private final long ms;
private final Runnable callback;
private ITimeoutCallback callback;
private TimerTask timerTask;

public AfterHook(final long ms, final Runnable callback, final TimerTask timerTask) {
public TimeoutHook(final long ms) {
this.ms = ms;
this.callback = callback;
this.timerTask = timerTask;
}

public long getMs() {
return ms;
}

public Runnable getCallback() {
public ITimeoutCallback getCallback() {
return callback;
}

public TimerTask getTimerTask() {
return timerTask;
}

public void setTimerTask(final TimerTask timerTask) {
this.timerTask = timerTask;
}

public boolean hasCallback(){
return this.callback != null;
}

public void setCallback(final ITimeoutCallback callback){
this.callback = callback;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import spock.util.concurrent.BlockingVariable

class SocketSpec extends Specification {

def socket = new Socket("ws://localhost:4000/ws")
def socket = new Socket("ws://localhost:4000/socket/websocket")

def socketOpenCallback = Mock(ISocketOpenCallback)
def socketCloseCallback = Mock(ISocketCloseCallback)
Expand All @@ -27,7 +27,7 @@ class SocketSpec extends Specification {
when:
socket.connect()
then:
1 * socketOpenCallback .onOpen()
1 * socketOpenCallback.onOpen()
}

def "Channel subscribe"() {
Expand Down

0 comments on commit 669d5d7

Please sign in to comment.