Skip to content

Commit

Permalink
feat(rstream): add fromNodeJS/linesFromNodeJS()
Browse files Browse the repository at this point in the history
- add NodeJS stream adapter bridges
- update readme
  • Loading branch information
postspectacular committed Nov 15, 2021
1 parent 10d0e9f commit d56026d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 1 deletion.
3 changes: 2 additions & 1 deletion packages/rstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ node --experimental-repl-await
> const rstream = await import("@thi.ng/rstream");
```

Package sizes (gzipped, pre-treeshake): ESM: 6.02 KB
Package sizes (gzipped, pre-treeshake): ESM: 6.12 KB

## Dependencies

Expand Down Expand Up @@ -402,6 +402,7 @@ s.next(42);
- [fromDOMEvent()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/event.ts#L25) - DOM events
- [fromInterval()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/interval.ts) - interval based counters
- [fromIterable()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/iterable.ts) - arrays, iterators / generators (async & sync)
- [fromNodeJS() / linesFromNodeJS()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/nodejs.ts) - NodeJS streams adapters
- [fromObject()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/object.ts) - object property streams
- [fromPromise()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/promise.ts) - single value stream from promise
- [fromPromises()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/promises.ts) - results from multiple promise
Expand Down
3 changes: 3 additions & 0 deletions packages/rstream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@
"./metastream": {
"import": "./metastream.js"
},
"./nodejs": {
"import": "./nodejs.js"
},
"./object": {
"import": "./object.js"
},
Expand Down
1 change: 1 addition & 0 deletions packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export * from "./iterable.js";
export * from "./logger.js";
export * from "./merge.js";
export * from "./metastream.js";
export * from "./nodejs.js";
export * from "./object.js";
export * from "./post-worker.js";
export * from "./promise.js";
Expand Down
70 changes: 70 additions & 0 deletions packages/rstream/src/nodejs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { rechunk } from "@thi.ng/transducers/rechunk";
import type { Readable } from "stream";
import { Stream, stream } from "./stream.js";
import type { Subscription } from "./subscription.js";

/**
* Adapter bridge from NodeJS streams. Creates and returns a new {@link stream}
* of type `T` and pipes in data from `stdout` (also assumed to produce data of
* type `T`). If given, also connects `stderr` to new rstream's error handler.
* Unless `close` is false, the new stream closes once `stdout` is closed.
*
* @param stdout
* @param stderr
* @param close
*/
export const fromNodeJS = <T>(
stdout: Readable,
stderr?: Readable,
close = true
): Stream<T> => {
const ingest = stream<T>();
stdout.on("data", (data) => ingest.next(data));
stderr && stderr.on("data", (data) => ingest.error(data));
close && stdout.on("close", () => ingest.done());
return ingest;
};

/**
* Specialized version of {@link fromNodeJS} for string inputs and automatic
* rechunking/splitting of the input using the optionally provided regexp (line
* breaks by default).
*
* @remarks
* Internally uses https://docs.thi.ng/umbrella/transducers/modules.html#rechunk
* to rechunk input.
*
* @example
* ```ts
* import { spawn } from "child_process"
* import { linesFromNodeJS, trace } from "@thi.ng/rstream";
*
* const cmd = spawn("ls", ["-la"]);
*
* linesFromNodeJS<string>(cmd.stdout, cmd.stderr).subscribe(trace("output"));
*
* // output total 12760
* // output drwxr-xr-x 37 foo staff 1184 Nov 15 15:29 .
* // output drwxr-xr-x 143 foo staff 4576 Nov 11 21:08 ..
* // output drwxr-xr-x 17 foo staff 544 Nov 15 17:39 .git
* // output -rw-r--r-- 1 foo staff 149 Aug 4 15:32 .gitattributes
* // output drwxr-xr-x 5 foo staff 160 Apr 12 2021 .github
* // output -rw-r--r-- 1 foo staff 659 Sep 10 22:55 .gitignore
* // ...
* // output done
* ```
*
* @param stdout
* @param stderr
* @param re
* @param close
*/
export const linesFromNodeJS = (
stdout: Readable,
stderr?: Readable,
re?: RegExp,
close?: boolean
) =>
<Subscription<string, string>>(
fromNodeJS<string>(stdout, stderr, close).transform(rechunk(re))
);
1 change: 1 addition & 0 deletions packages/rstream/tpl.readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ s.next(42);
- [fromDOMEvent()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/event.ts#L25) - DOM events
- [fromInterval()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/interval.ts) - interval based counters
- [fromIterable()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/iterable.ts) - arrays, iterators / generators (async & sync)
- [fromNodeJS() / linesFromNodeJS()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/nodejs.ts) - NodeJS streams adapters
- [fromObject()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/object.ts) - object property streams
- [fromPromise()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/promise.ts) - single value stream from promise
- [fromPromises()](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream/src/promises.ts) - results from multiple promise
Expand Down

0 comments on commit d56026d

Please sign in to comment.