Skip to content

Commit

Permalink
Fix async execution
Browse files Browse the repository at this point in the history
  • Loading branch information
ngallagher committed Jan 20, 2019
1 parent a6a37d4 commit ac4d89e
Show file tree
Hide file tree
Showing 40 changed files with 449 additions and 124 deletions.
95 changes: 95 additions & 0 deletions snap-compile/src/test/java/org/snapscript/compile/AwaitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,97 @@ public class AwaitTest extends ScriptTestCase {
" assert y == 'result for foo()';\n" +
"}).get() == 'result for foo()';\n";

private static final String SUCCESS_11 =
"class Typ {\n"+
" async fun(n): Promise<Integer> {\n"+
" if(n > 0) {\n"+
" const f: String = await foo();\n"+
" println(f);\n"+
" assert f == 'result for foo()';\n"+
" return f;\n"+
" }\n"+
" println('x');\n"+
" }\n"+
" foo() {\n"+
" let n = Thread.currentThread().getName();\n"+
" println(n);\n"+
" return 'result for foo()';\n"+
" }\n"+
"}\n"+
"let x = new Typ().fun(1);\n"+
"println(x.class);\n"+
"assert x.then(y -> {\n"+
" println(`RESULT=${y}`);\n"+
" assert y == 'result for foo()';\n" +
"}).get() == 'result for foo()';\n";

private static final String SUCCESS_12 =
"class Typ {\n"+
" async fun(n): Promise {\n"+
" if(n > 0) {\n"+
" const f: String = await foo();\n"+
" println(f);\n"+
" assert f == 'result for foo()';\n"+
" return f;\n"+
" }\n"+
" println('x');\n"+
" }\n"+
" foo() {\n"+
" let n = Thread.currentThread().getName();\n"+
" println(n);\n"+
" return 'result for foo()';\n"+
" }\n"+
"}\n"+
"let x = new Typ().fun(1);\n"+
"println(x.class);\n"+
"assert x.then(y -> {\n"+
" println(`RESULT=${y}`);\n"+
" assert y == 'result for foo()';\n" +
"}).get() == 'result for foo()';\n";

private static final String SUCCESS_13 =
"class Typ {\n"+
" async fun(n): Promise {\n"+
" return `result for fun(${n})`;\n"+
" }\n"+
" foo() {\n"+
" let n = Thread.currentThread().getName();\n"+
" println(n);\n"+
" return 'result for foo()';\n"+
" }\n"+
"}\n"+
"let x = new Typ().fun(1);\n"+
"println(x.class);\n"+
"assert x.then(y -> {\n"+
" println(`RESULT=${y}`);\n"+
" assert y == 'result for fun(1)';\n" +
"}).get() == 'result for fun(1)';\n";

private static final String SUCCESS_14 =
"class Typ {\n"+
" async fun(n): Promise {\n"+
" if(n > 0) {\n"+
" const f: String = await foo();\n"+
" println(f);\n"+
" assert f == 'result for foo()';\n"+
" return f;\n"+
" }\n"+
" println('x');\n"+
" }\n"+
" foo() {\n"+
" throw new RuntimeException('error occured in foo()');\n"+
" }\n"+
"}\n"+
"let x = new Typ().fun(1);\n"+
"println(x.class);\n"+
"x.fail(y -> {\n"+
" y.printStackTrace();\n"+
" println(`ERROR=${y}`);\n"+
" println(`ERROR=${y.class}`);\n"+
" println(`ERROR=${y.message}`);\n"+
" assert y instanceof Exception;\n" +
"}).block();\n";

public void testAwait() throws Exception {
assertScriptExecutes(SUCCESS_1);
assertScriptExecutes(SUCCESS_2);
Expand All @@ -207,6 +298,10 @@ public void testAwait() throws Exception {
assertScriptExecutes(SUCCESS_8);
assertScriptExecutes(SUCCESS_9);
assertScriptExecutes(SUCCESS_10);
assertScriptExecutes(SUCCESS_11);
assertScriptExecutes(SUCCESS_12);
assertScriptExecutes(SUCCESS_13);
assertScriptExecutes(SUCCESS_14);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.snapscript.core.variable.Value;

public class TraitTest extends TestCase{

private static final String SOURCE_1=
"trait Pet {\n"+
" speak(out) {\n"+
Expand Down
95 changes: 71 additions & 24 deletions snap-core/src/main/java/org/snapscript/core/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;

import org.snapscript.common.Consumer;
Expand All @@ -33,15 +32,19 @@ public <T> Promise<T> schedule(Consumer<Object, T> consumer) {

private static class FuturePromise<T> implements Promise<T> {

private final Set<Consumer<T, Object>> consumers;
private final AtomicReference<T> result;
private final Set<Consumer<Throwable, Object>> failures;
private final Set<Consumer<T, Object>> listeners;
private final AtomicReference<Throwable> error;
private final AtomicReference<T> success;
private final FutureTask<T> future;
private final Callable<T> task;

public FuturePromise(Consumer<Object, T> consumer) {
this.consumers = new CopyOnWriteArraySet<Consumer<T, Object>>();
this.task = new PromiseTask<T>(this, consumer);
this.result = new AtomicReference<T>();
this.failures = new CopyOnWriteArraySet<Consumer<Throwable, Object>>();
this.listeners = new CopyOnWriteArraySet<Consumer<T, Object>>();
this.task = new FutureExecution<T>(this, consumer);
this.error = new AtomicReference<Throwable>();
this.success = new AtomicReference<T>();
this.future = new FutureTask<T>(task);
}

Expand All @@ -63,6 +66,50 @@ public Object get(long wait) {
}
}

@Override
public Promise<T> block() {
try {
future.get();
} catch(Exception e) {
return this;
}
return this;
}

@Override
public Promise<T> block(long wait) {
try {
future.get(wait, MILLISECONDS);
} catch(Exception e) {
return this;
}
return this;
}

@Override
public Promise<T> fail(Consumer<Throwable, Object> consumer) {
if(failures.add(consumer)) {
Throwable value = error.get();

if (value != null) {
consumer.consume(value);
}
}
return this;
}

@Override
public Promise<T> then(Consumer<T, Object> consumer) {
if(listeners.add(consumer)) {
T value = success.get();

if (value != null) {
consumer.consume(value);
}
}
return this;
}

public void execute(Executor executor) {
if(executor != null) {
executor.execute(future);
Expand All @@ -72,40 +119,40 @@ public void execute(Executor executor) {
}

public void complete(T value) {
for(Consumer<T, Object> consumer : consumers) {
consumer.consume(value);
for(Consumer<T, Object> listener : listeners) {
listener.consume(value);
}
result.set(value);
success.compareAndSet(null, value);
}

@Override
public Promise<T> then(Consumer<T, Object> consumer) {
T value = result.get();

if(value != null) {
consumer.consume(value);
public void error(Throwable cause) {
for(Consumer<Throwable, Object> failure : failures) {
failure.consume(cause);
}
consumers.add(consumer);
return this;
error.compareAndSet(null, cause);
}
}


private static class PromiseTask<T> implements Callable<T> {
private static class FutureExecution<T> implements Callable<T> {

private final Consumer<Object, T> consumer;
private final FuturePromise<T> promise;

public PromiseTask(FuturePromise<T> promise, Consumer<Object, T> consumer) {
public FutureExecution(FuturePromise<T> promise, Consumer<Object, T> consumer) {
this.consumer = consumer;
this.promise = promise;
}

@Override
public T call() {
T value = consumer.consume(null);
promise.complete(value);
return value;
public T call() throws Exception {
try {
T value = consumer.consume(null);
promise.complete(value);
return value;
} catch(Exception cause) {
promise.error(cause);
throw cause;
}
}
}
}
3 changes: 3 additions & 0 deletions snap-core/src/main/java/org/snapscript/core/Promise.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@
public interface Promise<T> {
Object get();
Object get(long wait);
Promise<T> block();
Promise<T> block(long wait);
Promise<T> then(Consumer<T, Object> task);
Promise<T> fail(Consumer<Throwable, Object> task);
}
92 changes: 92 additions & 0 deletions snap-core/src/main/java/org/snapscript/core/PromiseWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.snapscript.core;

import static org.snapscript.core.constraint.Constraint.NONE;

import java.util.List;

import org.snapscript.common.Consumer;
import org.snapscript.core.constraint.Constraint;
import org.snapscript.core.scope.Scope;
import org.snapscript.core.type.Type;

public class PromiseWrapper {

public PromiseWrapper() {
super();
}

public Promise toPromise(Scope scope, Object object) {
if(!Promise.class.isInstance(object)) {
return new IdentityPromise(object);
}
return (Promise)object;
}

public Object fromPromise(Scope scope, Object object) {
if(Promise.class.isInstance(object)) {
Promise promise = (Promise)object;
return promise.get();
}
return object;
}

public Constraint fromPromise(Scope scope, Constraint returns) {
Type type = returns.getType(scope);

if(type != null) {
Class real = type.getType();

if(real != null) {
if (Promise.class.isAssignableFrom(real)) {
List<Constraint> generics = returns.getGenerics(scope);

for (Constraint generic : generics) {
return generic;
}
return NONE;
}
}
}
return returns;
}

private static class IdentityPromise implements Promise {

public final Object object;

private IdentityPromise(Object object) {
this.object = object;
}

@Override
public Object get() {
return object;
}

@Override
public Object get(long wait) {
return object;
}

@Override
public Promise block() {
return this;
}

@Override
public Promise block(long wait) {
return this;
}

@Override
public Promise fail(Consumer task) {
return this;
}

@Override
public Promise then(Consumer task) {
task.consume(object);
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.snapscript.core.result;

import org.snapscript.core.scope.Scope;
import org.snapscript.core.yield.AwaitResult;
import org.snapscript.core.yield.Resume;
import org.snapscript.core.yield.YieldResult;
import org.snapscript.core.resume.AwaitResult;
import org.snapscript.core.resume.Resume;
import org.snapscript.core.resume.YieldResult;

public abstract class Result {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.snapscript.core.yield;
package org.snapscript.core.resume;

import org.snapscript.core.scope.Scope;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.snapscript.core.yield;
package org.snapscript.core.resume;

import static org.snapscript.core.result.Result.NORMAL;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.snapscript.core.yield;
package org.snapscript.core.resume;

import org.snapscript.core.result.Result;
import org.snapscript.core.scope.Scope;
Expand Down
Loading

0 comments on commit ac4d89e

Please sign in to comment.