Skip to content

Commit

Permalink
perf(rstream): support (re)named StreamSync inputs
Browse files Browse the repository at this point in the history
- update StreamSyncOpts
- allow objects as `src` option, use object keys as input IDs
- update `add()`, add optional ID arg
- add various maps to handle real vs. alias IDs
  • Loading branch information
postspectacular committed Apr 24, 2018
1 parent 66ec92f commit b392817
Showing 1 changed file with 60 additions and 18 deletions.
78 changes: 60 additions & 18 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { IObjectOf, IID } from "@thi.ng/api/api";
import { isPlainObject } from "@thi.ng/checks/is-plain-object";
import { Transducer } from "@thi.ng/transducers/api";
import { comp } from "@thi.ng/transducers/func/comp";
import { labeled } from "@thi.ng/transducers/xform/labeled";
Expand All @@ -9,7 +10,7 @@ import { ISubscribable, State } from "./api";
import { Subscription } from "./subscription";

export interface StreamSyncOpts<A, B> extends IID<string> {
src: ISubscribable<A>[];
src: ISubscribable<A>[] | IObjectOf<ISubscribable<A>>;
xform: Transducer<IObjectOf<A>, B>;
reset: boolean;
all: boolean;
Expand Down Expand Up @@ -58,7 +59,26 @@ export interface StreamSyncOpts<A, B> extends IID<string> {
*/
export class StreamSync<A, B> extends Subscription<A, B> {

/**
* maps actual inputs to their virtual input subs
*/
sources: Map<ISubscribable<A>, Subscription<A, [string, A]>>;
/**
* maps real source IDs to their actual input
*/
idSources: Map<string, ISubscribable<A>>;
/**
* maps (potentially aliased) input IDs to their actual src.id
*/
realSourceIDs: Map<string, string>;
/**
* maps real src.id to (potentially aliased) input IDs
*/
invRealSourceIDs: Map<string, string>;
/**
* set of (potentially aliased) input IDs
* these IDs are used to label inputs in result tuple
*/
sourceIDs: Set<string>;
autoClose: boolean;

Expand All @@ -73,16 +93,23 @@ export class StreamSync<A, B> extends Subscription<A, B> {
}
super(null, xform, null, opts.id || `streamsync-${Subscription.NEXT_ID++}`);
this.sources = new Map();
this.realSourceIDs = new Map();
this.invRealSourceIDs = new Map();
this.idSources = new Map();
this.sourceIDs = srcIDs;
this.autoClose = opts.close !== false;
if (opts.src) {
this.addAll(opts.src);
}
}

add(src: ISubscribable<A>) {
add(src: ISubscribable<A>, id?: string) {
id || (id = src.id);
this.ensureState();
this.sourceIDs.add(src.id);
this.sourceIDs.add(id);
this.realSourceIDs.set(id, src.id);
this.invRealSourceIDs.set(src.id, id);
this.idSources.set(src.id, src);
this.sources.set(
src,
src.subscribe(
Expand All @@ -97,26 +124,39 @@ export class StreamSync<A, B> extends Subscription<A, B> {
done: () => this.markDone(src),
__owner: this
},
labeled<string, A>(src.id),
`in-${src.id}`
labeled<string, A>(id),
`in-${id}`
)
);
}

addAll(src: ISubscribable<A>[]) {
// pre-add all source ids for partitionSync
for (let s of src) {
this.sourceIDs.add(s.id);
}
for (let s of src) {
this.add(s);
addAll(src: ISubscribable<A>[] | IObjectOf<ISubscribable<A>>) {
if (isPlainObject(src)) {
// pre-add all source ids for partitionSync
for (let id in src) {
this.sourceIDs.add(id);
}
for (let id in src) {
this.add(src[id], id);
}
} else {
// pre-add all source ids for partitionSync
for (let s of <ISubscribable<A>[]>src) {
this.sourceIDs.add(s.id);
}
for (let s of src) {
this.add(s);
}
}
}

remove(src: ISubscribable<A>) {
const sub = this.sources.get(src);
if (sub) {
this.sourceIDs.delete(src.id);
const id = this.invRealSourceIDs.get(src.id);
this.sourceIDs.delete(id);
this.realSourceIDs.delete(id);
this.idSources.delete(src.id);
this.sources.delete(src);
sub.unsubscribe();
return true;
Expand All @@ -125,18 +165,17 @@ export class StreamSync<A, B> extends Subscription<A, B> {
}

removeID(id: string) {
for (let s of this.sources) {
if (s[0].id === id) {
return this.remove(s[0]);
}
const src = this.idSources.get(this.realSourceIDs.get(id));
if (src) {
return this.remove(src);
}
return false;
}

removeAll(src: ISubscribable<A>[]) {
// pre-remove all source ids for partitionSync
for (let s of src) {
this.sourceIDs.delete(s.id);
this.sourceIDs.delete(this.invRealSourceIDs.get(s.id));
}
let ok = true;
for (let s of src) {
Expand All @@ -161,6 +200,9 @@ export class StreamSync<A, B> extends Subscription<A, B> {
this.state = State.DONE;
this.sources.clear();
this.sourceIDs.clear();
this.realSourceIDs.clear();
this.invRealSourceIDs.clear();
this.idSources.clear();
}
return super.unsubscribe(sub);
}
Expand Down

0 comments on commit b392817

Please sign in to comment.