diff --git a/README.md b/README.md index f9b82914..46c561d6 100644 --- a/README.md +++ b/README.md @@ -702,6 +702,57 @@ options on to the custom `TaskQueue` implementation. (Note that because the queue options are set as a property on the task, tasks with queue options cannot be submitted as JavaScript primitives). +### Built-In Queues +Piscina also provides the `FixedQueue`, a more performant task queue implementation based on [`FixedQueue`](https://github.com/nodejs/node/blob/de7b37880f5a541d5f874c1c2362a65a4be76cd0/lib/internal/fixed_queue.js) from Node.js project. + +Here are some benchmarks to compare new `FixedQueue` with `ArrayTaskQueue` (current default). The benchmarks demonstrate substantial improvements in push and shift operations, especially with larger queue sizes. +``` +Queue size = 1000 +┌─────────┬─────────────────────────────────────────┬───────────┬────────────────────┬──────────┬─────────┐ +│ (index) │ Task Name │ ops/sec │ Average Time (ns) │ Margin │ Samples │ +├─────────┼─────────────────────────────────────────┼───────────┼────────────────────┼──────────┼─────────┤ +│ 0 │ 'ArrayTaskQueue full push + full shift' │ '9 692' │ 103175.15463917515 │ '±0.80%' │ 970 │ +│ 1 │ 'FixedQueue full push + full shift' │ '131 879' │ 7582.696390658352 │ '±1.81%' │ 13188 │ +└─────────┴─────────────────────────────────────────┴───────────┴────────────────────┴──────────┴─────────┘ + +Queue size = 100_000 +┌─────────┬─────────────────────────────────────────┬─────────┬────────────────────┬──────────┬─────────┐ +│ (index) │ Task Name │ ops/sec │ Average Time (ns) │ Margin │ Samples │ +├─────────┼─────────────────────────────────────────┼─────────┼────────────────────┼──────────┼─────────┤ +│ 0 │ 'ArrayTaskQueue full push + full shift' │ '0' │ 1162376920.0000002 │ '±1.77%' │ 10 │ +│ 1 │ 'FixedQueue full push + full shift' │ '1 026' │ 974328.1553396407 │ '±2.51%' │ 103 │ +└─────────┴─────────────────────────────────────────┴─────────┴────────────────────┴──────────┴─────────┘ +``` +In terms of Piscina performance itself, using `FixedQueue` with a queue size of 100,000 queued tasks can result in up to 6 times faster execution times. + +Users can import `FixedQueue` from the `Piscina` package and pass it as the `taskQueue` option to leverage its benefits. + + +#### Using FixedQueue Example + +Here's an example of how to use the `FixedQueue`: + +```js +const { Piscina, FixedQueue } = require('piscina'); +const { resolve } = require('path'); + +// Create a Piscina pool with FixedQueue +const piscina = new Piscina({ + filename: resolve(__dirname, 'worker.js'), + taskQueue: new FixedQueue() +}); + +// Submit tasks to the pool +for (let i = 0; i < 10; i++) { + piscina.runTask({ data: i }).then((result) => { + console.log(result); + }).catch((error) => { + console.error(error); + }); +} +``` +**Note** The `FixedQueue` will become the default task queue implementation in a next major version. + ## Current Limitations (Things we're working on / would love help with) * Improved Documentation diff --git a/benchmark/piscina-queue-comparison.js b/benchmark/piscina-queue-comparison.js new file mode 100644 index 00000000..353fc4fa --- /dev/null +++ b/benchmark/piscina-queue-comparison.js @@ -0,0 +1,42 @@ +const { Bench } = require('tinybench'); +const { Piscina, FixedQueue, ArrayTaskQueue } = require('..'); +const { resolve } = require('node:path'); + +const QUEUE_SIZE = 100_000; + +const bench = new Bench({ time: 100 }); + +bench + .add('Piscina with ArrayTaskQueue', async () => { + const queue = new ArrayTaskQueue(); + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/add.js'), + taskQueue: queue + }); + const tasks = []; + for (let i = 0; i < QUEUE_SIZE; i++) { + tasks.push(pool.runTask({ a: 4, b: 6 })); + } + await Promise.all(tasks); + await pool.destroy(); + }) + .add('Piscina with FixedQueue', async () => { + const queue = new FixedQueue(); + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/add.js'), + taskQueue: queue + }); + const tasks = []; + for (let i = 0; i < QUEUE_SIZE; i++) { + tasks.push(pool.runTask({ a: 4, b: 6 })); + } + await Promise.all(tasks); + await pool.destroy(); + }); + +(async () => { + await bench.warmup(); + await bench.run(); + + console.table(bench.table()); +})(); diff --git a/benchmark/queue-comparison.js b/benchmark/queue-comparison.js new file mode 100644 index 00000000..a00335e5 --- /dev/null +++ b/benchmark/queue-comparison.js @@ -0,0 +1,33 @@ +const { Bench } = require('tinybench'); +const { ArrayTaskQueue, FixedQueue } = require('..'); + +const QUEUE_SIZE = 100_000; + +const bench = new Bench({ time: 100 }); + +bench + .add('ArrayTaskQueue full push + full shift', async () => { + const queue = new ArrayTaskQueue(); + for (let i = 0; i < QUEUE_SIZE; i++) { + queue.push(i); + } + for (let i = 0; i < QUEUE_SIZE; i++) { + queue.shift(); + } + }) + .add('FixedQueue full push + full shift', async () => { + const queue = new FixedQueue(); + for (let i = 0; i < QUEUE_SIZE; i++) { + queue.push(i); + } + for (let i = 0; i < QUEUE_SIZE; i++) { + queue.shift(); + } + }); + +(async () => { + await bench.warmup(); + await bench.run(); + + console.table(bench.table()); +})(); diff --git a/benchmark/simple-benchmark-fixed-queue.js b/benchmark/simple-benchmark-fixed-queue.js new file mode 100644 index 00000000..7a357cc4 --- /dev/null +++ b/benchmark/simple-benchmark-fixed-queue.js @@ -0,0 +1,33 @@ +'use strict'; +const { Piscina, FixedQueue } = require('..'); + +const { resolve } = require('path'); + +async function simpleBenchmark ({ duration = 10000 } = {}) { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/add.js'), + taskQueue: new FixedQueue() + }); + let done = 0; + + const results = []; + const start = process.hrtime.bigint(); + while (pool.queueSize === 0) { + results.push(scheduleTasks()); + } + + async function scheduleTasks () { + while ((process.hrtime.bigint() - start) / 1_000_000n < duration) { + await pool.runTask({ a: 4, b: 6 }); + done++; + } + } + + await Promise.all(results); + + return done / duration * 1e3; +} + +simpleBenchmark().then((opsPerSecond) => { + console.log(`opsPerSecond: ${opsPerSecond} (with FixedQueue as taskQueue)`); +}); diff --git a/benchmark/simple-benchmark.js b/benchmark/simple-benchmark.js index 2e9a93c5..4af78fc6 100644 --- a/benchmark/simple-benchmark.js +++ b/benchmark/simple-benchmark.js @@ -25,5 +25,5 @@ async function simpleBenchmark ({ duration = 10000 } = {}) { } simpleBenchmark().then((opsPerSecond) => { - console.log(`opsPerSecond: ${opsPerSecond}`); + console.log(`opsPerSecond: ${opsPerSecond} (with default taskQueue)`); }); diff --git a/package-lock.json b/package-lock.json index 05cb7810..952486e5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "snazzy": "^9.0.0", "standardx": "^7.0.0", "tap": "^16.3.7", + "tinybench": "^2.8.0", "ts-node": "^10.9.2", "typescript": "5.4.5" }, @@ -7775,6 +7776,12 @@ "integrity": "sha512-N+8UisAXDGk8PFXP4HAzVR9nbfmVJ3zYLAWiTIoqC5v5isinhr+r5uaO8+7r3BMfuNIufIsA7RdpVgacC2cSpw==", "dev": true }, + "node_modules/tinybench": { + "version": "2.8.0", + "resolved": "https://registry.npmjs.org/tinybench/-/tinybench-2.8.0.tgz", + "integrity": "sha512-1/eK7zUnIklz4JUUlL+658n58XO2hHLQfSk1Zf2LKieUjxidN16eKFEoDEfjHc3ohofSSqK3X5yO6VGb6iW8Lw==", + "dev": true + }, "node_modules/to-fast-properties": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/to-fast-properties/-/to-fast-properties-2.0.0.tgz", diff --git a/package.json b/package.json index 9c4f4bbb..43bc534b 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,14 @@ "test": "c8 tap", "test:ci": "npm run lint && npm run build && npm run test:coverage", "test:coverage": "c8 --reporter=lcov tap --cov", - "prepack": "npm run build" + "prepack": "npm run build", + "bench": "npm run bench:taskqueue && npm run bench:piscina", + "bench:piscina": "npm run benchmark:piscina-default &&npm run benchmark:piscina-fixed-queue && npm run benchmark:piscina-comparison", + "bench:taskqueue": "npm run benchmark:queue-comparison", + "benchmark:piscina-default": "node benchmark/simple-benchmark.js", + "benchmark:piscina-fixed-queue": "node benchmark/simple-benchmark-fixed-queue.js", + "benchmark:piscina-comparison": "node benchmark/piscina-queue-comparison.js", + "benchmark:queue-comparison": "node benchmark/queue-comparison.js" }, "repository": { "type": "git", @@ -44,6 +51,7 @@ "snazzy": "^9.0.0", "standardx": "^7.0.0", "tap": "^16.3.7", + "tinybench": "^2.8.0", "ts-node": "^10.9.2", "typescript": "5.4.5" }, diff --git a/src/fixed-queue.ts b/src/fixed-queue.ts new file mode 100644 index 00000000..bb57a066 --- /dev/null +++ b/src/fixed-queue.ts @@ -0,0 +1,188 @@ +/* + * Modified Fixed Queue Implementation based on the one from Node.js Project + * License: MIT License + * Source: https://github.com/nodejs/node/blob/de7b37880f5a541d5f874c1c2362a65a4be76cd0/lib/internal/fixed_queue.js + */ +import assert from 'node:assert'; +import { TaskQueue, Task } from './common'; +// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two. +const kSize = 2048; +const kMask = kSize - 1; + +// The FixedQueue is implemented as a singly-linked list of fixed-size +// circular buffers. It looks something like this: +// +// head tail +// | | +// v v +// +-----------+ <-----\ +-----------+ <------\ +-----------+ +// | [null] | \----- | next | \------- | next | +// +-----------+ +-----------+ +-----------+ +// | item | <-- bottom | item | <-- bottom | [empty] | +// | item | | item | | [empty] | +// | item | | item | | [empty] | +// | item | | item | | [empty] | +// | item | | item | bottom --> | item | +// | item | | item | | item | +// | ... | | ... | | ... | +// | item | | item | | item | +// | item | | item | | item | +// | [empty] | <-- top | item | | item | +// | [empty] | | item | | item | +// | [empty] | | [empty] | <-- top top --> | [empty] | +// +-----------+ +-----------+ +-----------+ +// +// Or, if there is only one circular buffer, it looks something +// like either of these: +// +// head tail head tail +// | | | | +// v v v v +// +-----------+ +-----------+ +// | [null] | | [null] | +// +-----------+ +-----------+ +// | [empty] | | item | +// | [empty] | | item | +// | item | <-- bottom top --> | [empty] | +// | item | | [empty] | +// | [empty] | <-- top bottom --> | item | +// | [empty] | | item | +// +-----------+ +-----------+ +// +// Adding a value means moving `top` forward by one, removing means +// moving `bottom` forward by one. After reaching the end, the queue +// wraps around. +// +// When `top === bottom` the current queue is empty and when +// `top + 1 === bottom` it's full. This wastes a single space of storage +// but allows much quicker checks. + +class FixedCircularBuffer { + bottom: number + top: number + list: Array + next: FixedCircularBuffer | null + _size: number + + constructor () { + this.bottom = 0; + this.top = 0; + this.list = new Array(kSize); + this.next = null; + this._size = 0; + } + + isEmpty () { + return this.top === this.bottom && this._size === 0; + } + + isFull () { + return this.top === this.bottom && this._size === kSize; + } + + push (data:Task) { + this.list[this.top] = data; + this.top = (this.top + 1) & kMask; + this._size++; + } + + shift () { + const nextItem = this.list[this.bottom]; + if (nextItem === undefined) { return null; } + this.list[this.bottom] = undefined; + this.bottom = (this.bottom + 1) & kMask; + this._size--; + return nextItem; + } + + remove (task: Task) { + const indexToRemove = this.list.indexOf(task); + + assert.notStrictEqual(indexToRemove, -1); + let curr = indexToRemove; + while (true) { + const next = (curr + 1) & kMask; + this.list[curr] = this.list[next]; + if (this.list[curr] === undefined) { + break; + } + if (next === indexToRemove) { + this.list[curr] = undefined; + break; + } + curr = next; + } + this.top = (this.top - 1) & kMask; + this._size--; + } +} + +export default class FixedQueue implements TaskQueue { + head: FixedCircularBuffer + tail: FixedCircularBuffer + _size: number = 0 + + constructor () { + this.head = this.tail = new FixedCircularBuffer(); + } + + isEmpty () { + return this.head.isEmpty(); + } + + push (data:Task) { + if (this.head.isFull()) { + // Head is full: Creates a new queue, sets the old queue's `.next` to it, + // and sets it as the new main queue. + this.head = this.head.next = new FixedCircularBuffer(); + } + this.head.push(data); + this._size++; + } + + shift (): Task | null { + const tail = this.tail; + const next = tail.shift(); + if (next !== null) this._size--; + if (tail.isEmpty() && tail.next !== null) { + // If there is another queue, it forms the new tail. + this.tail = tail.next; + tail.next = null; + } + return next; + } + + remove (task: Task) { + let prev: FixedCircularBuffer | null = null; + let buffer = this.tail; + while (true) { + if (buffer.list.includes(task)) { + buffer.remove(task); + this._size--; + break; + } + if (buffer.next === null) break; + prev = buffer; + buffer = buffer.next; + } + if (buffer.isEmpty()) { + // removing tail + if (prev === null) { + // if tail is not the last buffer + if (buffer.next !== null) this.tail = buffer.next; + } else { + // removing head + if (buffer.next === null) { + this.head = prev; + } else { + // removing buffer from middle + prev.next = buffer.next; + } + } + } + } + + get size () { + return this._size; + } +}; diff --git a/src/index.ts b/src/index.ts index 8aec98b5..7a009098 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,6 +27,7 @@ import { kTransferable, kValue } from './common'; +import FixedQueue from './fixed-queue'; import { version } from '../package.json'; import { setTimeout as sleep } from 'timers/promises'; @@ -133,7 +134,7 @@ type EnvSpecifier = typeof Worker extends { new (filename : never, options?: { env: infer T }) : Worker; } ? T : never; -class ArrayTaskQueue implements TaskQueue { +export class ArrayTaskQueue implements TaskQueue { tasks : Task[] = []; get size () { return this.tasks.length; } @@ -1361,11 +1362,17 @@ export default class Piscina extends EventEmitterAsyncResource { export const move = Piscina.move; export const isWorkerThread = Piscina.isWorkerThread; export const workerData = Piscina.workerData; +// Mutate Piscina class to allow named import in commonjs +// @ts-expect-error +Piscina.FixedQueue = FixedQueue; +// @ts-expect-error +Piscina.ArrayTaskQueue = ArrayTaskQueue; export { Piscina, kTransferable as transferableSymbol, kValue as valueSymbol, kQueueOptions as queueOptionsSymbol, - version + version, + FixedQueue }; diff --git a/test/fixed-queue.ts b/test/fixed-queue.ts new file mode 100644 index 00000000..b1bd3312 --- /dev/null +++ b/test/fixed-queue.ts @@ -0,0 +1,181 @@ +import { test } from 'tap'; +import { Task, kQueueOptions } from '../dist/src/common'; +import { Piscina, FixedQueue } from '..'; +import { resolve } from 'node:path'; + +class QueueTask implements Task { + get [kQueueOptions] () { + return null; + } +} + +test('queue length', async ({ equal }) => { + const queue = new FixedQueue(); + + equal(queue.size, 0); + + queue.push(new QueueTask()); + + equal(queue.size, 1); + + queue.shift(); + + equal(queue.size, 0); +}); + +test('queue length should not become negative', async ({ equal }) => { + const queue = new FixedQueue(); + + equal(queue.size, 0); + + queue.shift(); + + equal(queue.size, 0); +}); + +test('queue remove', async ({ equal }) => { + const queue = new FixedQueue(); + + const task = new QueueTask(); + + equal(queue.size, 0, 'should be empty on start'); + + queue.push(task); + + equal(queue.size, 1, 'should contain single task after push'); + + queue.remove(task); + + equal(queue.size, 0, 'should be empty after task removal'); +}); + +test('remove not queued task should not lead to errors', async ({ equal }) => { + const queue = new FixedQueue(); + + const task = new QueueTask(); + + equal(queue.size, 0, 'should be empty on start'); + + queue.remove(task); + + equal(queue.size, 0, 'should be empty after task removal'); +}); + +test('removing elements from intermediate CircularBuffer should not lead to issues', async ({ equal, same }) => { + const queue = new FixedQueue(); + + const batchSize = 2048; + + const firstBatch = Array.from({ length: batchSize }, () => new QueueTask()); + const secondBatch = Array.from({ length: batchSize }, () => new QueueTask()); + const thirdBatch = Array.from({ length: batchSize }, () => new QueueTask()); + + const tasks = firstBatch.concat(secondBatch, thirdBatch); + + for (const task of tasks) { + queue.push(task); + } + equal(queue.size, tasks.length, `should contain ${batchSize} * 3 items`); + + let size = queue.size; + for (const task of secondBatch) { + queue.remove(task); + equal(queue.size, --size, `should contain ${size} items`); + } + + const expected = firstBatch.concat(thirdBatch); + const actual = []; + while (!queue.isEmpty()) { + const task = queue.shift(); + actual.push(task); + } + same(actual, expected); +}); + +test('removing elements from first CircularBuffer should not lead to issues', async ({ equal, same }) => { + const queue = new FixedQueue(); + + const batchSize = 2048; + + const firstBatch = Array.from({ length: batchSize }, () => new QueueTask()); + const secondBatch = Array.from({ length: batchSize }, () => new QueueTask()); + const thirdBatch = Array.from({ length: batchSize }, () => new QueueTask()); + + const tasks = firstBatch.concat(secondBatch, thirdBatch); + + for (const task of tasks) { + queue.push(task); + } + equal(queue.size, tasks.length, `should contain ${batchSize} * 3 items`); + + let size = queue.size; + for (const task of firstBatch) { + queue.remove(task); + equal(queue.size, --size, `should contain ${size} items`); + } + + const expected = secondBatch.concat(thirdBatch); + const actual = []; + while (!queue.isEmpty()) { + const task = queue.shift(); + actual.push(task); + } + same(actual, expected); +}); + +test('removing elements from last CircularBuffer should not lead to issues', async ({ equal, same }) => { + const queue = new FixedQueue(); + + const batchSize = 2048; + + const firstBatch = Array.from({ length: batchSize }, () => new QueueTask()); + const secondBatch = Array.from({ length: batchSize }, () => new QueueTask()); + const thirdBatch = Array.from({ length: batchSize }, () => new QueueTask()); + + const tasks = firstBatch.concat(secondBatch, thirdBatch); + + for (const task of tasks) { + queue.push(task); + } + equal(queue.size, tasks.length, `should contain ${batchSize} * 3 items`); + + let size = queue.size; + for (const task of thirdBatch) { + queue.remove(task); + equal(queue.size, --size, `should contain ${size} items`); + } + + const expected = firstBatch.concat(secondBatch); + const actual = []; + while (!queue.isEmpty()) { + const task = queue.shift(); + actual.push(task); + } + same(actual, expected); +}); + +test('simple integraion with Piscina', async ({ equal }) => { + const queue = new FixedQueue(); + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/simple-isworkerthread-named-import.ts'), + taskQueue: queue + }); + + const result = await pool.runTask(null); + equal(result, 'done'); +}); + +test('concurrent calls with Piscina', async ({ same }) => { + const queue = new FixedQueue(); + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/eval-async.js'), + taskQueue: queue + }); + + const tasks = ['1+1', '2+2', '3+3']; + const results = await Promise.all(tasks.map((task) => pool.runTask(task))); + // eslint-disable-next-line + const expected = tasks.map(eval); + + same(results, expected); +});