Skip to content

Commit

Permalink
refactor(rstream): major pkg restructure
Browse files Browse the repository at this point in the history
- flatten /src folder for easier imports in userland
- move logging from api.ts => logger.ts
- rename internal helpers (__nextID, __optsWithID, defWorker, ...)
- update imports
  • Loading branch information
postspectacular committed Sep 20, 2021
1 parent 18ae2f7 commit 831c113
Show file tree
Hide file tree
Showing 40 changed files with 268 additions and 261 deletions.
135 changes: 66 additions & 69 deletions packages/rstream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"scripts": {
"build": "yarn clean && tsc --declaration",
"build:check": "tsc --isolatedModules --noEmit",
"clean": "rimraf *.js *.d.ts *.map doc from subs utils",
"clean": "rimraf *.js *.d.ts *.map doc",
"doc": "typedoc --excludePrivate --excludeInternal --out doc src/index.ts",
"doc:ae": "mkdir -p .ae/doc .ae/temp && node_modules/.bin/api-extractor run --local --verbose",
"doc:readme": "../../scripts/node-esm ../../tools/src/readme.ts",
Expand Down Expand Up @@ -69,10 +69,7 @@
},
"files": [
"*.js",
"*.d.ts",
"from",
"subs",
"utils"
"*.d.ts"
],
"exports": {
".": {
Expand All @@ -81,107 +78,107 @@
"./api": {
"import": "./api.js"
},
"./checks": {
"import": "./checks.js"
"./asidechain": {
"import": "./asidechain.js"
},
"./forkjoin": {
"import": "./forkjoin.js"
"./atom": {
"import": "./atom.js"
},
"./bisect": {
"import": "./bisect.js"
},
"./from/atom": {
"import": "./from/atom.js"
"./checks": {
"import": "./checks.js"
},
"./from/event": {
"import": "./from/event.js"
"./debounce": {
"import": "./debounce.js"
},
"./from/interval": {
"import": "./from/interval.js"
"./defworker": {
"import": "./defworker.js"
},
"./from/iterable": {
"import": "./from/iterable.js"
"./event": {
"import": "./event.js"
},
"./from/object": {
"import": "./from/object.js"
"./forkjoin": {
"import": "./forkjoin.js"
},
"./from/promise": {
"import": "./from/promise.js"
"./idgen": {
"import": "./idgen.js"
},
"./from/promises": {
"import": "./from/promises.js"
"./interval": {
"import": "./interval.js"
},
"./from/raf": {
"import": "./from/raf.js"
"./iterable": {
"import": "./iterable.js"
},
"./from/view": {
"import": "./from/view.js"
"./logger": {
"import": "./logger.js"
},
"./from/worker": {
"import": "./from/worker.js"
"./merge": {
"import": "./merge.js"
},
"./metastream": {
"import": "./metastream.js"
},
"./pubsub": {
"import": "./pubsub.js"
},
"./stream-merge": {
"import": "./stream-merge.js"
"./object": {
"import": "./object.js"
},
"./stream-sync": {
"import": "./stream-sync.js"
"./post-worker": {
"import": "./post-worker.js"
},
"./stream": {
"import": "./stream.js"
"./promise": {
"import": "./promise.js"
},
"./subs/asidechain": {
"import": "./subs/asidechain.js"
"./promises": {
"import": "./promises.js"
},
"./subs/bisect": {
"import": "./subs/bisect.js"
"./pubsub": {
"import": "./pubsub.js"
},
"./subs/debounce": {
"import": "./subs/debounce.js"
"./raf": {
"import": "./raf.js"
},
"./subs/post-worker": {
"import": "./subs/post-worker.js"
"./resolve": {
"import": "./resolve.js"
},
"./subs/resolve": {
"import": "./subs/resolve.js"
"./sidechain-partition": {
"import": "./sidechain-partition.js"
},
"./subs/sidechain-partition": {
"import": "./subs/sidechain-partition.js"
"./sidechain-toggle": {
"import": "./sidechain-toggle.js"
},
"./subs/sidechain-toggle": {
"import": "./subs/sidechain-toggle.js"
"./stream": {
"import": "./stream.js"
},
"./subs/timeout": {
"import": "./subs/timeout.js"
"./subscription": {
"import": "./subscription.js"
},
"./subs/trace": {
"import": "./subs/trace.js"
"./sync": {
"import": "./sync.js"
},
"./subs/transduce": {
"import": "./subs/transduce.js"
"./timeout": {
"import": "./timeout.js"
},
"./subs/tunnel": {
"import": "./subs/tunnel.js"
"./trace": {
"import": "./trace.js"
},
"./subscription": {
"import": "./subscription.js"
"./transduce": {
"import": "./transduce.js"
},
"./trigger": {
"import": "./trigger.js"
},
"./tunnel": {
"import": "./tunnel.js"
},
"./tween": {
"import": "./tween.js"
},
"./utils/checks": {
"import": "./utils/checks.js"
},
"./utils/idgen": {
"import": "./utils/idgen.js"
"./view": {
"import": "./view.js"
},
"./utils/worker": {
"import": "./utils/worker.js"
"./worker": {
"import": "./worker.js"
}
},
"thi.ng": {
Expand Down
6 changes: 0 additions & 6 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import type { Fn, Fn0, IDeref, IID } from "@thi.ng/api";
import type { ILogger } from "@thi.ng/logger";
import { NULL_LOGGER } from "@thi.ng/logger/null";
import type { Transducer } from "@thi.ng/transducers";
import type { Stream } from "./stream";

Expand Down Expand Up @@ -234,7 +232,3 @@ export interface IStream<T> extends ISubscriber<T> {

export type StreamCancel = () => void;
export type StreamSource<T> = (sub: Stream<T>) => StreamCancel | void;

export let LOGGER = NULL_LOGGER;

export const setLogger = (logger: ILogger) => (LOGGER = logger);
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { CommonOpts, ISubscription } from "../api";
import { Subscription } from "../subscription";
import type { CommonOpts, ISubscription } from "./api";
import { Subscription } from "./subscription";

/**
* Abstract base class for sidechained subscription types (e.g.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { Predicate2 } from "@thi.ng/api";
import type { ReadonlyAtom } from "@thi.ng/atom";
import type { CommonOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";
import type { CommonOpts } from "./api";
import { __optsWithID } from "./idgen";
import { stream } from "./stream";

export interface FromAtomOpts<T> extends CommonOpts {
/**
Expand Down Expand Up @@ -55,13 +55,13 @@ export interface FromAtomOpts<T> extends CommonOpts {
export const fromAtom = <T>(
atom: ReadonlyAtom<T>,
opts?: Partial<FromAtomOpts<T>>
): Stream<T> => {
opts = optsWithID("atom", <FromAtomOpts<T>>{
) => {
opts = __optsWithID("atom", <FromAtomOpts<T>>{
emitFirst: true,
changed: (a, b) => a !== b,
...opts,
});
return new Stream<T>((stream) => {
return stream<T>((stream) => {
atom.addWatch(stream.id, (_, prev, curr) => {
if (opts!.changed!(prev, curr)) {
stream.next(curr);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Predicate } from "@thi.ng/api";
import type { ISubscriber } from "../api";
import { PubSub } from "../pubsub";
import { nextID } from "../utils/idgen";
import type { ISubscriber } from "./api";
import { __nextID } from "./idgen";
import { PubSub } from "./pubsub";

/**
* Returns a {@link PubSub} using given predicate `pred` as boolean
Expand Down Expand Up @@ -52,7 +52,7 @@ export const bisect = <T>(
truthy?: ISubscriber<T>,
falsy?: ISubscriber<T>
): PubSub<T, T> => {
const sub = new PubSub<T, T>({ topic: pred, id: `bisect-${nextID()}` });
const sub = new PubSub<T, T>({ topic: pred, id: `bisect-${__nextID()}` });
truthy && sub.subscribeTopic(true, truthy);
falsy && sub.subscribeTopic(false, falsy);
return sub;
Expand Down
10 changes: 9 additions & 1 deletion packages/rstream/src/checks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { implementsFunction } from "@thi.ng/checks/implements-function";
import type { ISubscribable } from "./api";
import { CloseMode, ISubscribable } from "./api";

export const isSubscribable = (x: any): x is ISubscribable<any> =>
implementsFunction(x, "subscribe");

/**
* Returns true if mode is FIRST, or if mode is LAST *and* `num = 0`.
*
* @internal
*/
export const isFirstOrLastInput = (mode: CloseMode, num: number) =>
mode === CloseMode.FIRST || (mode === CloseMode.LAST && !num);
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { fromIterable } from "../from/iterable";
import { metaStream, MetaStreamOpts } from "../metastream";
import { optsWithID } from "../utils/idgen";
import { __optsWithID } from "./idgen";
import { fromIterable } from "./iterable";
import { metaStream, MetaStreamOpts } from "./metastream";

/**
* Returns a subscription which buffers any intermediate inputs arriving faster
Expand All @@ -19,7 +19,7 @@ import { optsWithID } from "../utils/idgen";
export const debounce = <T>(delay: number, opts?: Partial<MetaStreamOpts>) =>
metaStream(
(x: T) => fromIterable([x], { delay }),
optsWithID("debounce", {
__optsWithID("debounce", {
emitLast: true,
...opts,
})
Expand Down
9 changes: 9 additions & 0 deletions packages/rstream/src/defworker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export const defInlineWorker = (src: string) =>
defWorker(new Blob([src], { type: "text/javascript" }));

export const defWorker = (worker: Worker | string | Blob) =>
worker instanceof Worker
? worker
: new Worker(
worker instanceof Blob ? URL.createObjectURL(worker) : worker
);
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CommonOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";
import type { CommonOpts } from "./api";
import { __optsWithID } from "./idgen";
import { stream, Stream } from "./stream";

/**
* Creates a {@link Stream} of events attached to given element / event
Expand All @@ -18,11 +18,11 @@ export const fromEvent = (
listenerOpts: boolean | AddEventListenerOptions = false,
streamOpts?: Partial<CommonOpts>
) =>
new Stream<Event>((stream) => {
stream<Event>((stream) => {
let listener = (e: Event) => stream.next(e);
src.addEventListener(name, listener, listenerOpts);
return () => src.removeEventListener(name, listener, listenerOpts);
}, optsWithID(`event-${name}`, streamOpts));
}, __optsWithID(`event-${name}`, streamOpts));

/**
* Same as {@link fromEvent}, however only supports well-known DOM event
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/forkjoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import { mapcat } from "@thi.ng/transducers/mapcat";
import { range } from "@thi.ng/transducers/range";
import { transduce } from "@thi.ng/transducers/transduce";
import type { CommonOpts, ISubscribable, ITransformable } from "./api";
import { sync } from "./stream-sync";
import { tunnel } from "./subs/tunnel";
import type { Subscription } from "./subscription";
import { sync } from "./sync";
import { tunnel } from "./tunnel";

export interface ForkJoinOpts<IN, MSG, RES, OUT> extends Partial<CommonOpts> {
/**
Expand Down
19 changes: 19 additions & 0 deletions packages/rstream/src/idgen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { CommonOpts } from "./api";

let NEXT_ID = 0;

export const __nextID = () => NEXT_ID++;

/**
* @param prefix
* @param opts
*
* @internal
*/
export const __optsWithID = <T extends CommonOpts>(
prefix: string,
opts?: Partial<T>
) =>
<Partial<T>>(
(!opts || !opts.id ? { ...opts, id: prefix + "-" + __nextID() } : opts)
);
Loading

0 comments on commit 831c113

Please sign in to comment.