From de48e13407fde97ade7abc24514ecd0adb1aa773 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Tue, 5 Jun 2018 14:47:08 +0100 Subject: [PATCH 01/10] build: add yarn bootstrap --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 1731618bbd..531a2faa09 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "webpack-dev-server": "^3.1.4" }, "scripts": { + "bootstrap": "lerna bootstrap", "build": "yarn install && lerna -v && lerna bootstrap && lerna run build --sort", "cover": "lerna run cover", "depgraph": "scripts/depgraph && git add assets/deps.png && git commit -m 'docs: update dep graph' && git push", From be21c4c20a5cb8e3885cfcb3501b876cb373900d Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 02:55:12 +0100 Subject: [PATCH 02/10] feat(rstream-graph): update NodeOutput, support multiple handlers - extract prepareNodeOutputs() --- packages/rstream-graph/src/api.ts | 5 ++++- packages/rstream-graph/src/graph.ts | 33 ++++++++++++++++++++++------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/packages/rstream-graph/src/api.ts b/packages/rstream-graph/src/api.ts index d0ad0272c2..f4224cb158 100644 --- a/packages/rstream-graph/src/api.ts +++ b/packages/rstream-graph/src/api.ts @@ -96,4 +96,7 @@ export interface NodeInput { xform?: Transducer; } -export type NodeOutput = Path | ((node: ISubscribable) => void); +export type NodeOutput = + Path | + ((node: ISubscribable) => void) | + IObjectOf) => void)>; diff --git a/packages/rstream-graph/src/graph.ts b/packages/rstream-graph/src/graph.ts index f74ae6cbb6..de68ef78ae 100644 --- a/packages/rstream-graph/src/graph.ts +++ b/packages/rstream-graph/src/graph.ts @@ -11,7 +11,7 @@ import { fromView } from "@thi.ng/rstream/from/view"; import { StreamSync, sync } from "@thi.ng/rstream/stream-sync"; import { Transducer } from "@thi.ng/transducers/api"; -import { GraphSpec, NodeFactory, NodeSpec } from "./api"; +import { GraphSpec, NodeFactory, NodeSpec, NodeOutput } from "./api"; /** * Dataflow graph initialization function. Takes an object of @@ -67,16 +67,33 @@ const nodeFromSpec = (state: IAtom, spec: NodeSpec, id: string) => (resolve src[id] = s; } const node = spec.fn(src, id); - if (spec.out) { - if (isFunction(spec.out)) { - spec.out(node); - } else { + prepareNodeOutputs(spec.out, node, state, id); + return node; +}; + +const prepareNodeOutputs = (out: NodeOutput, node: ISubscribable, state: IAtom, id: string) => { + if (out) { + if (isFunction(out)) { + out(node); + } + else if (isPlainObject(out)) { + for (let oid in out) { + const o = out[oid]; + if (isFunction(o)) { + o(node); + } else { + ((path, oid) => node.subscribe({ + next: (x) => state.resetIn(path, x[oid]) + }, `out-${id}-${oid}`))(o, oid); + } + } + } + else { ((path) => node.subscribe({ next: (x) => state.resetIn(path, x) - }, `out-${id}`))(spec.out); + }, `out-${id}`))(out); } } - return node; }; export const addNode = (graph: IObjectOf>, state: IAtom, id: string, spec: NodeSpec) => @@ -137,4 +154,4 @@ export const ensureInputs = (src: IObjectOf>, inputIDs: strin illegalArgs(`node "${nodeID}": missing input(s): ${missing.join(", ")}`); } } -}; +}; \ No newline at end of file From dc6e0acc9229444f2b95bd1e1320a9561a9019c7 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 12:00:52 +0100 Subject: [PATCH 03/10] refactor(resolve-map): export absPath(), add LookupPath type alias --- packages/resolve-map/src/index.ts | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/resolve-map/src/index.ts b/packages/resolve-map/src/index.ts index cb5b4eadbb..5c1b08cd53 100644 --- a/packages/resolve-map/src/index.ts +++ b/packages/resolve-map/src/index.ts @@ -7,6 +7,8 @@ import { getIn, mutIn } from "@thi.ng/paths"; const SEMAPHORE = Symbol("SEMAPHORE"); +export type LookupPath = PropertyKey[]; + /** * Visits all key-value pairs in depth-first order for given object or * array, expands any reference values, mutates the original object and @@ -63,13 +65,13 @@ const SEMAPHORE = Symbol("SEMAPHORE"); * * @param obj */ -export const resolveMap = (obj: any, root?: any, path: PropertyKey[] = [], resolved: any = {}) => { +export const resolveMap = (obj: any, root?: any, path: LookupPath = [], resolved: any = {}) => { root = root || obj; for (let k in obj) { _resolve(root, [...path, k], resolved); } return obj; -} +}; /** * Like `resolveMap`, but for arrays. @@ -79,15 +81,15 @@ export const resolveMap = (obj: any, root?: any, path: PropertyKey[] = [], resol * @param path * @param resolved */ -const resolveArray = (arr: any[], root?: any, path: PropertyKey[] = [], resolved: any = {}) => { +const resolveArray = (arr: any[], root?: any, path: LookupPath = [], resolved: any = {}) => { root = root || arr; for (let k = 0, n = arr.length; k < n; k++) { _resolve(root, [...path, k], resolved); } return arr; -} +}; -const _resolve = (root: any, path: PropertyKey[], resolved: any) => { +const _resolve = (root: any, path: LookupPath, resolved: any) => { let v = getIn(root, path), rv = SEMAPHORE; const pp = path.join("/"); if (!resolved[pp]) { @@ -107,9 +109,17 @@ const _resolve = (root: any, path: PropertyKey[], resolved: any) => { resolved[pp] = true; } return v; -} +}; -const absPath = (curr: PropertyKey[], q: string, idx = 1): PropertyKey[] => { +/** + * Takes the path for the current key and a lookup path string. Converts + * the possibly relative lookup path into its absolute form. + * + * @param curr + * @param q + * @param idx + */ +export const absPath = (curr: LookupPath, q: string, idx = 1): PropertyKey[] => { if (q.charAt(idx) === "/") { return q.substr(idx + 1).split("/"); } @@ -125,4 +135,4 @@ const absPath = (curr: PropertyKey[], q: string, idx = 1): PropertyKey[] => { } !curr.length && illegalArgs(`invalid lookup path`); return curr; -} +}; From f2e0df251fbd35e5687cf20b7c7eb940e4498bcd Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 13:38:30 +0100 Subject: [PATCH 04/10] feat(rstream-graph): add full/optional support for multiple node outputs BREAKING CHANGE: update NodeSpec format & graph initialization - add new types/interfaces - non-destructive initGraph() behavior - update & refactor nodeFromSpec() - update addNode/removeNode() - update tests & docs --- packages/rstream-graph/src/api.ts | 33 +++-- packages/rstream-graph/src/graph.ts | 200 +++++++++++++++++++-------- packages/rstream-graph/test/index.ts | 47 ++++++- 3 files changed, 203 insertions(+), 77 deletions(-) diff --git a/packages/rstream-graph/src/api.ts b/packages/rstream-graph/src/api.ts index f4224cb158..06e65229fa 100644 --- a/packages/rstream-graph/src/api.ts +++ b/packages/rstream-graph/src/api.ts @@ -7,7 +7,17 @@ import { Transducer } from "@thi.ng/transducers/api"; * A function which constructs and returns an `ISubscribable` using * given object of inputs and node ID. See `node()` and `node1()`. */ -export type NodeFactory = (src: IObjectOf>, id: string) => ISubscribable; +export type NodeFactory = (src: NodeInputs, id: string) => ISubscribable; + +export type NodeInputs = IObjectOf>; +export type NodeOutputs = IObjectOf>; +export type Graph = IObjectOf; + +export interface Node { + ins: NodeInputs; + outs: NodeOutputs; + node: ISubscribable; +} /** * A dataflow graph spec is simply an object where keys are node names @@ -18,8 +28,8 @@ export type NodeFactory = (src: IObjectOf>, id: string) => */ export type GraphSpec = IObjectOf< NodeSpec | - ISubscribable | - ((resolve: (path: string) => any) => ISubscribable)>; + Node | + ((resolve: (path: string) => any) => Node)>; /** * Specification for a single "node" in the dataflow graph. Nodes here @@ -33,13 +43,13 @@ export type GraphSpec = IObjectOf< * are implemented as `StreamSync` instances and the input IDs are used * to locally rename input streams within the `StreamSync` container. * - * See `initGraph` and `nodeFromSpec` for more details (in - * /src/nodes.ts) + * Alo see `initGraph` and `nodeFromSpec` (in /src/nodes.ts) for more + * details how these specs are compiled into stream constructs. */ export interface NodeSpec { fn: NodeFactory; - ins: IObjectOf; - out?: NodeOutput; + ins: IObjectOf; + outs?: IObjectOf; } /** @@ -88,7 +98,7 @@ export interface NodeSpec { * If the optional `xform` is given, a subscription with the transducer * is added to the input and then used as input instead. */ -export interface NodeInput { +export interface NodeInputSpec { id?: string; path?: Path; stream?: string | ((resolve) => ISubscribable); @@ -96,7 +106,6 @@ export interface NodeInput { xform?: Transducer; } -export type NodeOutput = - Path | - ((node: ISubscribable) => void) | - IObjectOf) => void)>; +export type NodeOutputSpec = Path | NodeOutputFn; + +export type NodeOutputFn = (node: ISubscribable, id: PropertyKey) => ISubscribable; diff --git a/packages/rstream-graph/src/graph.ts b/packages/rstream-graph/src/graph.ts index de68ef78ae..4c7a2de448 100644 --- a/packages/rstream-graph/src/graph.ts +++ b/packages/rstream-graph/src/graph.ts @@ -4,107 +4,185 @@ import { isFunction } from "@thi.ng/checks/is-function"; import { isPlainObject } from "@thi.ng/checks/is-plain-object"; import { isString } from "@thi.ng/checks/is-string"; import { illegalArgs } from "@thi.ng/errors/illegal-arguments"; -import { resolveMap } from "@thi.ng/resolve-map"; +import { getIn } from "@thi.ng/paths"; +import { absPath, resolveMap } from "@thi.ng/resolve-map"; import { ISubscribable } from "@thi.ng/rstream/api"; import { fromIterableSync } from "@thi.ng/rstream/from/iterable"; import { fromView } from "@thi.ng/rstream/from/view"; import { StreamSync, sync } from "@thi.ng/rstream/stream-sync"; import { Transducer } from "@thi.ng/transducers/api"; -import { GraphSpec, NodeFactory, NodeSpec, NodeOutput } from "./api"; +import { + Graph, + GraphSpec, + NodeFactory, + NodeInputs, + NodeInputSpec, + NodeOutputs, + NodeOutputSpec, + NodeSpec +} from "./api"; /** - * Dataflow graph initialization function. Takes an object of - * NodeSpec's, calls `nodeFromSpec` for each and then recursively - * resolves references via `@thi.ng/resolve-map/resolveMap`. Returns - * updated graph object (mutates in-place, original specs are replaced - * by stream constructs). + * Dataflow graph initialization function. Takes a state Atom (or `null` + * if not needed) and an object of `NodeSpec` values or functions + * returning `Node` objects. Calls `nodeFromSpec` for each spec and then + * recursively resolves references via thi.ng/resolve-map `resolveMap`. + * Returns new initialized graph object of `Node` objects and + * `@thi.ng/rstream` stream constructs. Does NOT mutate original + * `GraphSpec` object. * * @param state - * @param nodes + * @param spec */ -export const initGraph = (state: IAtom, nodes: GraphSpec): IObjectOf> => { - for (let id in nodes) { - const n = nodes[id]; - if (isPlainObject(n)) { - (nodes)[id] = nodeFromSpec(state, nodes[id], id); +export const initGraph = (state: IAtom, spec: GraphSpec): Graph => { + const res: Graph = {} + for (let id in spec) { + const n = spec[id]; + if (isNodeSpec(n)) { + res[id] = nodeFromSpec(state, spec[id], id); + } else { + res[id] = n; } } - return resolveMap(nodes); + return resolveMap(res); }; +const isNodeSpec = (x: any): x is NodeSpec => + isPlainObject(x) && isFunction((x).fn); + /** - * Transforms a single NodeSpec into a lookup function for `resolveMap` - * (which is called from `initGraph`). When that function is called, - * recursively resolves all specified input streams and calls this - * spec's `fn` to produce a new stream from these inputs. If the spec - * includes the optional `out` key, it also executes the provided - * function, or if the value is a string, adds a subscription to this - * node's result stream which then updates the provide state atom at the - * path defined by `out`. Returns an ISubscribable. + * Transforms a single `NodeSpec` into a lookup function for + * `resolveMap` (which is called from `initGraph`). When that function + * is called, recursively resolves all specified input streams and calls + * this spec's `fn` to produce a new stream from these inputs. + * + * If the spec includes the optional `outs` keys, it also creates the + * subscriptions for each of the given output keys, which then can be + * used as inputs by other nodes. Each value in the `outs` subspec can + * be a function or state path (string/number/array, see thi.ng/paths). + * Functions are called with this node's constructed stream/subscribable + * and the output id and must return a new `ISubscribable`. For path + * values a subscription is added to this node's result stream which + * then updates the provided state atom at the path given. + * + * Non-function output specs subs assume the raw node output value is an + * object from which the different output keys are being extracted. + * The special `*` output key can be used to handle the entire node + * output value. + * + * ``` + * out: { + * // fn output spec + * // creates new sub which uses `pick` transducer to + * // select key `a` from main node output + * a: (node, id) => node.subscribe({}, pick(id)), + * + * // yields sub of `b` key's values extracted from main output + * // and also stores them at given path in state atom + * b: "foo.b" + * + * // yields sub with same value as main node output and + * // stores vals in state atom at given path + * "*": "foo.main" + * } + * ``` * * See `api.ts` for further details and possible spec variations. * + * @param state * @param spec + * @param id */ const nodeFromSpec = (state: IAtom, spec: NodeSpec, id: string) => (resolve) => { - const src: IObjectOf> = {}; - for (let id in spec.ins) { + const ins = prepareNodeInputs(spec.ins, state, resolve); + const node = spec.fn(ins, id); + const outs = prepareNodeOutputs(spec.outs, node, state, id); + return { ins, node, outs }; +}; + +const prepareNodeInputs = (ins: IObjectOf, state: IAtom, resolve: (x: string) => any) => { + const res: NodeInputs = {}; + if (!ins) return res; + for (let id in ins) { let s; - const i = spec.ins[id]; + const i = ins[id]; if (i.path) { s = fromView(state, i.path); - } else if (i.stream) { + } + else if (i.stream) { s = isString(i.stream) ? resolve(i.stream) : i.stream(resolve); - } else if (i.const) { + } + else if (i.const) { s = fromIterableSync([isFunction(i.const) ? i.const(resolve) : i.const]); - } else { + } + else { illegalArgs(`invalid node input: ${id}`); } if (i.xform) { s = s.subscribe(i.xform, id); } - src[id] = s; + res[id] = s; } - const node = spec.fn(src, id); - prepareNodeOutputs(spec.out, node, state, id); - return node; -}; + return res; +} -const prepareNodeOutputs = (out: NodeOutput, node: ISubscribable, state: IAtom, id: string) => { - if (out) { - if (isFunction(out)) { - out(node); - } - else if (isPlainObject(out)) { - for (let oid in out) { - const o = out[oid]; - if (isFunction(o)) { - o(node); - } else { - ((path, oid) => node.subscribe({ - next: (x) => state.resetIn(path, x[oid]) - }, `out-${id}-${oid}`))(o, oid); - } - } - } - else { - ((path) => node.subscribe({ +const prepareNodeOutputs = (outs: IObjectOf, node: ISubscribable, state: IAtom, nodeID: string) => { + const res: NodeOutputs = {}; + if (!outs) return res; + for (let id in outs) { + const o = outs[id]; + if (isFunction(o)) { + res[id] = o(node, id); + } else if (id == "*") { + res[id] = ((path) => node.subscribe({ next: (x) => state.resetIn(path, x) - }, `out-${id}`))(out); + }, `out-${nodeID}`))(o); + } else { + res[id] = ((path, id) => node.subscribe({ + next: (x) => state.resetIn(path, x[id]) + }, `out-${nodeID}-${id}`))(o, id); } } + return res; }; -export const addNode = (graph: IObjectOf>, state: IAtom, id: string, spec: NodeSpec) => - graph[id] = nodeFromSpec(state, spec, id)((nodeID) => graph[nodeID]); - -export const removeNode = (graph: IObjectOf>, id: string) => { +/** + * Compiles given `NodeSpec` and adds it to graph. Returns compiled + * `Node` object for the given spec. Throws error if the graph already + * contains a node with given `id`. + * + * @param graph + * @param state + * @param id + * @param spec + */ +export const addNode = (graph: Graph, state: IAtom, id: string, spec: NodeSpec) => { if (graph[id]) { - graph[id].unsubscribe(); + illegalArgs(`graph already contains a node with ID: ${id}`); + } + graph[id] = nodeFromSpec(state, spec, id)((path) => getIn(graph, absPath([id], path))); +} + +/** + * Calls `.unsubscribe()` on given node and all of its outputs, then + * removes it from graph. Returns `false` if no node exists for given + * `id`. + * + * @param graph + * @param id + */ +export const removeNode = (graph: Graph, id: string) => { + const node = graph[id]; + if (node) { + node.node.unsubscribe(); + for (let id in node.outs) { + node.outs[id].unsubscribe(); + } delete graph[id]; return true; } + return false; }; /** @@ -133,7 +211,9 @@ export const node = (xform: Transducer, any>, inputIDs?: string[] export const node1 = (xform?: Transducer, inputID = "src"): NodeFactory => (src: IObjectOf>, id: string): ISubscribable => { ensureInputs(src, [inputID], id); - return xform ? src[inputID].subscribe(xform, id) : src[inputID].subscribe(null, id); + return xform ? + src[inputID].subscribe(xform, id) : + src[inputID].subscribe(null, id); }; /** @@ -154,4 +234,4 @@ export const ensureInputs = (src: IObjectOf>, inputIDs: strin illegalArgs(`node "${nodeID}": missing input(s): ${missing.join(", ")}`); } } -}; \ No newline at end of file +}; diff --git a/packages/rstream-graph/test/index.ts b/packages/rstream-graph/test/index.ts index 6918075d79..461a548328 100644 --- a/packages/rstream-graph/test/index.ts +++ b/packages/rstream-graph/test/index.ts @@ -10,28 +10,65 @@ describe("rstream-graph", () => { const acc = []; const state = new Atom({ a: 1, b: 2 }); const graph = rsg.initGraph(state, { - foo: rs.fromIterable([2]), - bar: ($) => $("foo").transform(map((x: number) => x * 10)), + foo: () => ({ + node: rs.fromIterable([2]), + ins: {}, + outs: {} + }), + bar: ($) => ({ + node: $("/foo/node").transform(map((x: number) => x * 10)), + ins: {}, + outs: {} + }), add: { fn: rsg.add, ins: { a: { path: "a" }, b: { path: "b" } }, + outs: { + alt: (n) => n.subscribe({}) // identical to main out, testing only + } }, mul: { fn: rsg.mul, ins: { - a: { stream: "add" }, + a: { stream: "/add/outs/alt" }, b: { stream: () => rs.fromIterable([10, 20, 30]) }, - c: { stream: "bar" } + c: { stream: "/bar/node" } }, + outs: { + baz: (n, id) => n.subscribe({ next: (x) => state.resetIn(["foo", id], x) }) + } + }, + res: { + ins: { + src: { stream: "/mul/node" } + }, + fn: rsg.node1(map((x: number) => ({ x: x, x2: x * 2 }))), + outs: { + "*": "res" + } + }, + res2: { + ins: { + src: { stream: "/res/node" } + }, + fn: rsg.node1(), + outs: { + x: "res2.x", + } } }); - graph.mul.subscribe({ next: (x) => acc.push(x) }); + graph.mul.node.subscribe({ next: (x) => acc.push(x) }); setTimeout(() => { state.resetIn("a", 10); + console.log(graph); assert.deepEqual(acc, [600, 1200, 1800, 7200]); + assert.deepEqual( + state.deref(), + { a: 10, b: 2, foo: { baz: 7200 }, res: { x: 7200, x2: 14400 }, res2: { x: 7200 } } + ); done(); }, 10); }); From 558f4f8cbe3a843bc107a213e0dfa1c7de28a913 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 14:36:36 +0100 Subject: [PATCH 05/10] fix(resolve-map): add private _resolveDeep - fixes resolution issue if a function dynamically created deep values --- packages/resolve-map/src/index.ts | 32 +++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/packages/resolve-map/src/index.ts b/packages/resolve-map/src/index.ts index 5c1b08cd53..ea64deb9c3 100644 --- a/packages/resolve-map/src/index.ts +++ b/packages/resolve-map/src/index.ts @@ -81,7 +81,7 @@ export const resolveMap = (obj: any, root?: any, path: LookupPath = [], resolved * @param path * @param resolved */ -const resolveArray = (arr: any[], root?: any, path: LookupPath = [], resolved: any = {}) => { +const _resolveArray = (arr: any[], root?: any, path: LookupPath = [], resolved: any = {}) => { root = root || arr; for (let k = 0, n = arr.length; k < n; k++) { _resolve(root, [...path, k], resolved); @@ -90,7 +90,8 @@ const resolveArray = (arr: any[], root?: any, path: LookupPath = [], resolved: a }; const _resolve = (root: any, path: LookupPath, resolved: any) => { - let v = getIn(root, path), rv = SEMAPHORE; + let rv = SEMAPHORE; + let v = getIn(root, path); const pp = path.join("/"); if (!resolved[pp]) { if (isString(v) && v.charAt(0) === "@") { @@ -98,9 +99,9 @@ const _resolve = (root: any, path: LookupPath, resolved: any) => { } else if (isPlainObject(v)) { resolveMap(v, root, path, resolved); } else if (isArray(v)) { - resolveArray(v, root, path, resolved); + _resolveArray(v, root, path, resolved); } else if (isFunction(v)) { - rv = v((p: string) => _resolve(root, absPath(path, p, 0), resolved)); + rv = v((p: string) => _resolveDeep(root, absPath(path, p, 0), resolved)); } if (rv !== SEMAPHORE) { mutIn(root, path, rv); @@ -111,6 +112,29 @@ const _resolve = (root: any, path: LookupPath, resolved: any) => { return v; }; +/** + * Repeatedly calls `_resolve` stepwise descending along given path. + * This is to ensure resolution of deep values created by functions at + * parent tree levels. E.g. given: + * + * ``` + * {a: () => ({b: {c: 1}}), d: ($) => $("/a/b/c") } + * => + * { a: { b: { c: 1 } }, d: 1 } + * ``` + * + * @param root + * @param path + * @param resolved + */ +const _resolveDeep = (root: any, path: LookupPath, resolved: any) => { + let v; + for (let i = 1, n = path.length; i <= n; i++) { + v = _resolve(root, path.slice(0, i), resolved); + } + return v; +}; + /** * Takes the path for the current key and a lookup path string. Converts * the possibly relative lookup path into its absolute form. From 1a09b61ef178a48e09b8b84b19f0afd1e019d202 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 14:37:40 +0100 Subject: [PATCH 06/10] minor(rstream-graph): minor fix exported types --- packages/rstream-graph/src/graph.ts | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/rstream-graph/src/graph.ts b/packages/rstream-graph/src/graph.ts index 4c7a2de448..81e9f4f0cc 100644 --- a/packages/rstream-graph/src/graph.ts +++ b/packages/rstream-graph/src/graph.ts @@ -15,6 +15,7 @@ import { Transducer } from "@thi.ng/transducers/api"; import { Graph, GraphSpec, + Node, NodeFactory, NodeInputs, NodeInputSpec, @@ -67,15 +68,15 @@ const isNodeSpec = (x: any): x is NodeSpec => * then updates the provided state atom at the path given. * * Non-function output specs subs assume the raw node output value is an - * object from which the different output keys are being extracted. - * The special `*` output key can be used to handle the entire node - * output value. + * object from which the different output keys are being extracted. The + * special `*` output key can be used to handle the entire node output + * value. This is useful/required for non-object node result values. * * ``` * out: { * // fn output spec * // creates new sub which uses `pick` transducer to - * // select key `a` from main node output + * // select key `a` from main node output (assumed to be object) * a: (node, id) => node.subscribe({}, pick(id)), * * // yields sub of `b` key's values extracted from main output @@ -94,12 +95,13 @@ const isNodeSpec = (x: any): x is NodeSpec => * @param spec * @param id */ -const nodeFromSpec = (state: IAtom, spec: NodeSpec, id: string) => (resolve) => { - const ins = prepareNodeInputs(spec.ins, state, resolve); - const node = spec.fn(ins, id); - const outs = prepareNodeOutputs(spec.outs, node, state, id); - return { ins, node, outs }; -}; +const nodeFromSpec = (state: IAtom, spec: NodeSpec, id: string) => + (resolve) => { + const ins = prepareNodeInputs(spec.ins, state, resolve); + const node = spec.fn(ins, id); + const outs = prepareNodeOutputs(spec.outs, node, state, id); + return { ins, node, outs }; + }; const prepareNodeInputs = (ins: IObjectOf, state: IAtom, resolve: (x: string) => any) => { const res: NodeInputs = {}; @@ -157,11 +159,13 @@ const prepareNodeOutputs = (outs: IObjectOf, node: ISubscribable * @param id * @param spec */ -export const addNode = (graph: Graph, state: IAtom, id: string, spec: NodeSpec) => { +export const addNode = (graph: Graph, state: IAtom, id: string, spec: NodeSpec): Node => { if (graph[id]) { illegalArgs(`graph already contains a node with ID: ${id}`); } - graph[id] = nodeFromSpec(state, spec, id)((path) => getIn(graph, absPath([id], path))); + return graph[id] = nodeFromSpec(state, spec, id)( + (path) => getIn(graph, absPath([id], path)) + ); } /** From dd2cbd44d21d3041aae30334b2207d18467e3497 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 14:38:11 +0100 Subject: [PATCH 07/10] refactor(examples): update rstream-graph examples --- examples/rstream-dataflow/src/index.ts | 28 +++++++++++++------------- examples/rstream-grid/src/dataflow.ts | 12 ++++++----- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/examples/rstream-dataflow/src/index.ts b/examples/rstream-dataflow/src/index.ts index 42f7a0b216..07cce78ace 100644 --- a/examples/rstream-dataflow/src/index.ts +++ b/examples/rstream-dataflow/src/index.ts @@ -59,7 +59,7 @@ const graph = initGraph(db, { mpos: { fn: extract([1, "pos"]), ins: { src: { stream: () => gestures } }, - out: "mpos" + outs: { "*": "mpos" } }, // extracts last click position from gesture tuple @@ -68,7 +68,7 @@ const graph = initGraph(db, { clickpos: { fn: extract([1, "click"]), ins: { src: { stream: () => gestures } }, - out: "clickpos" + outs: { "*": "clickpos" } }, // extracts & computes length of `delta` vector in gesture tuple @@ -83,7 +83,7 @@ const graph = initGraph(db, { } )), ins: { src: { stream: () => gestures } }, - out: "dist" + outs: { "*": "dist" } }, // combines `clickpos`, `dist` and `color` streams to produce a @@ -101,11 +101,11 @@ const graph = initGraph(db, { undefined )), ins: { - click: { stream: "clickpos" }, - radius: { stream: "radius" }, - color: { stream: "color" }, + click: { stream: "/clickpos/node" }, + radius: { stream: "/radius/node" }, + color: { stream: "/color/node" }, }, - out: "circle" + outs: { "*": "circle" } }, // produces a new random color for each new drag gesture (and @@ -119,8 +119,8 @@ const graph = initGraph(db, { dedupe(equiv), map((x) => x && colors.next().value) )), - ins: { src: { stream: "clickpos" } }, - out: "color" + ins: { src: { stream: "/clickpos/node" } }, + outs: { "*": "color" } }, // transforms a `requestAnimationFrame` event stream (frame counter @ 60fps) @@ -128,7 +128,7 @@ const graph = initGraph(db, { sine: { fn: node1(map((x: number) => 0.8 + 0.2 * Math.sin(x * 0.05))), ins: { src: { stream: () => raf } }, - out: "sin" + outs: { "*": "sin" } }, // multiplies `dist` and `sine` streams to produce an animated @@ -136,10 +136,10 @@ const graph = initGraph(db, { radius: { fn: mul, ins: { - a: { stream: "sine" }, - b: { stream: "dist" } + a: { stream: "/sine/node" }, + b: { stream: "/dist/node" } }, - out: "radius" + outs: { "*": "radius" } } }); @@ -152,7 +152,7 @@ start("app", () => // since all @thi.ng/rstream subscriptions implement the // @thi.ng/api/IDeref interface (like several other types, e.g. // @thi.ng/atom's Atom, Cursor, View etc.) - graph.circle + graph.circle.node ]); // create a GraphViz DOT file of the entire dataflow graph diff --git a/examples/rstream-grid/src/dataflow.ts b/examples/rstream-grid/src/dataflow.ts index c48f792a23..67a395377f 100644 --- a/examples/rstream-grid/src/dataflow.ts +++ b/examples/rstream-grid/src/dataflow.ts @@ -32,22 +32,24 @@ export function initDataflow(bus: EventBus) { rotation: { fn: rotate, ins: { - shapes: { stream: "grid" }, + shapes: { stream: "/grid/node" }, theta: { path: paths.THETA }, }, }, svg: { fn: createSVG, ins: { - shapes: { stream: "rotation" }, + shapes: { stream: "/rotation/node" }, cols: { path: paths.COLS }, rows: { path: paths.ROWS }, stroke: { path: paths.STROKE }, }, // dispatch SVG result doc as event - out: (node) => node.subscribe({ - next: (svg) => bus.dispatch([ev.UPDATE_SVG, svg]) - }) + outs: { + "*": (node) => node.subscribe({ + next: (svg) => bus.dispatch([ev.UPDATE_SVG, svg]) + }) + } } }); return graph; From 48c796f5cb7cdefe7f86a876c7829410158f4c06 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 15:19:22 +0100 Subject: [PATCH 08/10] fix(resolve-map): also use _resolvePath for plain lookups, optimize - rename _resolveDeep => _resolvePath - update docs --- packages/resolve-map/src/index.ts | 40 ++++++++++++++++++------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/packages/resolve-map/src/index.ts b/packages/resolve-map/src/index.ts index ea64deb9c3..d6aa2c94e5 100644 --- a/packages/resolve-map/src/index.ts +++ b/packages/resolve-map/src/index.ts @@ -93,15 +93,16 @@ const _resolve = (root: any, path: LookupPath, resolved: any) => { let rv = SEMAPHORE; let v = getIn(root, path); const pp = path.join("/"); + console.log("resolve", pp, resolved[pp]); if (!resolved[pp]) { - if (isString(v) && v.charAt(0) === "@") { - rv = _resolve(root, absPath(path, v), resolved); - } else if (isPlainObject(v)) { + if (isPlainObject(v)) { resolveMap(v, root, path, resolved); } else if (isArray(v)) { _resolveArray(v, root, path, resolved); + } else if (isString(v) && v.charAt(0) === "@") { + rv = _resolvePath(root, absPath(path, v), resolved); } else if (isFunction(v)) { - rv = v((p: string) => _resolveDeep(root, absPath(path, p, 0), resolved)); + rv = v((p: string) => _resolvePath(root, absPath(path, p, 0), resolved)); } if (rv !== SEMAPHORE) { mutIn(root, path, rv); @@ -113,12 +114,16 @@ const _resolve = (root: any, path: LookupPath, resolved: any) => { }; /** - * Repeatedly calls `_resolve` stepwise descending along given path. - * This is to ensure resolution of deep values created by functions at - * parent tree levels. E.g. given: + * If the value at given path is still unresolved, repeatedly calls + * `_resolve` by stepwise descending along given path and returns final + * value. This is to ensure full resolution of deeper values created by + * functions at intermediate tree levels. If the path is already marked + * as resolved, returns its value. + * + * E.g. given: * * ``` - * {a: () => ({b: {c: 1}}), d: ($) => $("/a/b/c") } + * {a: () => ({b: {c: 1}}), d: "@/a/b/c" } * => * { a: { b: { c: 1 } }, d: 1 } * ``` @@ -127,7 +132,10 @@ const _resolve = (root: any, path: LookupPath, resolved: any) => { * @param path * @param resolved */ -const _resolveDeep = (root: any, path: LookupPath, resolved: any) => { +const _resolvePath = (root: any, path: LookupPath, resolved: any) => { + if (resolved[path.join("/")]) { + return getIn(root, path); + } let v; for (let i = 1, n = path.length; i <= n; i++) { v = _resolve(root, path.slice(0, i), resolved); @@ -140,23 +148,23 @@ const _resolveDeep = (root: any, path: LookupPath, resolved: any) => { * the possibly relative lookup path into its absolute form. * * @param curr - * @param q + * @param path * @param idx */ -export const absPath = (curr: LookupPath, q: string, idx = 1): PropertyKey[] => { - if (q.charAt(idx) === "/") { - return q.substr(idx + 1).split("/"); +export const absPath = (curr: LookupPath, path: string, idx = 1): PropertyKey[] => { + if (path.charAt(idx) === "/") { + return path.substr(idx + 1).split("/"); } curr = curr.slice(0, curr.length - 1); - const sub = q.substr(idx).split("/"); + const sub = path.substr(idx).split("/"); for (let i = 0, n = sub.length; i < n; i++) { if (sub[i] === "..") { - !curr.length && illegalArgs(`invalid lookup path`); + !curr.length && illegalArgs(`invalid lookup path: ${path}`); curr.pop(); } else { return curr.concat(sub.slice(i)); } } - !curr.length && illegalArgs(`invalid lookup path`); + !curr.length && illegalArgs(`invalid lookup path: ${path}`); return curr; }; From 720b1f189dbe4552160149715c308edf632be3c6 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 15:33:26 +0100 Subject: [PATCH 09/10] docs(rstream-graph): update api docs & readme --- packages/rstream-graph/README.md | 10 ++++--- packages/rstream-graph/src/api.ts | 45 ++++++++++++++++--------------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/packages/rstream-graph/README.md b/packages/rstream-graph/README.md index 3595044cc9..4c91b3c7c6 100644 --- a/packages/rstream-graph/README.md +++ b/packages/rstream-graph/README.md @@ -12,7 +12,10 @@ Declarative, reactive dataflow graph construction using [@thi.ng/atom](https://github.com/thi-ng/umbrella/tree/master/packages/atom) and [@thi.ng/transducers](https://github.com/thi-ng/umbrella/tree/master/packages/transducers) primitives. -Stream subscription types act as graph nodes and attached transducers as graph edges, transforming data for downstream consumers / nodes. Theoretically, allows cycles and is not restricted to DAG topologies, but care must be taken to avoid CPU hogging (user's responsibility). +Stream subscription types act as graph nodes and attached transducers as +graph edges, transforming data for downstream consumers / nodes. +Theoretically, allows cycles and is not restricted to DAG topologies, +but care must be taken to avoid CPU hogging (user's responsibility). ## Installation @@ -67,7 +70,7 @@ const graph = rsg.initGraph(state, { mul: { fn: rsg.mul, ins: { - a: { stream: "add" }, + a: { stream: "/add/node" }, b: { stream: () => rs.fromIterable([10, 20, 30]) } }, } @@ -87,7 +90,8 @@ setTimeout(() => state.resetIn("a", 10), 1000); // result: 360 ``` -Please documentation in the source code for further details. +Please see documentation in the source code & test cases for further +details. ## Authors diff --git a/packages/rstream-graph/src/api.ts b/packages/rstream-graph/src/api.ts index 06e65229fa..811802b26e 100644 --- a/packages/rstream-graph/src/api.ts +++ b/packages/rstream-graph/src/api.ts @@ -23,8 +23,8 @@ export interface Node { * A dataflow graph spec is simply an object where keys are node names * and their values are either pre-existing @thi.ng/rstream * `ISubscribable`s, functions returning `ISubscribable`s or - * `NodeSpec`s, defining inputs and the operation to be applied to - * produce a result stream. + * `NodeSpec`s, defining a node's inputs, outputs and the operation to + * be applied to produce one or more result streams. */ export type GraphSpec = IObjectOf< NodeSpec | @@ -33,15 +33,17 @@ export type GraphSpec = IObjectOf< /** * Specification for a single "node" in the dataflow graph. Nodes here - * are actually streams (or just generally any form of @thi.ng/rstream - * subscription), usually with an associated transducer to transform / - * combine the inputs and produce values for the node's result stream. + * are actually streams / qsubscriptions (or just generally any form of + * @thi.ng/rstream `ISubscribable`), usually with an associated + * transducer to transform / combine the inputs and produce values for + * the node's result stream. * - * The `fn` function is responsible to produce such a stream construct. - * The keys used to specify inputs in the `ins` object are dictated by - * the actual node `fn` used. Most node functions with multiple inputs - * are implemented as `StreamSync` instances and the input IDs are used - * to locally rename input streams within the `StreamSync` container. + * The `fn` function is responsible to produce such a stream transformer + * construct. The keys used to specify inputs in the `ins` object are + * dictated by the actual node `fn` used. Most node functions with + * multiple inputs are implemented as `StreamSync` instances and the + * input IDs are used to locally rename input streams within the + * `StreamSync` container. * * Alo see `initGraph` and `nodeFromSpec` (in /src/nodes.ts) for more * details how these specs are compiled into stream constructs. @@ -64,22 +66,23 @@ export interface NodeSpec { * { path: ["nested", "src", "path"] } * ``` * - * 2) Reference path to another node in the GraphSpec object. See - * `@thi.ng/resolve-map` for details. + * 2) Reference path to another node's output in the GraphSpec object. + * See `@thi.ng/resolve-map` for details. * * ``` - * { stream: "/path/to/node-id" } // absolute - * { stream: "../../path/to/node-id" } // relative - * { stream: "node-id" } // sibling + * { stream: "/node-id/node" } // main node output + * { stream: "/node-id/outs/foo" } // specific output * ``` * * 3) Reference another node indirectly. The passed in `resolve` - * function can be used to lookup other nodes, e.g. the following - * spec looks up node "src" and adds a transformed subscription, - * which is then used as input for current node. + * function can be used to lookup other nodes, with the same logic as + * above. E.g. the following spec looks up the main output of node + * "abc" and adds a transformed subscription, which is then used as + * input for current node. * * ``` - * { stream: (resolve) => resolve("src").subscribe(map(x => x * 10)) } + * { stream: (resolve) => + * resolve("/abc/node").subscribe(map(x => x * 10)) } * ``` * * 4) Provide an external input stream: @@ -95,8 +98,8 @@ export interface NodeSpec { * { const: () => 1 } * ``` * - * If the optional `xform` is given, a subscription with the transducer - * is added to the input and then used as input instead. + * If the optional `xform` is given, a subscription with the given + * transducer is added to the input and then used as input instead. */ export interface NodeInputSpec { id?: string; From 93b40b7824421078698f278a7dc849af172362bc Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Wed, 6 Jun 2018 15:40:24 +0100 Subject: [PATCH 10/10] Publish - @thi.ng/resolve-map@2.0.6 - @thi.ng/rstream-graph@2.0.0 --- packages/resolve-map/CHANGELOG.md | 16 ++++++++++++++-- packages/resolve-map/package.json | 2 +- packages/rstream-graph/CHANGELOG.md | 23 +++++++++++++++++++++++ packages/rstream-graph/package.json | 4 ++-- 4 files changed, 40 insertions(+), 5 deletions(-) diff --git a/packages/resolve-map/CHANGELOG.md b/packages/resolve-map/CHANGELOG.md index a5176bc1a6..8014867fe0 100644 --- a/packages/resolve-map/CHANGELOG.md +++ b/packages/resolve-map/CHANGELOG.md @@ -3,7 +3,19 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. - + +## [2.0.6](https://github.com/thi-ng/umbrella/compare/@thi.ng/resolve-map@2.0.5...@thi.ng/resolve-map@2.0.6) (2018-06-06) + + +### Bug Fixes + +* **resolve-map:** add private _resolveDeep ([558f4f8](https://github.com/thi-ng/umbrella/commit/558f4f8)) +* **resolve-map:** also use _resolvePath for plain lookups, optimize ([48c796f](https://github.com/thi-ng/umbrella/commit/48c796f)) + + + + + ## [2.0.5](https://github.com/thi-ng/umbrella/compare/@thi.ng/resolve-map@2.0.4...@thi.ng/resolve-map@2.0.5) (2018-05-14) @@ -11,7 +23,7 @@ See [Conventional Commits](https://conventionalcommits.org) for commit guideline **Note:** Version bump only for package @thi.ng/resolve-map - + ## [2.0.4](https://github.com/thi-ng/umbrella/compare/@thi.ng/resolve-map@2.0.3...@thi.ng/resolve-map@2.0.4) (2018-05-14) diff --git a/packages/resolve-map/package.json b/packages/resolve-map/package.json index 9c1c6e365b..0c1a114272 100644 --- a/packages/resolve-map/package.json +++ b/packages/resolve-map/package.json @@ -1,6 +1,6 @@ { "name": "@thi.ng/resolve-map", - "version": "2.0.5", + "version": "2.0.6", "description": "DAG resolution of vanilla objects & arrays with internally linked values", "main": "./index.js", "typings": "./index.d.ts", diff --git a/packages/rstream-graph/CHANGELOG.md b/packages/rstream-graph/CHANGELOG.md index de5cee41a4..02b99a26d1 100644 --- a/packages/rstream-graph/CHANGELOG.md +++ b/packages/rstream-graph/CHANGELOG.md @@ -3,6 +3,29 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. + +# [2.0.0](https://github.com/thi-ng/umbrella/compare/@thi.ng/rstream-graph@1.1.2...@thi.ng/rstream-graph@2.0.0) (2018-06-06) + + +### Features + +* **rstream-graph:** add full/optional support for multiple node outputs ([f2e0df2](https://github.com/thi-ng/umbrella/commit/f2e0df2)) +* **rstream-graph:** update NodeOutput, support multiple handlers ([be21c4c](https://github.com/thi-ng/umbrella/commit/be21c4c)) + + +### BREAKING CHANGES + +* **rstream-graph:** update NodeSpec format & graph initialization + +- add new types/interfaces +- non-destructive initGraph() behavior +- update & refactor nodeFromSpec() +- update addNode/removeNode() +- update tests & docs + + + + ## [1.1.2](https://github.com/thi-ng/umbrella/compare/@thi.ng/rstream-graph@1.1.1...@thi.ng/rstream-graph@1.1.2) (2018-05-30) diff --git a/packages/rstream-graph/package.json b/packages/rstream-graph/package.json index ed3902b62f..43c793ff7e 100644 --- a/packages/rstream-graph/package.json +++ b/packages/rstream-graph/package.json @@ -1,6 +1,6 @@ { "name": "@thi.ng/rstream-graph", - "version": "1.1.2", + "version": "2.0.0", "description": "Declarative dataflow graph construction for @thi.ng/rstream", "main": "./index.js", "typings": "./index.d.ts", @@ -28,7 +28,7 @@ "@thi.ng/checks": "^1.5.3", "@thi.ng/errors": "^0.1.3", "@thi.ng/paths": "^1.3.8", - "@thi.ng/resolve-map": "^2.0.5", + "@thi.ng/resolve-map": "^2.0.6", "@thi.ng/rstream": "^1.7.1", "@thi.ng/transducers": "^1.10.2" },