Skip to content

Commit b32399b

Browse files
committed
feat: implement onceWhen helper function
1 parent ba89442 commit b32399b

File tree

3 files changed

+193
-0
lines changed

3 files changed

+193
-0
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import { EventEmitter } from 'node:events';
2+
import { onceWhen } from '../events';
3+
4+
describe('onceWhen tests', () => {
5+
test('should resolve when event is emitted and predicate matches', async () => {
6+
const emitter = new EventEmitter<{
7+
myTestEvent: [eventNumber: number, msg: string];
8+
}>();
9+
10+
setTimeout(() => {
11+
for (let i = 0; i <= 5; i++) {
12+
emitter.emit('myTestEvent', i, `Message ${i}`);
13+
}
14+
}, 10); // Emit after a delay
15+
16+
const [eventNumber, msg] = await onceWhen(emitter, 'myTestEvent', (eventNumber, msg) => {
17+
return eventNumber === 5;
18+
});
19+
expect(eventNumber).toBe(5);
20+
expect(msg).toBe('Message 5');
21+
22+
// Expect that the event listener was removed after onceWhen is finished
23+
expect(emitter.eventNames()).toStrictEqual([]);
24+
});
25+
26+
test('should reject if aborted immediately', async () => {
27+
const emitter = new EventEmitter<{
28+
myTestEvent: [eventNumber: number];
29+
}>();
30+
const controller = new AbortController();
31+
const abortReason = new Error('Test aborted');
32+
controller.abort(abortReason);
33+
await expect(
34+
onceWhen(emitter, 'myTestEvent', () => true, { signal: controller.signal })
35+
).rejects.toThrow(abortReason);
36+
37+
// Expect that the event listener was removed after onceWhen is finished
38+
expect(emitter.eventNames()).toStrictEqual([]);
39+
});
40+
41+
test('should reject if aborted before event is emitted', async () => {
42+
const emitter = new EventEmitter<{
43+
myTestEvent: [eventNumber: number];
44+
}>();
45+
const controller = new AbortController();
46+
const abortReason = new Error('Test aborted');
47+
// controller.abort(abortReason);
48+
setTimeout(() => {
49+
for (let i = 0; i <= 5; i++) {
50+
emitter.emit('myTestEvent', i);
51+
if (i === 3) {
52+
controller.abort(abortReason); // Abort after emitting some events
53+
}
54+
}
55+
}, 10); // Emit after a delay
56+
57+
let lastEventNumberSeen = 0;
58+
await expect(
59+
onceWhen(
60+
emitter,
61+
'myTestEvent',
62+
eventNumber => {
63+
lastEventNumberSeen = eventNumber;
64+
return false;
65+
},
66+
{ signal: controller.signal }
67+
)
68+
).rejects.toThrow(abortReason);
69+
70+
// Check that we saw events before the abort
71+
expect(lastEventNumberSeen).toBe(3);
72+
73+
// Expect that the event listener was removed after onceWhen is finished
74+
expect(emitter.eventNames()).toStrictEqual([]);
75+
});
76+
77+
test('should resolve if event is emitted before abort', async () => {
78+
const emitter = new EventEmitter<{
79+
myTestEvent: [eventNumber: number];
80+
}>();
81+
const controller = new AbortController();
82+
83+
setTimeout(() => {
84+
for (let i = 0; i <= 5; i++) {
85+
emitter.emit('myTestEvent', i);
86+
}
87+
controller.abort(); // Abort after emitting all events
88+
}, 10); // Emit after a delay
89+
90+
const [eventNumber] = await onceWhen(emitter, 'myTestEvent', eventNumber => eventNumber === 5, {
91+
signal: controller.signal,
92+
});
93+
expect(eventNumber).toBe(5);
94+
95+
// Expect that the event listener was removed after onceWhen is finished
96+
expect(emitter.eventNames()).toStrictEqual([]);
97+
});
98+
99+
test('should reject if predict function throws', async () => {
100+
const emitter = new EventEmitter<{
101+
myTestEvent: [eventNumber: number];
102+
}>();
103+
setTimeout(() => {
104+
for (let i = 0; i <= 5; i++) {
105+
emitter.emit('myTestEvent', i);
106+
}
107+
}, 10);
108+
109+
let lastEventNumberSeen = 0;
110+
const predictFunctionError = new Error('Predict function error');
111+
await expect(
112+
onceWhen(emitter, 'myTestEvent', eventNumber => {
113+
lastEventNumberSeen = eventNumber;
114+
if (eventNumber === 3) {
115+
throw predictFunctionError;
116+
}
117+
return false;
118+
})
119+
).rejects.toThrow(predictFunctionError);
120+
expect(lastEventNumberSeen).toBe(3);
121+
122+
// Expect that the event listener was removed after onceWhen is finished
123+
expect(emitter.eventNames()).toStrictEqual([]);
124+
});
125+
});

src/helpers/events.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { EventEmitter, addAbortListener } from 'node:events';
2+
3+
// This is a workaround for Node.js versions that do not support Symbol.dispose
4+
const DisposeSymbol: typeof Symbol.dispose = Symbol.dispose ?? Symbol.for('nodejs.dispose');
5+
6+
/**
7+
* Creates a Promise that resolves when the specified `eventName` is emitted by the `EventEmitter`
8+
* and the provided predicate returns `true` for the emitted arguments.
9+
*
10+
* Similar to [`events.once`](https://nodejs.org/api/events.html#eventsonceemitter-name-options),
11+
* but includes support for a predicate function to filter events. Only events for which
12+
* the predicate returns `true` will cause the Promise to resolve.
13+
*
14+
* The resolved value is an array of the arguments emitted with the event.
15+
*
16+
* Supports typed `EventEmitter`s and optional cancellation via `AbortSignal`.
17+
*/
18+
export function onceWhen<
19+
EventMap extends Record<string, any[]> = Record<string, any[]>,
20+
K extends Extract<keyof EventMap, string> = Extract<keyof EventMap, string>
21+
>(
22+
emitter: EventEmitter<EventMap>,
23+
eventName: K,
24+
predicate: (...args: EventMap[K]) => boolean,
25+
options?: { signal?: AbortSignal }
26+
): Promise<EventMap[K]> {
27+
return new Promise((resolve, reject) => {
28+
// Immediate abort check
29+
if (options?.signal?.aborted) {
30+
reject((options.signal.reason as Error) ?? new Error('Aborted'));
31+
return;
32+
}
33+
34+
// Cleanup helper: remove both the event listener and the abort listener
35+
const cleanup = () => {
36+
// eslint-disable-next-line @typescript-eslint/no-use-before-define
37+
(emitter as EventEmitter).off(eventName, listener);
38+
// eslint-disable-next-line @typescript-eslint/no-use-before-define
39+
disposable?.[DisposeSymbol]();
40+
};
41+
42+
// Abort handler
43+
const onAbort = () => {
44+
cleanup();
45+
reject((options?.signal?.reason as Error) ?? new Error('Aborted'));
46+
};
47+
48+
// Our event listener that checks the predicate
49+
const listener = (...args: EventMap[K]) => {
50+
try {
51+
if (predicate(...args)) {
52+
cleanup();
53+
resolve(args);
54+
}
55+
} catch (err) {
56+
cleanup();
57+
reject(err as Error);
58+
return;
59+
}
60+
};
61+
62+
// Install the AbortSignal listener via Node’s helper
63+
const disposable = options?.signal ? addAbortListener(options.signal, onAbort) : undefined;
64+
65+
(emitter as EventEmitter).on(eventName, listener);
66+
});
67+
}

src/helpers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ export * from './iterators';
22
export * from './time';
33
export * from './values';
44
export * from './is-debugging';
5+
export * from './events';
56
export { WorkerThreadManager } from './worker-thread-manager';
67
export type { WorkerPoolModuleInterface } from './worker-thread-manager';

0 commit comments

Comments
 (0)