Skip to content

Commit

Permalink
fix(rstream): fix #305, metaStream() factory arg type
Browse files Browse the repository at this point in the history
- add test case
  • Loading branch information
postspectacular committed Aug 8, 2021
1 parent 7cc349a commit 2bc7bff
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
43 changes: 20 additions & 23 deletions packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assert, Fn } from "@thi.ng/api";
import { assert, Fn, Nullable } from "@thi.ng/api";
import { CloseMode, CommonOpts, ISubscription, State } from "./api";
import { Subscription } from "./subscription";
import { optsWithID } from "./utils/idgen";
Expand All @@ -15,30 +15,27 @@ export interface MetaStreamOpts extends CommonOpts {
}

/**
* Returns a {@link Subscription} which transforms each incoming value
* into a new {@link Stream}, subscribes to it (via an hidden / internal
* Returns a {@link Subscription} which transforms each incoming value into a
* new {@link ISubscription}, subscribes to it (via an hidden / internal
* subscription) and then only passes values from that stream to its own
* subscribers.
*
* @remarks
* If a new value is received, the metastream first unsubscribes from
* any still active stream, before creating and subscribing to the new
* stream. Hence this stream type is useful for cases where streams need
* to be dynamically created & inserted into an existing dataflow
* topology.
* If a new value is received, the metastream first unsubscribes from any still
* active previous stream (if any), before creating and subscribing to the new
* one. Hence this stream type is useful for cases where streams need to be
* dynamically created & inserted into an existing dataflow topology.
*
* The user supplied `factory` function will be called for each incoming
* value and is responsible for creating the new stream instances. If
* the function returns null/undefined, no further action will be taken
* (acts like a filter transducer).
* The user supplied `factory` function will be called for each incoming value
* and is responsible for creating the new stream instances. If the function
* returns null/undefined, no further action will be taken (acts like a filter
* transducer).
*
* The factory function does NOT need to create *new* streams, but can
* merely return other existing streams, and so making the meta stream
* act like a switch with arbitrary criteria.
*
* If the meta stream itself is the only subscriber to existing input
* streams, you'll need to configure the input's
* {@link CommonOpts.closeOut} option to keep them alive and support
* The factory function does NOT need to create *new* streams, but can merely
* return other existing streams, and so making the meta stream act like a
* switch with arbitrary criteria. However, if the meta stream itself is the
* only subscriber to such existing input streams, you'll need to configure the
* input's {@link CommonOpts.closeOut} option to keep them alive and support
* dynamic switching between them.
*
* @example
Expand Down Expand Up @@ -97,22 +94,22 @@ export interface MetaStreamOpts extends CommonOpts {
* @param id -
*/
export const metaStream = <A, B>(
factory: Fn<A, Subscription<B, B>>,
factory: Fn<A, Nullable<ISubscription<B, B>>>,
opts?: Partial<MetaStreamOpts>
) => new MetaStream(factory, opts);

/**
* @see {@link metaStream} for reference & examples.
*/
export class MetaStream<A, B> extends Subscription<A, B> {
factory: Fn<A, Subscription<B, B>>;
stream?: Subscription<B, B>;
factory: Fn<A, Nullable<ISubscription<B, B>>>;
stream?: ISubscription<B, B>;
sub?: ISubscription<B, B>;
emitLast: boolean;
doneRequested: boolean;

constructor(
factory: Fn<A, Subscription<B, B>>,
factory: Fn<A, Nullable<ISubscription<B, B>>>,
opts: Partial<MetaStreamOpts> = {}
) {
super(undefined, optsWithID("metastram", opts));
Expand Down
25 changes: 23 additions & 2 deletions packages/rstream/test/metastream.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import * as assert from "assert";
import { CloseMode, fromIterable, metaStream } from "../src";
import { CloseMode, fromIterable, metaStream, reactive } from "../src";
import { TIMEOUT } from "./config";
import { assertActive, assertUnsub } from "./utils";

describe("MetaStream", function () {
this.retries(3);

it("basic", (done) => {
const acc: number[] = [];
const src = fromIterable([1, 2, 3], { delay: TIMEOUT });
const meta = metaStream<number, number>((x) =>
fromIterable([x * 10, x * 20, x * 30], { delay: TIMEOUT >> 2 })
);
const sub = src.subscribe(meta);
const acc: number[] = [];
const sub2 = sub.subscribe({
next(x) {
acc.push(x);
Expand All @@ -27,6 +27,27 @@ describe("MetaStream", function () {
}, 5 * TIMEOUT);
});

it("null", (done) => {
const acc: number[] = [];
const src = fromIterable([1, 2, 3], { delay: TIMEOUT });
const meta = metaStream<number, number>((x) =>
x & 1 ? reactive(x) : null
);
const sub = src.subscribe(meta);
const sub2 = sub.subscribe({
next(x) {
acc.push(x);
},
});
setTimeout(() => {
assert.deepStrictEqual(acc, [1, 3]);
assertUnsub(meta);
assertUnsub(sub);
assertUnsub(sub2);
done();
}, 5 * TIMEOUT);
});

it("closein", (done) => {
const src = fromIterable([1], { delay: TIMEOUT });
const meta = metaStream((x) => fromIterable([x]), {
Expand Down

0 comments on commit 2bc7bff

Please sign in to comment.