Skip to content

Commit 2fba917

Browse files
authored
Test that activity is cancelled on worker shutdown (#439)
1 parent 283ef61 commit 2fba917

File tree

6 files changed

+60
-10
lines changed

6 files changed

+60
-10
lines changed

packages/test/src/activities/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable @typescript-eslint/no-empty-function */
22
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
33
import { Context } from '@temporalio/activity';
4-
import { WorkflowClient, WorkflowHandle } from '@temporalio/client';
4+
import { Client, WorkflowHandle } from '@temporalio/client';
55
import { ApplicationFailure, QueryDefinition } from '@temporalio/common';
66
import { ProtoActivityInput, ProtoActivityResult } from '../../protos/root';
77
import { cancellableFetch as cancellableFetchInner } from './cancellable-fetch';
@@ -66,11 +66,11 @@ export async function waitForCancellation(): Promise<void> {
6666
function getSchedulingWorkflowHandle(): WorkflowHandle {
6767
const { info, connection, dataConverter } = getContext();
6868
const { workflowExecution } = info;
69-
const client = new WorkflowClient({ connection, namespace: info.workflowNamespace, dataConverter });
70-
return client.getHandle(workflowExecution.workflowId, workflowExecution.runId);
69+
const client = new Client({ connection, namespace: info.workflowNamespace, dataConverter });
70+
return client.workflow.getHandle(workflowExecution.workflowId, workflowExecution.runId);
7171
}
7272

73-
async function signalSchedulingWorkflow(signalName: string) {
73+
export async function signalSchedulingWorkflow(signalName: string) {
7474
const handle = getSchedulingWorkflowHandle();
7575
await handle.signal(signalName);
7676
}

packages/test/src/activities/interceptors.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import * as activity from '@temporalio/activity';
2-
import { Connection } from '@temporalio/client';
2+
import { ConnectionLike } from '@temporalio/client';
33
import { defaultDataConverter, LoadedDataConverter } from '@temporalio/common';
44
import { ActivityExecuteInput, ActivityInboundCallsInterceptor, Next } from '@temporalio/worker';
55

66
export class ConnectionInjectorInterceptor implements ActivityInboundCallsInterceptor {
7-
constructor(public readonly connection: Connection, public readonly dataConverter = defaultDataConverter) {}
7+
constructor(public readonly connection: ConnectionLike, public readonly dataConverter = defaultDataConverter) {}
88
async execute(input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>): Promise<unknown> {
99
Object.assign(activity.Context.current(), {
1010
connection: this.connection,
@@ -18,7 +18,7 @@ export class ConnectionInjectorInterceptor implements ActivityInboundCallsInterc
1818
* Extend the basic activity Context
1919
*/
2020
export interface Context extends activity.Context {
21-
connection: Connection;
21+
connection: ConnectionLike;
2222
dataConverter: LoadedDataConverter;
2323
}
2424

packages/test/src/helpers.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export const bundlerOptions = {
7373
// workflow module, add it to this list:
7474
ignoreModules: [
7575
'@temporalio/common/lib/internal-non-workflow',
76+
'@temporalio/activity',
7677
'@temporalio/client',
7778
'@temporalio/testing',
7879
'@temporalio/worker',
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import crypto from 'crypto';
2+
import * as activity from '@temporalio/activity';
3+
import * as testing from '@temporalio/testing';
4+
import * as workflow from '@temporalio/workflow';
5+
import { bundlerOptions, test, Worker } from './helpers';
6+
7+
export async function testWorkflow(): Promise<void> {
8+
await workflow.proxyActivities({ startToCloseTimeout: '1m' }).testActivity();
9+
}
10+
11+
test('Worker cancels heartbeating activities after shutdown has been requested', async (t) => {
12+
const env = await testing.TestWorkflowEnvironment.createLocal();
13+
try {
14+
let properlyCancelled = false;
15+
const taskQueue = 'activity-heartbeat-after-shutdown';
16+
const worker = await Worker.create({
17+
connection: env.nativeConnection,
18+
taskQueue,
19+
workflowsPath: __filename,
20+
bundlerOptions,
21+
activities: {
22+
async testActivity() {
23+
const ctx = activity.Context.current();
24+
worker.shutdown();
25+
ctx.heartbeat();
26+
try {
27+
await ctx.cancelled;
28+
} catch (err) {
29+
if (err instanceof activity.CancelledFailure && err.message === 'WORKER_SHUTDOWN') {
30+
properlyCancelled = true;
31+
}
32+
throw err;
33+
}
34+
},
35+
},
36+
});
37+
const handle = await env.client.workflow.start(testWorkflow, {
38+
workflowId: crypto.randomUUID(),
39+
taskQueue,
40+
});
41+
try {
42+
// If worker completes within graceful shutdown period, the activity has successfully been cancelled
43+
await worker.run();
44+
} finally {
45+
await handle.terminate();
46+
}
47+
t.true(properlyCancelled);
48+
} finally {
49+
await env.teardown();
50+
}
51+
});

packages/testing/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
} from '@temporalio/client';
2525
import { ActivityFunction, CancelledFailure } from '@temporalio/common';
2626
import { msToTs, tsToMs } from '@temporalio/common/lib/time';
27-
import { NativeConnection, Runtime, appendDefaultInterceptors } from '@temporalio/worker';
27+
import { NativeConnection, Runtime } from '@temporalio/worker';
2828
import {
2929
EphemeralServer,
3030
EphemeralServerConfig,

packages/worker/src/workflow/vm.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ import assert from 'assert';
22
import { AsyncLocalStorage } from 'async_hooks';
33
import vm from 'vm';
44
import { gte } from 'semver';
5-
import { WorkflowInfo } from '@temporalio/workflow';
65
import { IllegalStateError } from '@temporalio/common';
76
import { Workflow, WorkflowCreateOptions, WorkflowCreator } from './interface';
87
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
98
import {
10-
ActivationContext,
119
BaseVMWorkflow,
1210
globalHandlers,
1311
injectConsole,

0 commit comments

Comments
 (0)