Skip to content

Commit 7b3c6fd

Browse files
authored
fix(worker): avoid a race in shutdown hooks (#918)
1 parent 2e94d96 commit 7b3c6fd

File tree

2 files changed

+56
-13
lines changed

2 files changed

+56
-13
lines changed

packages/test/src/test-worker-lifecycle.ts

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* @module
66
*/
77
import { Worker } from '@temporalio/worker';
8+
import { Runtime } from '@temporalio/worker';
89
import test from 'ava';
910
import { RUN_INTEGRATION_TESTS } from './helpers';
1011
import { defaultOptions, isolateFreeWorker } from './mock-native-worker';
@@ -27,23 +28,59 @@ if (RUN_INTEGRATION_TESTS) {
2728
t.is(worker.getState(), 'STOPPED');
2829
await t.throwsAsync(worker.run(), { message: 'Poller was already started' });
2930
});
31+
32+
test.serial('Worker shuts down gracefully if interrupted before running', async (t) => {
33+
const worker = await Worker.create({
34+
...defaultOptions,
35+
shutdownGraceTime: '500ms',
36+
taskQueue: 'shutdown-test',
37+
});
38+
t.is(worker.getState(), 'INITIALIZED');
39+
process.emit('SIGINT', 'SIGINT');
40+
const p = worker.run();
41+
t.is(worker.getState(), 'RUNNING');
42+
await p;
43+
t.is(worker.getState(), 'STOPPED');
44+
});
3045
}
3146

3247
test.serial('Mocked run shuts down gracefully', async (t) => {
33-
const worker = isolateFreeWorker({
34-
shutdownGraceTime: '500ms',
35-
taskQueue: 'shutdown-test',
36-
});
37-
t.is(worker.getState(), 'INITIALIZED');
38-
const p = worker.run();
39-
t.is(worker.getState(), 'RUNNING');
40-
process.emit('SIGINT', 'SIGINT');
41-
await p;
42-
t.is(worker.getState(), 'STOPPED');
43-
await t.throwsAsync(worker.run(), { message: 'Poller was already started' });
48+
try {
49+
const worker = isolateFreeWorker({
50+
shutdownGraceTime: '500ms',
51+
taskQueue: 'shutdown-test',
52+
});
53+
t.is(worker.getState(), 'INITIALIZED');
54+
const p = worker.run();
55+
t.is(worker.getState(), 'RUNNING');
56+
process.emit('SIGINT', 'SIGINT');
57+
await p;
58+
t.is(worker.getState(), 'STOPPED');
59+
await t.throwsAsync(worker.run(), { message: 'Poller was already started' });
60+
} finally {
61+
if (Runtime._instance) await Runtime._instance.shutdown();
62+
}
63+
});
64+
65+
test.serial('Mocked run shuts down gracefully if interrupted before running', async (t) => {
66+
try {
67+
const worker = isolateFreeWorker({
68+
shutdownGraceTime: '500ms',
69+
taskQueue: 'shutdown-test',
70+
});
71+
// worker.native.initiateShutdown = () => new Promise(() => undefined);
72+
t.is(worker.getState(), 'INITIALIZED');
73+
process.emit('SIGINT', 'SIGINT');
74+
const p = worker.run();
75+
t.is(worker.getState(), 'RUNNING');
76+
await p;
77+
t.is(worker.getState(), 'STOPPED');
78+
} finally {
79+
if (Runtime._instance) await Runtime._instance.shutdown();
80+
}
4481
});
4582

46-
test('Mocked run throws if not shut down gracefully', async (t) => {
83+
test.serial('Mocked run throws if not shut down gracefully', async (t) => {
4784
const worker = isolateFreeWorker({
4885
shutdownGraceTime: '5ms',
4986
taskQueue: 'shutdown-test',

packages/worker/src/runtime.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ export class Runtime {
101101
protected readonly logPollPromise: Promise<void>;
102102
public readonly logger: Logger;
103103
protected readonly shutdownSignalCallbacks = new Set<() => void>();
104+
protected state: 'RUNNING' | 'SHUTTING_DOWN' = 'RUNNING';
104105

105106
static _instance?: Runtime;
106107
static instantiator?: 'install' | 'instance';
@@ -381,7 +382,11 @@ export class Runtime {
381382
* @hidden
382383
*/
383384
public registerShutdownSignalCallback(callback: () => void): void {
384-
this.shutdownSignalCallbacks.add(callback);
385+
if (this.state === 'RUNNING') {
386+
this.shutdownSignalCallbacks.add(callback);
387+
} else {
388+
queueMicrotask(callback);
389+
}
385390
}
386391

387392
/**
@@ -416,6 +421,7 @@ export class Runtime {
416421
* Bound to `this` for use with `process.on` and `process.off`
417422
*/
418423
protected startShutdownSequence = (): void => {
424+
this.state = 'SHUTTING_DOWN';
419425
this.teardownShutdownHook();
420426
for (const callback of this.shutdownSignalCallbacks) {
421427
queueMicrotask(callback); // Run later

0 commit comments

Comments
 (0)