Skip to content

Commit

Permalink
refactor(rstream): simplify Subscription, update all impls
Browse files Browse the repository at this point in the history
- use Set for storing child subs
- update Stream, StreamMerge, StreamSync
- only clean sources in StreamMerge/Sync.unsubscribe()
  • Loading branch information
postspectacular committed Mar 20, 2018
1 parent 64c1625 commit 47b6a92
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
4 changes: 2 additions & 2 deletions packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ export class StreamMerge<A, B> extends Subscription<A, B> {
s.unsubscribe();
}
this.state = State.DONE;
delete this.sources;
this.sources.clear();
return true;
}
if (super.unsubscribe(sub)) {
if (!this.subs.length) {
if (!this.subs.size) {
return this.unsubscribe();
}
return true;
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ export class StreamSync<A, B> extends Subscription<A, B> {
s.unsubscribe();
}
this.state = State.DONE;
delete this.sources;
this.sources.clear();
return true;
}
if (super.unsubscribe(sub)) {
if (!this.subs.length) {
if (!this.subs.size) {
return this.unsubscribe();
}
return true;
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ export class Stream<T> extends Subscription<T, T>
subscribe<C>(sub: Partial<ISubscriber<C>>, xform: Transducer<T, C>, id?: string): Subscription<T, C>
subscribe(...args: any[]) {
const wrapped = super.subscribe.apply(this, args);
if (this.subs.length === 1) {
if (this.subs.size === 1) {
this._cancel = (this.src && this.src(this)) || (() => void 0);
}
return wrapped;
}

unsubscribe(sub?: Subscription<T, any>) {
const res = super.unsubscribe(sub);
if (res && (!this.subs || !this.subs.length)) {
if (res && (!this.subs || !this.subs.size)) {
this.done();
}
return res;
Expand Down
44 changes: 22 additions & 22 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ export class Subscription<A, B> implements
id: string;

protected parent: ISubscribable<A>;
protected subs: ISubscriber<B>[] = [];
protected subs: Set<ISubscriber<B>>;
protected xform: Reducer<B[], A>;
protected state: State = State.IDLE;

constructor(sub?: ISubscriber<B>, xform?: Transducer<A, B>, parent?: ISubscribable<A>, id?: string) {
this.parent = parent;
this.id = id || `sub-${Subscription.NEXT_ID++}`;
this.subs = new Set();
if (sub) {
this.subs.push(<ISubscriber<B>>sub);
this.subs.add(<ISubscriber<B>>sub);
}
if (xform) {
this.xform = xform(push());
Expand Down Expand Up @@ -86,9 +87,8 @@ export class Subscription<A, B> implements
}
if (this.subs) {
DEBUG && console.log(this.id, "unsub", sub.id);
const idx = this.subs.indexOf(sub);
if (idx >= 0) {
this.subs.splice(idx, 1);
if (this.subs.has(sub)) {
this.subs.delete(sub);
return true;
}
return false;
Expand All @@ -97,19 +97,21 @@ export class Subscription<A, B> implements
}

next(x: A) {
this.ensureState();
if (this.xform) {
const acc = this.xform[2]([], x);
const uacc = unreduced(acc);
const n = uacc.length;
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
}
if (isReduced(acc)) {
this.done();
// this.ensureState();
if (this.state < State.DONE) {
if (this.xform) {
const acc = this.xform[2]([], x);
const uacc = unreduced(acc);
const n = uacc.length;
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
}
if (isReduced(acc)) {
this.done();
}
} else {
this.dispatch(<any>x);
}
} else {
this.dispatch(<any>x);
}
}

Expand Down Expand Up @@ -138,7 +140,7 @@ export class Subscription<A, B> implements
error(e: any) {
this.state = State.ERROR;
let notified = false;
if (this.subs && this.subs.length) {
if (this.subs && this.subs.size) {
for (let s of [...this.subs]) {
if (s.error) {
s.error(e);
Expand All @@ -156,15 +158,13 @@ export class Subscription<A, B> implements
}

protected addWrapped(wrapped: Subscription<any, any>) {
this.subs.push(wrapped);
this.subs.add(wrapped);
this.state = State.ACTIVE;
return wrapped;
}

protected dispatch(x: B) {
let subs = this.subs;
for (let i = 0, n = subs.length; i < n; i++) {
const s = subs[i];
for (let s of this.subs) {
try {
s.next && s.next(x);
} catch (e) {
Expand Down

0 comments on commit 47b6a92

Please sign in to comment.