Skip to content

Commit

Permalink
feat(rstream): update sidechainPartition/Toggle and timeout
Browse files Browse the repository at this point in the history
- use options objects for args
- update tests
  • Loading branch information
postspectacular committed Nov 23, 2019
1 parent c6da17d commit 898eb53
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 40 deletions.
17 changes: 10 additions & 7 deletions packages/rstream/src/subs/sidechain-partition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { CommonOpts, ISubscribable, State } from "../api";
import { Subscription } from "../subscription";
import { optsWithID } from "../utils/idgen";

export interface SidechainPartitionOpts<T> extends CommonOpts {
pred: Predicate<T>;
}

/**
* Buffers values from `src` until side chain fires, then emits buffer
* (unless empty) and repeats process until either input is done. By
Expand All @@ -29,23 +33,22 @@ import { optsWithID } from "../utils/idgen";
*/
export const sidechainPartition = <A, B>(
side: ISubscribable<B>,
pred?: Predicate<B>,
opts?: Partial<CommonOpts>
): Subscription<A, A[]> => new SidechainPartition<A, B>(side, pred, opts);
opts?: Partial<SidechainPartitionOpts<B>>
): Subscription<A, A[]> => new SidechainPartition<A, B>(side, opts);

export class SidechainPartition<A, B> extends Subscription<A, A[]> {
sideSub: Subscription<B, B>;
buf: A[];

constructor(
side: ISubscribable<B>,
pred?: Predicate<B>,
opts?: Partial<CommonOpts>
opts?: Partial<SidechainPartitionOpts<B>>
) {
super(undefined, optsWithID("sidepart", opts));
opts = optsWithID("sidepart", opts);
super(undefined, opts);
this.buf = [];
const pred = opts.pred || (() => true);
const $this = this;
pred = pred || (() => true);
this.sideSub = side.subscribe({
next(x) {
if ($this.buf.length && pred!(x)) {
Expand Down
22 changes: 12 additions & 10 deletions packages/rstream/src/subs/sidechain-toggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { CommonOpts, ISubscribable } from "../api";
import { Subscription } from "../subscription";
import { optsWithID } from "../utils/idgen";

export interface SidechainToggleOpts<T> extends CommonOpts {
pred: Predicate<T>;
initial: boolean;
}

/**
* Filters values from input based on values received from side chain.
* By default, the value read from the side chain is ignored, however
Expand Down Expand Up @@ -31,25 +36,22 @@ import { optsWithID } from "../utils/idgen";
*/
export const sidechainToggle = <A, B>(
side: ISubscribable<B>,
initial = true,
pred?: Predicate<B>,
opts?: Partial<CommonOpts>
): Subscription<A, A> => new SidechainToggle(side, initial, pred, opts);
opts?: Partial<SidechainToggleOpts<B>>
): Subscription<A, A> => new SidechainToggle(side, opts);

export class SidechainToggle<A, B> extends Subscription<A, A> {
sideSub: Subscription<B, B>;
isActive: boolean;

constructor(
side: ISubscribable<B>,
initial = true,
pred?: Predicate<B>,
opts?: Partial<CommonOpts>
opts?: Partial<SidechainToggleOpts<B>>
) {
super(undefined, optsWithID("sidetoggle", opts));
this.isActive = initial;
opts = optsWithID("sidetoggle", opts);
super(undefined, opts);
this.isActive = !!opts.initial;
const pred = opts.pred || (() => true);
const $this = this;
pred = pred || (() => true);
this.sideSub = side.subscribe({
next(x) {
if (pred!(x)) {
Expand Down
39 changes: 22 additions & 17 deletions packages/rstream/src/subs/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@ import { CommonOpts, State } from "../api";
import { Subscription } from "../subscription";
import { optsWithID } from "../utils/idgen";

export interface TimeoutOpts extends CommonOpts {
/**
* Error object.
*/
error: any;
/**
* True, if timeout resets with each received value.
*
* @defaultValue false
*/
reset: boolean;
}

/**
* A subscription that emits an arbitrary error object after a given
* time. If no `error` is given, uses a new `Error` instance by default.
Expand All @@ -11,33 +24,27 @@ import { optsWithID } from "../utils/idgen";
* the time interval since the last value has exceeded.
*
* @param timeoutMs timeout period in milliseconds
* @param error error object
* @param error
* @param resetTimeout timeout reset flag
* @param opts
*/
export const timeout = <T>(
timeoutMs: number,
error?: any,
resetTimeout = false,
opts?: Partial<CommonOpts>
): Subscription<T, T> => new Timeout(timeoutMs, error, resetTimeout, opts);
opts?: Partial<TimeoutOpts>
): Subscription<T, T> => new Timeout(timeoutMs, opts);

class Timeout<T> extends Subscription<T, T> {
protected timeoutMs: number;
protected timeoutId: any;
protected errorObj: any;
protected resetTimeout: boolean;

constructor(
timeoutMs: number,
error?: any,
resetTimeout = false,
opts?: Partial<CommonOpts>
) {
super(undefined, optsWithID("timeout", opts));
constructor(timeoutMs: number, opts?: Partial<TimeoutOpts>) {
opts = optsWithID("timeout", opts);
super(undefined, opts);
this.timeoutMs = timeoutMs;
this.errorObj = error;
this.resetTimeout = resetTimeout;
this.errorObj = opts.error;
this.resetTimeout = opts.reset === true;
this.reset();
}

Expand All @@ -55,9 +62,7 @@ class Timeout<T> extends Subscription<T, T> {
this.error(
this.errorObj ||
new Error(
`Timeout stream "${this.id}" after ${
this.timeoutMs
} ms`
`Timeout stream "${this.id}" after ${this.timeoutMs} ms`
)
);
}
Expand Down
14 changes: 11 additions & 3 deletions packages/rstream/test/sidechain-partition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ describe("SidechainPartition", function() {
buf.push(x);
},
done() {
assert.deepEqual(buf, [[1, 2], [3, 4, 5]]);
assert.deepEqual(buf, [
[1, 2],
[3, 4, 5]
]);
done();
}
});
Expand All @@ -38,12 +41,17 @@ describe("SidechainPartition", function() {
});

it("partitions w/ predicate", (done) => {
src.subscribe(sidechainPartition(side, (x) => x === 1)).subscribe({
src.subscribe(
sidechainPartition(side, { pred: (x: any) => x === 1 })
).subscribe({
next(x) {
buf.push(x);
},
done() {
assert.deepEqual(buf, [[1, 2, 3], [4, 5]]);
assert.deepEqual(buf, [
[1, 2, 3],
[4, 5]
]);
done();
}
});
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/test/sidechain-toggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe("SidechainToggle", () => {
expect: any,
done: Function
) => {
src.subscribe(sidechainToggle(side, initial, pred)).subscribe({
src.subscribe(sidechainToggle(side, { initial, pred })).subscribe({
next(x) {
buf.push(x);
},
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/test/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe("Timeout", () => {

const error = "error object";

timeout(TIMEOUT, error).subscribe({
timeout(TIMEOUT, { error }).subscribe({
error: (err) => {
assert.equal(err, error);
done();
Expand All @@ -43,7 +43,7 @@ describe("Timeout", () => {
this.timeout(TIMEOUT * 4);

const res: any[] = [];
const t = timeout(TIMEOUT, null, true);
const t = timeout(TIMEOUT, { reset: true });
t.subscribe({
next: (x) => {
res.push(x);
Expand Down

0 comments on commit 898eb53

Please sign in to comment.