Skip to content

Commit

Permalink
feat(fibers): add timeSliceIterable()
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Sep 6, 2023
1 parent 4549a85 commit fe13b03
Showing 1 changed file with 48 additions and 1 deletion.
49 changes: 48 additions & 1 deletion packages/fibers/src/ops.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Fn0, Nullable, Predicate } from "@thi.ng/api";
import type { Fn, Fn0, Nullable, Predicate } from "@thi.ng/api";
import { shuffle as $shuffle } from "@thi.ng/arrays/shuffle";
import { now, timeDiff } from "@thi.ng/bench/now";
import type { IRandom } from "@thi.ng/random";
Expand Down Expand Up @@ -159,6 +159,53 @@ export const timeSlice = (
}
}, opts);

/**
* Similar to {@link timeSlice}, but for consuming the given iterable in a
* time-sliced manner. With each fiber update consumes & buffers values from
* `src` in chunks for `maxTime` milliseconds, then passes recorded chunk to
* given `consume` function in order to process these values further.
*
* @example
* ```ts
* import { range } from "@this.ng/transducers";
*
* // consume & batch process iterable in 16ms time slices
* timeSliceIterable(
* range(1_000_000),
* (chunk) => console.log(chunk),
* 16
* ).run();
* ```
*
* @param src
* @param consume
* @param maxTime
* @param opts
*/
export const timeSliceIterable = <T>(
src: Iterable<T>,
consume: Fn<T[], void>,
maxTime: number,
opts?: Partial<FiberOpts>
) =>
fiber(function* () {
const iter = src[Symbol.iterator]();
while (true) {
let t0 = now();
const buf: T[] = [];
do {
const { value, done } = iter.next();
if (done) {
consume(buf);
return;
}
buf.push(value);
} while (timeDiff(t0, now()) < maxTime);
consume(buf);
yield;
}
}, opts);

/**
* Returns a fiber which "blocks" until given predicate function returns true.
*
Expand Down

0 comments on commit fe13b03

Please sign in to comment.