Skip to content

Commit

Permalink
test(rstream): add sync() test (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Apr 3, 2019
1 parent 8b2f3fe commit 4f907f3
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 54 deletions.
8 changes: 7 additions & 1 deletion packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
partitionSync,
Transducer
} from "@thi.ng/transducers";
import { CloseMode, ISubscribable, State } from "./api";
import {
CloseMode,
DEBUG,
ISubscribable,
State
} from "./api";
import { Subscription } from "./subscription";
import { closeMode } from "./utils/close";
import { nextID } from "./utils/idgen";
Expand Down Expand Up @@ -207,6 +212,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {
const sub = this.sources.get(src);
if (sub) {
const id = this.invRealSourceIDs.get(src.id);
DEBUG && console.log(`removing src: ${src.id} (${id})`);
this.sourceIDs.delete(id);
this.realSourceIDs.delete(id);
this.idSources.delete(src.id);
Expand Down
131 changes: 78 additions & 53 deletions packages/rstream/test/stream-sync.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,54 @@
import * as assert from "assert";

import { Atom } from "@thi.ng/atom";
import * as tx from "@thi.ng/transducers";

import * as assert from "assert";
import * as rs from "../src/index";

describe("StreamSync", () => {

function adder() {
return tx.map(
(ports) => {
let sum = 0;
for (let p in ports) {
sum += ports[p];
}
return sum;
return tx.map((ports) => {
let sum = 0;
for (let p in ports) {
sum += ports[p];
}
);
return sum;
});
}

it("dataflow & teardown", () => {
let a, b, c;
let a1done = false, a2done = false;
let a1done = false,
a2done = false;
let a1buf, a2buf;
const db = new Atom<any>({
a1: { ins: { a: 1, b: 2 } },
a2: { ins: { b: 10 } },
a2: { ins: { b: 10 } }
});
const a1 = rs.sync({
src: [
a = rs.fromView(db, "a1.ins.a"),
b = rs.fromView(db, "a1.ins.b"),
(a = rs.fromView(db, "a1.ins.a")),
(b = rs.fromView(db, "a1.ins.b"))
],
xform: adder(),
xform: adder()
});
const a1res = a1.subscribe({
next(x) { a1buf = x; },
done() { a1done = true; }
next(x) {
a1buf = x;
},
done() {
a1done = true;
}
});
const a2 = rs.sync({
src: [
a1,
c = rs.fromView(db, "a2.ins.b"),
],
xform: adder(),
src: [a1, (c = rs.fromView(db, "a2.ins.b"))],
xform: adder()
});
const res = a2.subscribe({
next(x) { a2buf = x; },
done() { a2done = true; }
next(x) {
a2buf = x;
},
done() {
a2done = true;
}
});
assert.equal(a1buf, 3);
assert.equal(a2buf, 13);
Expand Down Expand Up @@ -82,17 +83,18 @@ describe("StreamSync", () => {
c: rs.stream()
};
const res = [];
const sync = rs.sync({ src, mergeOnly: true })
.subscribe({
next: (x) => res.push(x),
done: () => {
assert.deepEqual(
res,
[{ c: 1 }, { c: 1, b: 2 }, { c: 1, b: 2, a: 3 }, { c: 1, b: 2, a: 4 }]
);
done();
}
});
const sync = rs.sync({ src, mergeOnly: true }).subscribe({
next: (x) => res.push(x),
done: () => {
assert.deepEqual(res, [
{ c: 1 },
{ c: 1, b: 2 },
{ c: 1, b: 2, a: 3 },
{ c: 1, b: 2, a: 4 }
]);
done();
}
});

src.c.next(1);
src.b.next(2);
Expand All @@ -108,27 +110,50 @@ describe("StreamSync", () => {
c: rs.stream()
};
const res = [];
const sync = rs.sync({
src,
mergeOnly: true
}).transform(
// ensure `a` & `b` are present
tx.filter((tuple: any) => tuple.a != null && tuple.b != null)
).subscribe({
next: (x) => res.push(x),
done: () => {
assert.deepEqual(
res,
[{ c: 1, b: 2, a: 3 }, { c: 1, b: 2, a: 4 }]
);
done();
}
});
const sync = rs
.sync({
src,
mergeOnly: true
})
.transform(
// ensure `a` & `b` are present
tx.filter((tuple: any) => tuple.a != null && tuple.b != null)
)
.subscribe({
next: (x) => res.push(x),
done: () => {
assert.deepEqual(res, [
{ c: 1, b: 2, a: 3 },
{ c: 1, b: 2, a: 4 }
]);
done();
}
});

src.c.next(1);
src.b.next(2);
src.a.next(3);
src.a.next(4);
sync.done();
});

it("fromPromise", (done) => {
const delayed = (x, t) =>
new Promise((resolve) => setTimeout(() => resolve(x), t));

rs.transduce(
rs.sync({
src: {
t: rs.fromInterval(5),
a: rs.fromPromise(delayed("aa", 20)),
b: rs.fromPromise(delayed("bb", 40))
}
}),
tx.comp(tx.take(1), tx.map(({ a, b }) => ({ a, b }))),
tx.last()
).then((res) => {
assert.deepEqual(res, { a: "aa", b: "bb" });
done();
});
});
});

0 comments on commit 4f907f3

Please sign in to comment.