Skip to content

Commit

Permalink
feat(rstream): add/update fork/joinBuffer generics, optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Sep 7, 2019
1 parent 39866a8 commit 8f0c55c
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions packages/rstream/src/forkjoin.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Fn, Fn3 } from "@thi.ng/api";
import { ArrayLikeIterable, Fn, Fn3 } from "@thi.ng/api";
import {
comp,
map,
Expand All @@ -10,6 +10,10 @@ import { sync } from "./stream-sync";
import { tunnel } from "./subs/tunnel";
import { Subscription } from "./subscription";

type Sliceable<T> = ArrayLike<T> & {
slice(a: number, b?: number): Sliceable<T>;
};

export interface ForkJoinOpts<IN, MSG, RES, OUT> extends Partial<CommonOpts> {
/**
* Input stream to attach to obtain work items from.
Expand Down Expand Up @@ -157,9 +161,9 @@ export const forkJoin = <IN, MSG, RES, OUT>(
* size = 2, then the last 3 (i.e. 8 - 10 / 2) workers will only receive
* empty workloads.
*
* More generally, if the input buffer size is not equally divisible
* over the given number of workers, the last worker might receive a
* larger or smaller chunk.
* More generally, if the input buffer size is not equally distributable
* over the given number of workers, the last worker(s) might receive a
* larger, smaller or empty chunk.
*
* ```
* forkJoin<number[], number[], number[], number[]>({
Expand All @@ -177,23 +181,23 @@ export const forkJoin = <IN, MSG, RES, OUT>(
*
* @param minChunkSize
*/
export const forkBuffer = (minChunkSize = 1) => <T>(
export const forkBuffer = (minChunkSize = 1) => <T extends Sliceable<any>>(
id: number,
numWorkers: number,
buf: T[]
buf: T
) => {
const chunkSize = Math.max(minChunkSize, (buf.length / numWorkers) | 0);
return id < numWorkers - 1
? buf.slice(id * chunkSize, (id + 1) * chunkSize)
: buf.slice(id * chunkSize);
? <T>buf.slice(id * chunkSize, (id + 1) * chunkSize)
: <T>buf.slice(id * chunkSize);
};

/**
* Higher-order join function for scenarios involving the split-parallel
* processing of a large buffer. The returned function is meant to be
* used as `join` function in a `ForkJoinOpts` config, receives the
* processed result chunks from all workers and concatenates them back
* into a single result array.
* processed result chunks from all workers (ordered by worker ID) and
* concatenates them back into a single result array.
*
* The optional `fn` arg can be used to pick the actual result chunk
* from each worker result. This is useful if the worker result type is
Expand All @@ -203,6 +207,7 @@ export const forkBuffer = (minChunkSize = 1) => <T>(
*
* @param fn
*/
export const joinBuffer = <A, B extends Array<any>>(
fn: Fn<A, B> = (x) => <any>x
) => (parts: A[]) => <B>[...mapcat(fn, parts)];
export const joinBuffer = <A, B>(fn?: Fn<A, ArrayLikeIterable<B>>) =>
fn
? (parts: A[]) => <B[]>[...mapcat(fn, parts)]
: (parts: A[]) => <B[]>Array.prototype.concat.apply([], parts);

0 comments on commit 8f0c55c

Please sign in to comment.