Skip to content

Commit

Permalink
Implement Semaphore.waitForUnlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Dolan Murvihill authored and Dolan Murvihill committed Jan 23, 2024
1 parent fd097a7 commit b1a38fc
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
67 changes: 54 additions & 13 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ import { E_CANCELED } from './errors';
import SemaphoreInterface from './SemaphoreInterface';


interface Nice {
nice: number;
}

interface QueueEntry {
resolve(result: [number, SemaphoreInterface.Releaser]): void;

reject(error: unknown): void;

weight: number;
nice: number;
}

interface Waiter {
resolve(): void;

nice: number;
}

class Semaphore implements SemaphoreInterface {
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {}
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {
}

acquire(weight = 1, nice = 0): Promise<[number, SemaphoreInterface.Releaser]> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);
Expand Down Expand Up @@ -43,12 +56,15 @@ class Semaphore implements SemaphoreInterface {
waitForUnlock(weight = 1, nice = 0): Promise<void> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

return new Promise((resolve) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
this._weightedWaiters[weight - 1].push(resolve);

this._dispatchQueue();
});
if (this._couldLockImmediately(weight, nice)) {
return Promise.resolve();
} else {
return new Promise((resolve) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
insertSorted(this._weightedWaiters[weight - 1], { resolve, nice });
this._dispatchQueue();
});
}
}

isLocked(): boolean {
Expand Down Expand Up @@ -101,16 +117,41 @@ class Semaphore implements SemaphoreInterface {
}

private _drainUnlockWaiters(): void {
for (let weight = this._value; weight > 0; weight--) {
if (!this._weightedWaiters[weight - 1]) continue;

this._weightedWaiters[weight - 1].forEach((waiter) => waiter());
this._weightedWaiters[weight - 1] = [];
if (this._queue.length === 0) {
for (let weight = this._value; weight > 0; weight--) {
const waiters = this._weightedWaiters[weight - 1];
if (!waiters) continue;
waiters.forEach((waiter) => waiter.resolve());
this._weightedWaiters[weight - 1] = [];
}
} else {
const queuedNice = this._queue[0].nice;
for (let weight = this._value; weight > 0; weight--) {
const waiters = this._weightedWaiters[weight - 1];
if (!waiters) continue;
const i = waiters.findIndex((waiter) => waiter.nice >= queuedNice);
(i === -1 ? waiters : waiters.splice(0, i))
.forEach((waiter => { waiter.resolve(); }));
}
}
}

private _couldLockImmediately(weight: number, nice: number) {
return (this._queue.length === 0 || this._queue[0].nice > nice) &&
weight <= this._value;
}

private _queue: Array<QueueEntry> = [];
private _weightedWaiters: Array<Array<() => void>> = [];
private _weightedWaiters: Array<Array<Waiter>> = [];
}

function insertSorted<T extends Nice>(a: T[], v: T) {
const i = a.findIndex((other) => v.nice < other.nice);
if (i === -1) {
a.push(v);
} else {
a.splice(i, 0, v);
}
}

export default Semaphore;
4 changes: 2 additions & 2 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
return sync.cancel();
},

waitForUnlock: (weight?: number): Promise<void> => {
waitForUnlock: (weight?: number, nice?: number): Promise<void> => {
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
}

return new Promise((resolve, reject) => {
const handle = setTimeout(() => reject(timeoutError), timeout);
sync.waitForUnlock(weight).then(() => {
sync.waitForUnlock(weight, nice).then(() => {
clearTimeout(handle);
resolve();
});
Expand Down
10 changes: 9 additions & 1 deletion test/semaphoreSuite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,15 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) =>
assert.deepStrictEqual([flag1, flag2], [true, true]);
});

test('waitForUnlock unblocks the nicest waiters last');
test('waitForUnlock unblocks the nicest waiters last', async () => {
const calledBack: number[] = [];
semaphore.acquire(3, 1); // A big heavy waiting task
semaphore.waitForUnlock(1, 0).then(() => { calledBack.push(0); }); // High priority
semaphore.waitForUnlock(1, 1).then(() => { calledBack.push(1); }); // Queued behind the heavy task
semaphore.waitForUnlock(1, 2).then(() => { calledBack.push(2); }); // Low priority
await clock.runAllAsync();
assert.deepStrictEqual(calledBack, [0]);
});

test('waitForUnlock only unblocks when the semaphore can actually be acquired again', async () => {
semaphore.acquire(2);
Expand Down

0 comments on commit b1a38fc

Please sign in to comment.