Skip to content

Commit

Permalink
refactor(rstream): add/update optsWithID() handling
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Aug 17, 2019
1 parent 7289d17 commit f0d7f87
Show file tree
Hide file tree
Showing 20 changed files with 46 additions and 40 deletions.
2 changes: 1 addition & 1 deletion packages/rstream/src/from/atom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ export const fromAtom = <T>(
});
emitFirst && stream.next(atom.deref());
return () => atom.removeWatch(stream.id);
}, optsWithID("atom-", opts));
}, optsWithID("atom", opts));
2 changes: 1 addition & 1 deletion packages/rstream/src/from/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const fromEvent = (
let listener = (e: Event) => stream.next(e);
src.addEventListener(name, listener, listenerOpts);
return () => src.removeEventListener(name, listener, listenerOpts);
}, optsWithID(`event-${name}-`, streamOpts));
}, optsWithID(`event-${name}`, streamOpts));

/**
* Same as `fromEvent`, however only supports well-known DOM event
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/from/interval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ export const fromInterval = (
}
}, delay);
return () => clearInterval(id);
}, optsWithID("interval-", opts));
}, optsWithID("interval", opts));
4 changes: 2 additions & 2 deletions packages/rstream/src/from/iterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const fromIterable = <T>(
}
}, delay);
return () => clearInterval(id);
}, optsWithID("iterable-", opts));
}, optsWithID("iterable", opts));

/**
* Creates a new `Stream` of given iterable which synchronously calls
Expand All @@ -52,4 +52,4 @@ export const fromIterableSync = <T>(
stream.next(s);
}
stream.closeIn !== CloseMode.NEVER && stream.done();
}, optsWithID("iterable-sync-", opts));
}, optsWithID("iterable-sync", opts));
2 changes: 1 addition & 1 deletion packages/rstream/src/from/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ export const fromPromise = <T>(src: Promise<T>, opts?: Partial<CommonOpts>) => {
return () => {
canceled = true;
};
}, optsWithID("promise-", opts));
}, optsWithID("promise", opts));
};
2 changes: 1 addition & 1 deletion packages/rstream/src/from/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ export const fromPromises = <T>(
promises: Iterable<Promise<T>>,
opts?: Partial<CommonOpts>
): Subscription<T[], T> =>
fromPromise(Promise.all(promises), optsWithID("promises-", opts)).transform(
fromPromise(Promise.all(promises), optsWithID("promises", opts)).transform(
mapcat((x: T[]) => x)
);
2 changes: 1 addition & 1 deletion packages/rstream/src/from/raf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ export const fromRAF = (opts?: Partial<CommonOpts>) =>
isActive = false;
cancelAnimationFrame(id);
};
}, optsWithID("raf-", opts));
}, optsWithID("raf", opts));
2 changes: 1 addition & 1 deletion packages/rstream/src/from/view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ export const fromView = <T>(
isActive = false;
view.release();
};
}, optsWithID("view-", opts));
}, optsWithID("view", opts));
2 changes: 1 addition & 1 deletion packages/rstream/src/from/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ export const fromWorker = <T>(
_worker.terminate();
}
};
}, optsWithID("worker-", opts));
}, optsWithID("worker", opts));
};
2 changes: 1 addition & 1 deletion packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class MetaStream<A, B> extends Subscription<A, B> {
factory: Fn<A, Subscription<B, B>>,
opts?: Partial<CommonOpts>
) {
super(undefined, optsWithID("metastram-", opts));
super(undefined, optsWithID("metastram", opts));
this.factory = factory;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class PubSub<A, B> extends Subscription<A, B> {
opts = opts || <PubSubOpts<A, B>>{};
super(
undefined,
optsWithID("pubsub-", <Partial<SubscriptionOpts<A, B>>>{
optsWithID("pubsub", <Partial<SubscriptionOpts<A, B>>>{
xform: opts.xform
})
);
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class StreamMerge<A, B> extends Subscription<A, B> {

constructor(opts?: Partial<StreamMergeOpts<A, B>>) {
opts = opts || {};
super(undefined, optsWithID("streammerge-", opts));
super(undefined, optsWithID("streammerge", opts));
this.sources = new Map();
opts.src && this.addAll(opts.src);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {
}
super(
undefined,
optsWithID("streamsync-", <Partial<StreamSyncOpts<any, any>>>{
optsWithID("streamsync", <Partial<StreamSyncOpts<any, any>>>{
...opts,
xform
})
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {
// prettier-ignore
constructor(src?: StreamSource<T> | Partial<CommonOpts>, opts?: Partial<CommonOpts>) {
const [_src, _opts] = isFunction(src) ? [src, opts] : [undefined, src];
super(undefined, optsWithID("stream-", _opts));
super(undefined, optsWithID("stream", _opts));
this.src = _src;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/subs/resolve.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Fn, IID } from "@thi.ng/api";
import { LOGGER, State } from "../api";
import { Subscription } from "../subscription";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

export interface ResolverOpts extends IID<string> {
/**
Expand Down Expand Up @@ -38,7 +38,7 @@ export class Resolver<T> extends Subscription<Promise<T>, T> {
protected fail?: Fn<any, void>;

constructor(opts: Partial<ResolverOpts> = {}) {
super(undefined, { id: opts.id || `resolve-${nextID()}` });
super(undefined, optsWithID("resolve"));
this.fail = opts.fail;
}

Expand Down
18 changes: 11 additions & 7 deletions packages/rstream/src/subs/sidechain-partition.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Predicate } from "@thi.ng/api";
import { ISubscribable, State } from "../api";
import { CommonOpts, ISubscribable, State } from "../api";
import { Subscription } from "../subscription";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Buffers values from `src` until side chain fires, then emits buffer
Expand All @@ -25,20 +25,24 @@ import { nextID } from "../utils/idgen";
*
* @param side
* @param pred
* @param id
* @param opts
*/
export const sidechainPartition = <A, B>(
side: ISubscribable<B>,
pred?: Predicate<B>,
id?: string
): Subscription<A, A[]> => new SidechainPartition<A, B>(side, pred, id);
opts?: Partial<CommonOpts>
): Subscription<A, A[]> => new SidechainPartition<A, B>(side, pred, opts);

export class SidechainPartition<A, B> extends Subscription<A, A[]> {
sideSub: Subscription<B, B>;
buf: A[];

constructor(side: ISubscribable<B>, pred?: Predicate<B>, id?: string) {
super(undefined, { id: id || `sidepart-${nextID()}` });
constructor(
side: ISubscribable<B>,
pred?: Predicate<B>,
opts?: Partial<CommonOpts>
) {
super(undefined, optsWithID("sidepart", opts));
this.buf = [];
const $this = this;
pred = pred || (() => true);
Expand Down
14 changes: 7 additions & 7 deletions packages/rstream/src/subs/sidechain-toggle.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Predicate } from "@thi.ng/api";
import { ISubscribable } from "../api";
import { CommonOpts, ISubscribable } from "../api";
import { Subscription } from "../subscription";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Filters values from input based on values received from side chain.
Expand All @@ -27,14 +27,14 @@ import { nextID } from "../utils/idgen";
* @param side
* @param pred
* @param initial initial switch state
* @param id
* @param opts
*/
export const sidechainToggle = <A, B>(
side: ISubscribable<B>,
initial = true,
pred?: Predicate<B>,
id?: string
): Subscription<A, A> => new SidechainToggle(side, initial, pred, id);
opts?: Partial<CommonOpts>
): Subscription<A, A> => new SidechainToggle(side, initial, pred, opts);

export class SidechainToggle<A, B> extends Subscription<A, A> {
sideSub: Subscription<B, B>;
Expand All @@ -44,9 +44,9 @@ export class SidechainToggle<A, B> extends Subscription<A, A> {
side: ISubscribable<B>,
initial = true,
pred?: Predicate<B>,
id?: string
opts?: Partial<CommonOpts>
) {
super(undefined, { id: id || `sidetoggle-${nextID()}` });
super(undefined, optsWithID("sidetoggle", opts));
this.isActive = initial;
const $this = this;
pred = pred || (() => true);
Expand Down
14 changes: 7 additions & 7 deletions packages/rstream/src/subs/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { State } from "../api";
import { CommonOpts, State } from "../api";
import { Subscription } from "../subscription";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* A subscription that emits an arbitrary error object after a given
Expand All @@ -13,14 +13,14 @@ import { nextID } from "../utils/idgen";
* @param timeoutMs timeout period in milliseconds
* @param error error object
* @param resetTimeout timeout reset flag
* @param id subscription id
* @param opts
*/
export const timeout = <T>(
timeoutMs: number,
error?: any,
resetTimeout = false,
id?: string
): Subscription<T, T> => new Timeout(timeoutMs, error, resetTimeout, id);
opts?: Partial<CommonOpts>
): Subscription<T, T> => new Timeout(timeoutMs, error, resetTimeout, opts);

class Timeout<T> extends Subscription<T, T> {
protected timeoutMs: number;
Expand All @@ -32,9 +32,9 @@ class Timeout<T> extends Subscription<T, T> {
timeoutMs: number,
error?: any,
resetTimeout = false,
id?: string
opts?: Partial<CommonOpts>
) {
super(undefined, { id: id || `timeout-${nextID()}` });
super(undefined, optsWithID("timeout", opts));
this.timeoutMs = timeoutMs;
this.errorObj = error;
this.resetTimeout = resetTimeout;
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import { optsWithID } from "./utils/idgen";
export function trigger(): Stream<boolean>;
export function trigger<T>(x: T): Stream<T>;
export function trigger(x: any = true) {
return fromIterable([x], 0, optsWithID("trigger-"));
return fromIterable([x], 0, optsWithID("trigger"));
}
4 changes: 3 additions & 1 deletion packages/rstream/src/utils/idgen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ export const optsWithID = <T extends CommonOpts>(
prefix: string,
opts?: Partial<T>
) =>
<Partial<T>>(!opts || !opts.id ? { ...opts, id: prefix + nextID() } : opts);
<Partial<T>>(
(!opts || !opts.id ? { ...opts, id: prefix + "-" + nextID() } : opts)
);

0 comments on commit f0d7f87

Please sign in to comment.