Skip to content

Commit

Permalink
refactor(rstream-csp): minor pkg restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Sep 29, 2021
1 parent 761de32 commit 103f1ba
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 53 deletions.
5 changes: 1 addition & 4 deletions packages/rstream-csp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"scripts": {
"build": "yarn clean && tsc --declaration",
"build:check": "tsc --isolatedModules --noEmit",
"clean": "rimraf *.js *.d.ts *.map doc from",
"clean": "rimraf *.js *.d.ts *.map doc",
"doc": "typedoc --excludePrivate --excludeInternal --out doc src/index.ts",
"doc:ae": "mkdir -p .ae/doc .ae/temp && node_modules/.bin/api-extractor run --local --verbose",
"doc:readme": "../../scripts/node-esm ../../tools/src/readme.ts",
Expand Down Expand Up @@ -60,9 +60,6 @@
"exports": {
".": {
"import": "./index.js"
},
"./from/channel": {
"import": "./from/channel.js"
}
},
"thi.ng": {
Expand Down
48 changes: 0 additions & 48 deletions packages/rstream-csp/src/from/channel.ts

This file was deleted.

49 changes: 48 additions & 1 deletion packages/rstream-csp/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,48 @@
export * from "./from/channel";
import type { Channel } from "@thi.ng/csp";
import type { CommonOpts } from "@thi.ng/rstream";
import { LOGGER } from "@thi.ng/rstream/logger";
import { Stream } from "@thi.ng/rstream/stream";

export interface FromChannelOpts extends CommonOpts {
/**
* If true, the parent CSP channel will be closed when this stream
* closes.
*
* @defaultValue true
*/
closeChannel: boolean;
}

/**
* Returns a stream of values received from given
* {@link @thi.ng/csp#Channel}.
*
* @param src -
* @param opts -
*/
export const fromChannel = <T>(
src: Channel<T>,
opts?: Partial<FromChannelOpts>
) => {
opts = { id: `channel-${src.id}`, closeChannel: true, ...opts };
return new Stream<T>((stream) => {
let isActive = true;
(async () => {
let x;
while (((x = null), (x = await src.read())) !== undefined) {
if (x === undefined || !isActive) {
break;
}
stream.next(x);
}
stream.done();
})();
return () => {
if (opts!.closeChannel !== false) {
src.close(true);
LOGGER.info("closed channel", src.id);
}
isActive = false;
};
}, opts);
};

0 comments on commit 103f1ba

Please sign in to comment.