Skip to content

Commit 59d5e62

Browse files
authored
fix(worker): Wait for worker shutdown if runUntil promise throws (#943)
1 parent adf872e commit 59d5e62

File tree

5 files changed

+105
-47
lines changed

5 files changed

+105
-47
lines changed

packages/test/src/test-replay.ts

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,12 @@ async function getHistories(fname: string): Promise<History> {
2424
}
2525
}
2626

27-
function historator(histories: Array<History>, goSlow?: boolean) {
28-
return {
29-
timesCalled: 0,
30-
async *[Symbol.asyncIterator]() {
31-
for (const history of histories) {
32-
this.timesCalled++;
33-
yield { workflowId: 'fake', history };
34-
if (goSlow) {
35-
// This matters because the exception propagation from the worker takes a long time
36-
// compared to this generator. This sleep makes it more realistic for a
37-
// stream-from-net-or-disk situation
38-
await new Promise((resolve) => setTimeout(resolve, 1000));
39-
}
40-
}
41-
},
42-
};
27+
function historator(histories: Array<History>) {
28+
return (async function* () {
29+
for (const history of histories) {
30+
yield { workflowId: 'fake', history };
31+
}
32+
})();
4333
}
4434

4535
const test = anyTest as TestInterface<Context>;
@@ -125,9 +115,7 @@ test('multiple-histories-replay', async (t) => {
125115
},
126116
{ histories }
127117
);
128-
t.deepEqual(histories.timesCalled, 2);
129-
t.deepEqual(res.errors.length, 0);
130-
t.pass();
118+
t.is(res.errors.length, 0);
131119
});
132120

133121
test('multiple-histories-replay-fails-fast', async (t) => {
@@ -136,7 +124,7 @@ test('multiple-histories-replay-fails-fast', async (t) => {
136124
// change workflow type to break determinism
137125
hist1.events[0].workflowExecutionStartedEventAttributes!.workflowType!.name = 'http';
138126
hist2.events[0].workflowExecutionStartedEventAttributes!.workflowType!.name = 'http';
139-
const histories = historator([hist1, hist2], true);
127+
const histories = historator([hist1, hist2]);
140128

141129
await t.throwsAsync(
142130
Worker.runReplayHistories(
@@ -147,7 +135,6 @@ test('multiple-histories-replay-fails-fast', async (t) => {
147135
{ histories }
148136
)
149137
);
150-
t.deepEqual(histories.timesCalled, 1);
151138
});
152139

153140
test('multiple-histories-replay-fails-slow', async (t) => {
@@ -165,6 +152,5 @@ test('multiple-histories-replay-fails-slow', async (t) => {
165152
},
166153
{ histories }
167154
);
168-
t.deepEqual(histories.timesCalled, 2);
169-
t.deepEqual(res.errors.length, 1);
155+
t.is(res.errors.length, 1);
170156
});

packages/test/src/test-testenvironment.ts

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { WorkflowFailedError } from '@temporalio/client';
22
import { TestWorkflowEnvironment, workflowInterceptorModules } from '@temporalio/testing';
33
import { Connection } from '@temporalio/testing/lib/connection';
4-
import { Runtime, Worker } from '@temporalio/worker';
4+
import { Worker } from '@temporalio/worker';
55
import anyTest, { TestInterface } from 'ava';
66
import { v4 as uuid4 } from 'uuid';
77
import {
@@ -19,9 +19,6 @@ interface Context {
1919
const test = anyTest as TestInterface<Context>;
2020

2121
test.before(async (t) => {
22-
Runtime.install({
23-
telemetryOptions: { tracingFilter: 'DEBUG' },
24-
});
2522
t.context = {
2623
testEnv: await TestWorkflowEnvironment.createTimeSkipping(),
2724
};
@@ -153,17 +150,15 @@ test.serial('Workflow code can run assertions', async (t) => {
153150
},
154151
});
155152

156-
await worker.runUntil(async () => {
157-
const err: WorkflowFailedError = await t.throwsAsync(
153+
const err: WorkflowFailedError = await t.throwsAsync(
154+
worker.runUntil(
158155
client.workflow.execute(assertFromWorkflow, {
159156
workflowId: uuid4(),
160157
taskQueue: 'test',
161158
args: [6],
162-
}),
163-
{
164-
instanceOf: WorkflowFailedError,
165-
}
166-
);
167-
t.is(err.cause?.message, 'Expected values to be strictly equal:\n\n6 !== 7\n');
168-
});
159+
})
160+
),
161+
{ instanceOf: WorkflowFailedError }
162+
);
163+
t.is(err.cause?.message, 'Expected values to be strictly equal:\n\n6 !== 7\n');
169164
});

packages/test/src/test-worker-lifecycle.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,21 @@ test.serial('Mocked run throws if not shut down gracefully', async (t) => {
9999
t.is(worker.getState(), 'FAILED');
100100
await t.throwsAsync(worker.run(), { message: 'Poller was already started' });
101101
});
102+
103+
test.serial('Mocked throws combined error in runUntil', async (t) => {
104+
const worker = isolateFreeWorker({
105+
shutdownGraceTime: '5ms',
106+
taskQueue: 'shutdown-test',
107+
});
108+
worker.native.initiateShutdown = () => new Promise(() => undefined);
109+
const err = await t.throwsAsync(
110+
worker.runUntil(async () => {
111+
throw new Error('inner');
112+
})
113+
);
114+
t.is(worker.getState(), 'FAILED');
115+
t.is(err.message, 'Worker terminated with fatal error in `runUntil`');
116+
const { workerError, innerError } = (err as any).cause;
117+
t.is(workerError.message, 'Timed out while waiting for worker to shutdown gracefully');
118+
t.is(innerError.message, 'inner');
119+
});

packages/worker/src/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,15 @@ export * from './interceptors';
2828
export * from './logger';
2929
export { History, Runtime, RuntimeOptions } from './runtime';
3030
export * from './sinks';
31-
export { DataConverter, defaultPayloadConverter, errors, State, Worker } from './worker';
31+
export {
32+
CombinedWorkerRunError,
33+
CombinedWorkerRunErrorCause,
34+
DataConverter,
35+
defaultPayloadConverter,
36+
errors,
37+
State,
38+
Worker,
39+
} from './worker';
3240
export {
3341
appendDefaultInterceptors,
3442
CompiledWorkerOptions,

packages/worker/src/worker.ts

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,33 @@ export type ActivityTaskWithContext = ContextAware<{
140140

141141
type CompiledWorkerOptionsWithBuildId = CompiledWorkerOptions & { buildId: string };
142142

143+
/**
144+
* Combined error information for {@link Worker.runUntil}
145+
*/
146+
export interface CombinedWorkerRunErrorCause {
147+
/**
148+
* Error thrown by a Worker
149+
*/
150+
workerError: unknown;
151+
/**
152+
* Error thrown by the wrapped promise or function
153+
*/
154+
innerError: unknown;
155+
}
156+
157+
/**
158+
* Error thrown by {@link Worker.runUntil}
159+
*/
160+
export class CombinedWorkerRunError extends Error {
161+
public readonly name = 'CombinedWorkerRunError';
162+
public readonly cause: CombinedWorkerRunErrorCause;
163+
164+
constructor(message: string, { cause }: { cause: CombinedWorkerRunErrorCause }) {
165+
super(message);
166+
this.cause = cause;
167+
}
168+
}
169+
143170
export interface NativeWorkerLike {
144171
type: 'Worker';
145172
initiateShutdown: Promisify<OmitFirstParam<typeof native.workerInitiateShutdown>>;
@@ -1644,21 +1671,45 @@ export class Worker {
16441671
* @returns the result of `fnOrPromise`
16451672
*
16461673
* Throws on fatal Worker errors.
1674+
*
1675+
* **SDK versions `< 1.5.0`**:
1676+
* This method would not wait for worker to complete shutdown if the inner `fnOrPromise` threw an error.
1677+
*
1678+
* **SDK versions `>=1.5.0`**:
1679+
* This method always waits for both worker shutdown and inner `fnOrPromise` to resolve.
1680+
* If one of worker run -or- the inner promise throw an error, that error is rethrown.
1681+
* If both throw an error, a {@link CombinedWorkerRunError} with a `cause` attribute containing both errors.
16471682
*/
16481683
async runUntil<R>(fnOrPromise: Promise<R> | (() => Promise<R>)): Promise<R> {
1649-
const runAndShutdown = async () => {
1684+
const workerRunPromise = this.run();
1685+
const innerPromise = (async () => {
16501686
try {
1651-
if (typeof fnOrPromise === 'function') {
1652-
return await fnOrPromise();
1653-
} else {
1654-
return await fnOrPromise;
1655-
}
1687+
const p = typeof fnOrPromise === 'function' ? fnOrPromise() : fnOrPromise;
1688+
return await p;
16561689
} finally {
1657-
this.shutdown();
1690+
if (this.state === 'RUNNING') {
1691+
this.shutdown();
1692+
}
16581693
}
1659-
};
1660-
const [_, ret] = await Promise.all([this.run(), runAndShutdown()]);
1661-
return ret;
1694+
})();
1695+
const [innerResult, workerRunResult] = await Promise.allSettled([innerPromise, workerRunPromise]);
1696+
1697+
if (workerRunResult.status === 'rejected') {
1698+
if (innerResult.status === 'rejected') {
1699+
throw new CombinedWorkerRunError('Worker terminated with fatal error in `runUntil`', {
1700+
cause: {
1701+
workerError: workerRunResult.reason,
1702+
innerError: innerResult.reason,
1703+
},
1704+
});
1705+
} else {
1706+
throw workerRunResult.reason;
1707+
}
1708+
} else if (innerResult.status === 'rejected') {
1709+
throw innerResult.reason;
1710+
} else {
1711+
return innerResult.value;
1712+
}
16621713
}
16631714

16641715
/**

0 commit comments

Comments
 (0)