From 20ce586fdc0b4f9633f93581f7480572334317f1 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Tue, 28 Apr 2020 02:10:59 +0100 Subject: [PATCH] fix(rstream): subscribe() w/ xform, add test - fixes `.subscribe(sub, xform)` where child `sub` was an actual instance of `Subscription` and the given `xform` transducer was silently ignored in that case... - add test --- packages/rstream/src/subscription.ts | 42 +++++++++++++-------------- packages/rstream/test/subscription.ts | 17 +++++++++-- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/packages/rstream/src/subscription.ts b/packages/rstream/src/subscription.ts index 875471f8f1..13a1bc1f8d 100644 --- a/packages/rstream/src/subscription.ts +++ b/packages/rstream/src/subscription.ts @@ -157,7 +157,7 @@ export class Subscription default: illegalArity(args.length); } - if (implementsFunction(sub!, "subscribe")) { + if (implementsFunction(sub!, "subscribe") && !opts.xform) { sub!.parent = this; } else { // FIXME inherit options from this sub or defaults? @@ -226,19 +226,17 @@ export class Subscription this.cleanup(); return res; } - if (this.subs) { - LOGGER.debug(this.id, "unsub child", sub.id); - const idx = this.subs.indexOf(sub); - if (idx >= 0) { - this.subs.splice(idx, 1); - if ( - this.closeOut === CloseMode.FIRST || - (!this.subs.length && this.closeOut !== CloseMode.NEVER) - ) { - this.unsubscribe(); - } - return true; + LOGGER.debug(this.id, "unsub child", sub.id); + const idx = this.subs.indexOf(sub); + if (idx >= 0) { + this.subs.splice(idx, 1); + if ( + this.closeOut === CloseMode.FIRST || + (!this.subs.length && this.closeOut !== CloseMode.NEVER) + ) { + this.unsubscribe(); } + return true; } return false; } @@ -252,9 +250,7 @@ export class Subscription for (let i = 0; i < n; i++) { this.dispatch(uacc[i]); } - if (isReduced(acc)) { - this.done(); - } + isReduced(acc) && this.done(); } else { this.dispatch(x); } @@ -273,7 +269,7 @@ export class Subscription } } this.state = State.DONE; - for (let s of [...this.subs]) { + for (let s of this.subs.slice()) { try { s.done && s.done(); } catch (e) { @@ -287,9 +283,10 @@ export class Subscription error(e: any) { this.state = State.ERROR; + const subs = this.subs; let notified = false; - if (this.subs && this.subs.length) { - for (let s of this.subs.slice()) { + if (subs.length) { + for (let s of subs.slice()) { if (s.error) { s.error(e); notified = true; @@ -316,8 +313,9 @@ export class Subscription // LOGGER.debug(this.id, "dispatch", x); this.cacheLast && (this.last = x); const subs = this.subs; + let n = subs.length; let s: ISubscriber; - if (subs.length === 1) { + if (n === 1) { s = subs[0]; try { s.next && s.next(x); @@ -325,8 +323,8 @@ export class Subscription s.error ? s.error(e) : this.error(e); } } else { - for (let i = subs.length; --i >= 0; ) { - s = subs[i]; + for (; --n >= 0; ) { + s = subs[n]; try { s.next && s.next(x); } catch (e) { diff --git a/packages/rstream/test/subscription.ts b/packages/rstream/test/subscription.ts index 34a96c524a..70ea63ec1c 100644 --- a/packages/rstream/test/subscription.ts +++ b/packages/rstream/test/subscription.ts @@ -6,6 +6,7 @@ import { Stream, fromIterableSync, CloseMode, + subscription, } from "../src/index"; import { TIMEOUT } from "./config"; @@ -128,8 +129,20 @@ describe("Subscription", () => { buf.push(x); }, }, - map((x: number) => x + 10), - undefined + map((x: number) => x + 10) + ); + assert.deepEqual(buf, [11]); + }); + + it("child sub w/ xform", () => { + let buf: any[] = []; + fromIterableSync([1], { closeIn: CloseMode.NEVER }).subscribe( + subscription({ + next(x) { + buf.push(x); + }, + }), + map((x: number) => x + 10) ); assert.deepEqual(buf, [11]); });