From 898eb537c6812eecc1b6f0dd720ce5a24b431f13 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Sat, 23 Nov 2019 15:19:36 +0000 Subject: [PATCH] feat(rstream): update sidechainPartition/Toggle and timeout - use options objects for args - update tests --- .../rstream/src/subs/sidechain-partition.ts | 17 ++++---- packages/rstream/src/subs/sidechain-toggle.ts | 22 ++++++----- packages/rstream/src/subs/timeout.ts | 39 +++++++++++-------- packages/rstream/test/sidechain-partition.ts | 14 +++++-- packages/rstream/test/sidechain-toggle.ts | 2 +- packages/rstream/test/timeout.ts | 4 +- 6 files changed, 58 insertions(+), 40 deletions(-) diff --git a/packages/rstream/src/subs/sidechain-partition.ts b/packages/rstream/src/subs/sidechain-partition.ts index 1eb9aeed33..0e1a333b45 100644 --- a/packages/rstream/src/subs/sidechain-partition.ts +++ b/packages/rstream/src/subs/sidechain-partition.ts @@ -3,6 +3,10 @@ import { CommonOpts, ISubscribable, State } from "../api"; import { Subscription } from "../subscription"; import { optsWithID } from "../utils/idgen"; +export interface SidechainPartitionOpts extends CommonOpts { + pred: Predicate; +} + /** * Buffers values from `src` until side chain fires, then emits buffer * (unless empty) and repeats process until either input is done. By @@ -29,9 +33,8 @@ import { optsWithID } from "../utils/idgen"; */ export const sidechainPartition = ( side: ISubscribable, - pred?: Predicate, - opts?: Partial -): Subscription => new SidechainPartition(side, pred, opts); + opts?: Partial> +): Subscription => new SidechainPartition(side, opts); export class SidechainPartition extends Subscription { sideSub: Subscription; @@ -39,13 +42,13 @@ export class SidechainPartition extends Subscription { constructor( side: ISubscribable, - pred?: Predicate, - opts?: Partial + opts?: Partial> ) { - super(undefined, optsWithID("sidepart", opts)); + opts = optsWithID("sidepart", opts); + super(undefined, opts); this.buf = []; + const pred = opts.pred || (() => true); const $this = this; - pred = pred || (() => true); this.sideSub = side.subscribe({ next(x) { if ($this.buf.length && pred!(x)) { diff --git a/packages/rstream/src/subs/sidechain-toggle.ts b/packages/rstream/src/subs/sidechain-toggle.ts index 5b7705be54..6e7c4318bf 100644 --- a/packages/rstream/src/subs/sidechain-toggle.ts +++ b/packages/rstream/src/subs/sidechain-toggle.ts @@ -3,6 +3,11 @@ import { CommonOpts, ISubscribable } from "../api"; import { Subscription } from "../subscription"; import { optsWithID } from "../utils/idgen"; +export interface SidechainToggleOpts extends CommonOpts { + pred: Predicate; + initial: boolean; +} + /** * Filters values from input based on values received from side chain. * By default, the value read from the side chain is ignored, however @@ -31,10 +36,8 @@ import { optsWithID } from "../utils/idgen"; */ export const sidechainToggle = ( side: ISubscribable, - initial = true, - pred?: Predicate, - opts?: Partial -): Subscription => new SidechainToggle(side, initial, pred, opts); + opts?: Partial> +): Subscription => new SidechainToggle(side, opts); export class SidechainToggle extends Subscription { sideSub: Subscription; @@ -42,14 +45,13 @@ export class SidechainToggle extends Subscription { constructor( side: ISubscribable, - initial = true, - pred?: Predicate, - opts?: Partial + opts?: Partial> ) { - super(undefined, optsWithID("sidetoggle", opts)); - this.isActive = initial; + opts = optsWithID("sidetoggle", opts); + super(undefined, opts); + this.isActive = !!opts.initial; + const pred = opts.pred || (() => true); const $this = this; - pred = pred || (() => true); this.sideSub = side.subscribe({ next(x) { if (pred!(x)) { diff --git a/packages/rstream/src/subs/timeout.ts b/packages/rstream/src/subs/timeout.ts index 966434931e..7a369a4bd6 100644 --- a/packages/rstream/src/subs/timeout.ts +++ b/packages/rstream/src/subs/timeout.ts @@ -2,6 +2,19 @@ import { CommonOpts, State } from "../api"; import { Subscription } from "../subscription"; import { optsWithID } from "../utils/idgen"; +export interface TimeoutOpts extends CommonOpts { + /** + * Error object. + */ + error: any; + /** + * True, if timeout resets with each received value. + * + * @defaultValue false + */ + reset: boolean; +} + /** * A subscription that emits an arbitrary error object after a given * time. If no `error` is given, uses a new `Error` instance by default. @@ -11,16 +24,14 @@ import { optsWithID } from "../utils/idgen"; * the time interval since the last value has exceeded. * * @param timeoutMs timeout period in milliseconds - * @param error error object + * @param error * @param resetTimeout timeout reset flag * @param opts */ export const timeout = ( timeoutMs: number, - error?: any, - resetTimeout = false, - opts?: Partial -): Subscription => new Timeout(timeoutMs, error, resetTimeout, opts); + opts?: Partial +): Subscription => new Timeout(timeoutMs, opts); class Timeout extends Subscription { protected timeoutMs: number; @@ -28,16 +39,12 @@ class Timeout extends Subscription { protected errorObj: any; protected resetTimeout: boolean; - constructor( - timeoutMs: number, - error?: any, - resetTimeout = false, - opts?: Partial - ) { - super(undefined, optsWithID("timeout", opts)); + constructor(timeoutMs: number, opts?: Partial) { + opts = optsWithID("timeout", opts); + super(undefined, opts); this.timeoutMs = timeoutMs; - this.errorObj = error; - this.resetTimeout = resetTimeout; + this.errorObj = opts.error; + this.resetTimeout = opts.reset === true; this.reset(); } @@ -55,9 +62,7 @@ class Timeout extends Subscription { this.error( this.errorObj || new Error( - `Timeout stream "${this.id}" after ${ - this.timeoutMs - } ms` + `Timeout stream "${this.id}" after ${this.timeoutMs} ms` ) ); } diff --git a/packages/rstream/test/sidechain-partition.ts b/packages/rstream/test/sidechain-partition.ts index 0ee2c6e594..d1fe41935d 100644 --- a/packages/rstream/test/sidechain-partition.ts +++ b/packages/rstream/test/sidechain-partition.ts @@ -21,7 +21,10 @@ describe("SidechainPartition", function() { buf.push(x); }, done() { - assert.deepEqual(buf, [[1, 2], [3, 4, 5]]); + assert.deepEqual(buf, [ + [1, 2], + [3, 4, 5] + ]); done(); } }); @@ -38,12 +41,17 @@ describe("SidechainPartition", function() { }); it("partitions w/ predicate", (done) => { - src.subscribe(sidechainPartition(side, (x) => x === 1)).subscribe({ + src.subscribe( + sidechainPartition(side, { pred: (x: any) => x === 1 }) + ).subscribe({ next(x) { buf.push(x); }, done() { - assert.deepEqual(buf, [[1, 2, 3], [4, 5]]); + assert.deepEqual(buf, [ + [1, 2, 3], + [4, 5] + ]); done(); } }); diff --git a/packages/rstream/test/sidechain-toggle.ts b/packages/rstream/test/sidechain-toggle.ts index dc88c6c861..bf03ab3ed8 100644 --- a/packages/rstream/test/sidechain-toggle.ts +++ b/packages/rstream/test/sidechain-toggle.ts @@ -22,7 +22,7 @@ describe("SidechainToggle", () => { expect: any, done: Function ) => { - src.subscribe(sidechainToggle(side, initial, pred)).subscribe({ + src.subscribe(sidechainToggle(side, { initial, pred })).subscribe({ next(x) { buf.push(x); }, diff --git a/packages/rstream/test/timeout.ts b/packages/rstream/test/timeout.ts index 6f62fc65dc..9c0f0c9a43 100644 --- a/packages/rstream/test/timeout.ts +++ b/packages/rstream/test/timeout.ts @@ -19,7 +19,7 @@ describe("Timeout", () => { const error = "error object"; - timeout(TIMEOUT, error).subscribe({ + timeout(TIMEOUT, { error }).subscribe({ error: (err) => { assert.equal(err, error); done(); @@ -43,7 +43,7 @@ describe("Timeout", () => { this.timeout(TIMEOUT * 4); const res: any[] = []; - const t = timeout(TIMEOUT, null, true); + const t = timeout(TIMEOUT, { reset: true }); t.subscribe({ next: (x) => { res.push(x);