Skip to content

Commit 12c6f5a

Browse files
authored
fix(worker): Gracefully shut down Worker on unexpected errors (#1539)
1 parent c8e7277 commit 12c6f5a

File tree

13 files changed

+471
-122
lines changed

13 files changed

+471
-122
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"scripts": {
2424
"rebuild": "npm run clean && npm run build",
2525
"build": "lerna run --stream build",
26-
"build.watch": "npm run build:protos && tsc --build --watch packages/*",
26+
"build.watch": "npm run build:protos && tsc --build --watch packages/*/tsconfig.json",
2727
"build:protos": "node ./packages/proto/scripts/compile-proto.js",
2828
"test": "lerna run --stream test",
2929
"test.watch": "lerna run --stream test.watch",

packages/client/src/workflow-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ function ensureArgs<W extends Workflow, T extends WorkflowStartOptions<W>>(
308308
opts: T
309309
): Omit<T, 'args'> & { args: unknown[] } {
310310
const { args, ...rest } = opts;
311-
return { args: args ?? [], ...rest };
311+
return { args: (args as unknown[]) ?? [], ...rest };
312312
}
313313

314314
/**

packages/core-bridge/ts/errors.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@ export class TransportError extends Error {}
1818
* Something unexpected happened, considered fatal
1919
*/
2020
@SymbolBasedInstanceOfError('UnexpectedError')
21-
export class UnexpectedError extends Error {}
21+
export class UnexpectedError extends Error {
22+
constructor(
23+
message: string,
24+
public cause?: unknown
25+
) {
26+
super(message);
27+
}
28+
}
2229

2330
export { IllegalStateError };
2431

@@ -47,7 +54,7 @@ export function convertFromNamedError(e: unknown, keepStackTrace: boolean): unkn
4754
return newerr;
4855

4956
case 'UnexpectedError':
50-
newerr = new UnexpectedError(e.message);
57+
newerr = new UnexpectedError(e.message, e);
5158
newerr.stack = keepStackTrace ? e.stack : undefined;
5259
return newerr;
5360
}

packages/test/src/test-integration-workflows-with-recorded-logs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ class UnfinishedHandlersWorkflowTerminationTypeTest {
380380
switch (this.handlerType) {
381381
case 'update':
382382
executeUpdate = w.executeUpdate(unfinishedHandlersWorkflowTerminationTypeUpdate, { updateId });
383-
await waitUntil(() => workflowUpdateExists(w, updateId), 2000);
383+
await waitUntil(() => workflowUpdateExists(w, updateId), 5000);
384384
break;
385385
case 'signal':
386386
await w.signal(unfinishedHandlersWorkflowTerminationTypeSignal);

packages/test/src/test-worker-lifecycle.ts

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
*
55
* @module
66
*/
7+
import { setTimeout } from 'timers/promises';
8+
import { randomUUID } from 'crypto';
79
import test from 'ava';
8-
import { Runtime } from '@temporalio/worker';
9-
import { TransportError } from '@temporalio/core-bridge';
10+
import { Runtime, PromiseCompletionTimeoutError } from '@temporalio/worker';
11+
import { TransportError, UnexpectedError } from '@temporalio/core-bridge';
12+
import { Client } from '@temporalio/client';
1013
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
1114
import { defaultOptions, isolateFreeWorker } from './mock-native-worker';
15+
import { fillMemory } from './workflows';
1216

1317
if (RUN_INTEGRATION_TESTS) {
1418
test.serial('Worker shuts down gracefully', async (t) => {
@@ -28,6 +32,23 @@ if (RUN_INTEGRATION_TESTS) {
2832
await t.throwsAsync(worker.run(), { message: 'Poller was already started' });
2933
});
3034

35+
test.serial("Worker.runUntil doesn't hang if provided promise survives to Worker's shutdown", async (t) => {
36+
const worker = await Worker.create({
37+
...defaultOptions,
38+
taskQueue: t.title.replace(/ /g, '_'),
39+
});
40+
const p = worker.runUntil(
41+
new Promise(() => {
42+
/* a promise that will never unblock */
43+
})
44+
);
45+
t.is(worker.getState(), 'RUNNING');
46+
worker.shutdown();
47+
t.is(worker.getState(), 'STOPPING');
48+
await t.throwsAsync(p, { instanceOf: PromiseCompletionTimeoutError });
49+
t.is(worker.getState(), 'STOPPED');
50+
});
51+
3152
test.serial('Worker shuts down gracefully if interrupted before running', async (t) => {
3253
const worker = await Worker.create({
3354
...defaultOptions,
@@ -54,6 +75,55 @@ if (RUN_INTEGRATION_TESTS) {
5475
}
5576
);
5677
});
78+
79+
test.serial('Threaded VM gracely stops and fails on ERR_WORKER_OUT_OF_MEMORY', async (t) => {
80+
// We internally use a timeout of 10s to catch a possible case where test would
81+
// be non-conclusive. We need the test timeout to be longer than that.
82+
t.timeout(30_000);
83+
84+
const taskQueue = t.title.replace(/ /g, '_');
85+
const client = new Client();
86+
const worker = await Worker.create({
87+
...defaultOptions,
88+
taskQueue,
89+
});
90+
91+
// This workflow will allocate large block of memory, hopefully causing a ERR_WORKER_OUT_OF_MEMORY.
92+
// Note that due to the way Node/V8 optimize byte code, its possible that this may trigger
93+
// other type of errors, including some that can't be intercepted cleanly.
94+
client.workflow
95+
.start(fillMemory, {
96+
taskQueue,
97+
workflowId: randomUUID(),
98+
// Don't linger
99+
workflowExecutionTimeout: '30s',
100+
})
101+
.catch(() => void 0);
102+
103+
const workerRunPromise = worker.run();
104+
try {
105+
// Due to various environment factors, it is possible that the worker may sometime not fail.
106+
// That's obviously not what we want to assert, but that's still ok. We therefore set a
107+
// timeout of 10s and simply pass if the Worker hasn't failed by then.
108+
await Promise.race([setTimeout(10_000), workerRunPromise]);
109+
if (worker.getState() === 'RUNNING') {
110+
worker.shutdown();
111+
await workerRunPromise;
112+
}
113+
t.log('Non-conclusive result: Worker did not fail as expected');
114+
t.pass();
115+
} catch (err) {
116+
t.is((err as Error).name, UnexpectedError.name);
117+
t.is(
118+
(err as Error).message,
119+
'Workflow Worker Thread exited prematurely: Error [ERR_WORKER_OUT_OF_MEMORY]: ' +
120+
'Worker terminated due to reaching memory limit: JS heap out of memory'
121+
);
122+
t.is(worker.getState(), 'FAILED');
123+
} finally {
124+
if (Runtime._instance) await Runtime._instance.shutdown();
125+
}
126+
});
57127
}
58128

59129
test.serial('Mocked run shuts down gracefully', async (t) => {

packages/test/src/test-workflow-unhandled-rejection-crash.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@ if (RUN_INTEGRATION_TESTS) {
1717
taskQueue,
1818
args: [{ crashWorker: true }],
1919
});
20-
await t.throwsAsync(worker.run(), {
21-
instanceOf: UnexpectedError,
22-
message:
23-
'Worker thread shut down prematurely, this could be caused by an' +
24-
' unhandled rejection in workflow code that could not be' +
25-
' associated with a workflow run',
26-
});
27-
await handle.terminate();
20+
try {
21+
await t.throwsAsync(worker.run(), {
22+
instanceOf: UnexpectedError,
23+
message:
24+
'Workflow Worker Thread exited prematurely: UnhandledRejectionError: ' +
25+
"Unhandled Promise rejection for unknown Workflow Run id='undefined': " +
26+
'Error: error to crash the worker',
27+
});
28+
t.is(worker.getState(), 'FAILED');
29+
} finally {
30+
await handle.terminate();
31+
}
2832
});
2933
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
export async function fillMemory(): Promise<number> {
2+
// There are different ways that Node may report "out of memory" conditions, and the criterias
3+
// for each of them are essentially undocumented. The following code has been empirically
4+
// designed to trigger a very specific type of error, i.e. a Worker Thread getting killed with an
5+
// `ERR_WORKER_OUT_OF_MEMORY` code, with very high probability (more than 95% of the time across
6+
// all of our test setups, including all platforms, and Node.js v16, v18 and v20). The size of
7+
// RAM appears not to be important.
8+
//
9+
// Occurence of an `ERR_WORKER_OUT_OF_MEMORY` error in the following code appears to be the
10+
// result of some interractions of V8's JIT compiler and the V8's heap allocator. With appropriate
11+
// parameters, the code crashes on the first execution of the loop; yet, removing the loop or
12+
// reducing the upper bound of the loop (exact threshold appears to be between 64 and 128) prevents
13+
// the crash, presumably because the loop might no longer be deemed worth of early JIT compilation.
14+
// Allocating larger arrays is also likely to prevent the crash, presumably because the arrays will
15+
// then be allocated from the "large objects space" rather than the "yound gen" space, which, by
16+
// default, has a limit of 16MB on 64-bit systems.
17+
//
18+
// We also need to prevent the JIT compiler from optimizing this code _before_ we even start
19+
// executing it, as that then results in a low level error (native-like), which may terminate the
20+
// whole process rather than just the Worker Thread. Hence, we wrap the code in an `eval` block.
21+
22+
const accumulator = [];
23+
eval(`
24+
for (let i = 0; i < 2048; i++) {
25+
accumulator.push(new Array(1024 * 1024 * 2).fill(i));
26+
}
27+
`);
28+
29+
return accumulator.length;
30+
}

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export * from './deprecate-patch';
3838
export * from './fail-signal';
3939
export * from './fail-unless-signaled-before-start';
4040
export * from './fails-workflow-task';
41+
export * from './fill-memory';
4142
export * from './global-overrides';
4243
export * from './handle-external-workflow-cancellation-while-activity-running';
4344
export * from './http';

packages/worker/src/errors.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,63 @@ import { ShutdownError, TransportError, UnexpectedError } from '@temporalio/core
88
@SymbolBasedInstanceOfError('GracefulShutdownPeriodExpiredError')
99
export class GracefulShutdownPeriodExpiredError extends Error {}
1010

11+
/**
12+
* Thrown from the Workflow Worker when a Promise is rejected, but there is no `catch` handler
13+
* for that Promise. This error wraps the original error that was thrown from the Promise.
14+
*
15+
* Occurrence of this error generally indicate a missing `await` statement on a call that return
16+
* a Promise. To silent rejections on a specific Promise, use `promise.catch(funcThatCantThrow)`
17+
* (e.g. `promise.catch(() => void 0)` or `promise.catch((e) => logger.error(e))`).
18+
*/
19+
// FIXME: At this time, this wrapper is only used for errors that could not be associated with a
20+
// specific workflow run; it should also be used for unhandled rejections in workflow code,
21+
// but this is not possible at the moment as we intentionally "unhandle" non-TemporalFailure
22+
// errors happening in workflow code (i.e. ALL non-TemporalFailure errors thrown from
23+
// workflow code becomes Unhandled Rejection at some point in our own logic)
24+
@SymbolBasedInstanceOfError('UnhandledRejectionError')
25+
export class UnhandledRejectionError extends Error {
26+
constructor(
27+
message: string,
28+
public cause: unknown
29+
) {
30+
super(message);
31+
}
32+
}
33+
34+
/**
35+
* Combined error information for {@link Worker.runUntil}
36+
*/
37+
export interface CombinedWorkerRunErrorCause {
38+
/**
39+
* Error thrown by a Worker
40+
*/
41+
workerError: unknown;
42+
/**
43+
* Error thrown by the wrapped promise or function
44+
*/
45+
innerError: unknown;
46+
}
47+
48+
/**
49+
* Error thrown by {@link Worker.runUntil} and {@link Worker.runReplayHistories}
50+
*/
51+
@SymbolBasedInstanceOfError('CombinedWorkerRunError')
52+
export class CombinedWorkerRunError extends Error {
53+
public readonly cause: CombinedWorkerRunErrorCause;
54+
55+
constructor(message: string, { cause }: { cause: CombinedWorkerRunErrorCause }) {
56+
super(message);
57+
this.cause = cause;
58+
}
59+
}
60+
61+
/**
62+
* Error thrown by {@link Worker.runUntil} if the provided Promise does not resolve within the specified
63+
* {@link RunUntilOptions.promiseCompletionTimeout|timeout period} after the Worker has stopped.
64+
*/
65+
@SymbolBasedInstanceOfError('PromiseCompletionTimeoutError')
66+
export class PromiseCompletionTimeoutError extends Error {}
67+
1168
/**
1269
* @deprecated Import error classes directly
1370
*/

0 commit comments

Comments
 (0)