Skip to content

Commit

Permalink
fix(rstream): subscribe() w/ xform, add test
Browse files Browse the repository at this point in the history
- fixes `.subscribe(sub, xform)` where child `sub` was an actual instance
  of `Subscription` and the given `xform` transducer was silently
  ignored in that case...
- add test
  • Loading branch information
postspectacular committed Apr 28, 2020
1 parent f495264 commit 20ce586
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
42 changes: 20 additions & 22 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class Subscription<A, B>
default:
illegalArity(args.length);
}
if (implementsFunction(sub!, "subscribe")) {
if (implementsFunction(sub!, "subscribe") && !opts.xform) {
sub!.parent = this;
} else {
// FIXME inherit options from this sub or defaults?
Expand Down Expand Up @@ -226,19 +226,17 @@ export class Subscription<A, B>
this.cleanup();
return res;
}
if (this.subs) {
LOGGER.debug(this.id, "unsub child", sub.id);
const idx = this.subs.indexOf(sub);
if (idx >= 0) {
this.subs.splice(idx, 1);
if (
this.closeOut === CloseMode.FIRST ||
(!this.subs.length && this.closeOut !== CloseMode.NEVER)
) {
this.unsubscribe();
}
return true;
LOGGER.debug(this.id, "unsub child", sub.id);
const idx = this.subs.indexOf(sub);
if (idx >= 0) {
this.subs.splice(idx, 1);
if (
this.closeOut === CloseMode.FIRST ||
(!this.subs.length && this.closeOut !== CloseMode.NEVER)
) {
this.unsubscribe();
}
return true;
}
return false;
}
Expand All @@ -252,9 +250,7 @@ export class Subscription<A, B>
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
}
if (isReduced(acc)) {
this.done();
}
isReduced(acc) && this.done();
} else {
this.dispatch(<any>x);
}
Expand All @@ -273,7 +269,7 @@ export class Subscription<A, B>
}
}
this.state = State.DONE;
for (let s of [...this.subs]) {
for (let s of this.subs.slice()) {
try {
s.done && s.done();
} catch (e) {
Expand All @@ -287,9 +283,10 @@ export class Subscription<A, B>

error(e: any) {
this.state = State.ERROR;
const subs = this.subs;
let notified = false;
if (this.subs && this.subs.length) {
for (let s of this.subs.slice()) {
if (subs.length) {
for (let s of subs.slice()) {
if (s.error) {
s.error(e);
notified = true;
Expand All @@ -316,17 +313,18 @@ export class Subscription<A, B>
// LOGGER.debug(this.id, "dispatch", x);
this.cacheLast && (this.last = x);
const subs = this.subs;
let n = subs.length;
let s: ISubscriber<B>;
if (subs.length === 1) {
if (n === 1) {
s = subs[0];
try {
s.next && s.next(x);
} catch (e) {
s.error ? s.error(e) : this.error(e);
}
} else {
for (let i = subs.length; --i >= 0; ) {
s = subs[i];
for (; --n >= 0; ) {
s = subs[n];
try {
s.next && s.next(x);
} catch (e) {
Expand Down
17 changes: 15 additions & 2 deletions packages/rstream/test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Stream,
fromIterableSync,
CloseMode,
subscription,
} from "../src/index";
import { TIMEOUT } from "./config";

Expand Down Expand Up @@ -128,8 +129,20 @@ describe("Subscription", () => {
buf.push(x);
},
},
map((x: number) => x + 10),
undefined
map((x: number) => x + 10)
);
assert.deepEqual(buf, [11]);
});

it("child sub w/ xform", () => {
let buf: any[] = [];
fromIterableSync([1], { closeIn: CloseMode.NEVER }).subscribe(
subscription({
next(x) {
buf.push(x);
},
}),
map((x: number) => x + 10)
);
assert.deepEqual(buf, [11]);
});
Expand Down

0 comments on commit 20ce586

Please sign in to comment.