diff --git a/packages/rstream/src/subscription.ts b/packages/rstream/src/subscription.ts
index 875471f8f1..13a1bc1f8d 100644
--- a/packages/rstream/src/subscription.ts
+++ b/packages/rstream/src/subscription.ts
@@ -157,7 +157,7 @@ export class Subscription
default:
illegalArity(args.length);
}
- if (implementsFunction(sub!, "subscribe")) {
+ if (implementsFunction(sub!, "subscribe") && !opts.xform) {
sub!.parent = this;
} else {
// FIXME inherit options from this sub or defaults?
@@ -226,19 +226,17 @@ export class Subscription
this.cleanup();
return res;
}
- if (this.subs) {
- LOGGER.debug(this.id, "unsub child", sub.id);
- const idx = this.subs.indexOf(sub);
- if (idx >= 0) {
- this.subs.splice(idx, 1);
- if (
- this.closeOut === CloseMode.FIRST ||
- (!this.subs.length && this.closeOut !== CloseMode.NEVER)
- ) {
- this.unsubscribe();
- }
- return true;
+ LOGGER.debug(this.id, "unsub child", sub.id);
+ const idx = this.subs.indexOf(sub);
+ if (idx >= 0) {
+ this.subs.splice(idx, 1);
+ if (
+ this.closeOut === CloseMode.FIRST ||
+ (!this.subs.length && this.closeOut !== CloseMode.NEVER)
+ ) {
+ this.unsubscribe();
}
+ return true;
}
return false;
}
@@ -252,9 +250,7 @@ export class Subscription
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
}
- if (isReduced(acc)) {
- this.done();
- }
+ isReduced(acc) && this.done();
} else {
this.dispatch(x);
}
@@ -273,7 +269,7 @@ export class Subscription
}
}
this.state = State.DONE;
- for (let s of [...this.subs]) {
+ for (let s of this.subs.slice()) {
try {
s.done && s.done();
} catch (e) {
@@ -287,9 +283,10 @@ export class Subscription
error(e: any) {
this.state = State.ERROR;
+ const subs = this.subs;
let notified = false;
- if (this.subs && this.subs.length) {
- for (let s of this.subs.slice()) {
+ if (subs.length) {
+ for (let s of subs.slice()) {
if (s.error) {
s.error(e);
notified = true;
@@ -316,8 +313,9 @@ export class Subscription
// LOGGER.debug(this.id, "dispatch", x);
this.cacheLast && (this.last = x);
const subs = this.subs;
+ let n = subs.length;
let s: ISubscriber;
- if (subs.length === 1) {
+ if (n === 1) {
s = subs[0];
try {
s.next && s.next(x);
@@ -325,8 +323,8 @@ export class Subscription
s.error ? s.error(e) : this.error(e);
}
} else {
- for (let i = subs.length; --i >= 0; ) {
- s = subs[i];
+ for (; --n >= 0; ) {
+ s = subs[n];
try {
s.next && s.next(x);
} catch (e) {
diff --git a/packages/rstream/test/subscription.ts b/packages/rstream/test/subscription.ts
index 34a96c524a..70ea63ec1c 100644
--- a/packages/rstream/test/subscription.ts
+++ b/packages/rstream/test/subscription.ts
@@ -6,6 +6,7 @@ import {
Stream,
fromIterableSync,
CloseMode,
+ subscription,
} from "../src/index";
import { TIMEOUT } from "./config";
@@ -128,8 +129,20 @@ describe("Subscription", () => {
buf.push(x);
},
},
- map((x: number) => x + 10),
- undefined
+ map((x: number) => x + 10)
+ );
+ assert.deepEqual(buf, [11]);
+ });
+
+ it("child sub w/ xform", () => {
+ let buf: any[] = [];
+ fromIterableSync([1], { closeIn: CloseMode.NEVER }).subscribe(
+ subscription({
+ next(x) {
+ buf.push(x);
+ },
+ }),
+ map((x: number) => x + 10)
);
assert.deepEqual(buf, [11]);
});