Skip to content

Commit dbbd597

Browse files
authored
Warn on unfinished handlers and provide workflow.allHandlersFinished() API (#1459)
1 parent 1aabe19 commit dbbd597

File tree

8 files changed

+700
-31
lines changed

8 files changed

+700
-31
lines changed

packages/common/src/interfaces.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ export type WorkflowUpdateType = (...args: any[]) => Promise<any> | any;
88
export type WorkflowUpdateValidatorType = (...args: any[]) => void;
99
export type WorkflowUpdateAnnotatedType = {
1010
handler: WorkflowUpdateType;
11+
unfinishedPolicy: HandlerUnfinishedPolicy;
1112
validator?: WorkflowUpdateValidatorType;
1213
description?: string;
1314
};
1415
export type WorkflowSignalType = (...args: any[]) => Promise<void> | void;
15-
export type WorkflowSignalAnnotatedType = { handler: WorkflowSignalType; description?: string };
16+
export type WorkflowSignalAnnotatedType = {
17+
handler: WorkflowSignalType;
18+
unfinishedPolicy: HandlerUnfinishedPolicy;
19+
description?: string;
20+
};
1621
export type WorkflowQueryType = (...args: any[]) => any;
1722
export type WorkflowQueryAnnotatedType = { handler: WorkflowQueryType; description?: string };
1823

@@ -118,3 +123,22 @@ export interface HistoryAndWorkflowId {
118123
workflowId: string;
119124
history: temporal.api.history.v1.History | unknown | undefined;
120125
}
126+
127+
/**
128+
* Policy defining actions taken when a workflow exits while update or signal handlers are running.
129+
* The workflow exit may be due to successful return, failure, cancellation, or continue-as-new.
130+
*/
131+
export enum HandlerUnfinishedPolicy {
132+
/**
133+
* Issue a warning in addition to abandoning the handler execution. The warning will not be issued if the workflow fails.
134+
*/
135+
WARN_AND_ABANDON = 1,
136+
137+
/**
138+
* Abandon the handler execution.
139+
*
140+
* In the case of an update handler this means that the client will receive an error rather than
141+
* the update result.
142+
*/
143+
ABANDON = 2,
144+
}

packages/test/src/helpers-integration.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { randomUUID } from 'crypto';
2+
import { status as grpcStatus } from '@grpc/grpc-js';
23
import { ErrorConstructor, ExecutionContext, TestFn } from 'ava';
34
import {
5+
isGrpcServiceError,
46
WorkflowFailedError,
57
WorkflowHandle,
68
WorkflowStartOptions,
@@ -12,6 +14,7 @@ import {
1214
} from '@temporalio/testing';
1315
import {
1416
DefaultLogger,
17+
LogEntry,
1518
LogLevel,
1619
Runtime,
1720
WorkerOptions,
@@ -50,6 +53,7 @@ export function makeTestFunction(opts: {
5053
workflowsPath: string;
5154
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
5255
workflowInterceptorModules?: string[];
56+
recordedLogs?: { [workflowId: string]: LogEntry[] };
5357
}): TestFn<Context> {
5458
const test = anyTest as TestFn<Context>;
5559
test.before(async (t) => {
@@ -59,9 +63,15 @@ export function makeTestFunction(opts: {
5963
workflowsPath: opts.workflowsPath,
6064
logger: new DefaultLogger('WARN'),
6165
});
62-
// Ignore invalid log levels
66+
const logger = opts.recordedLogs
67+
? new DefaultLogger('DEBUG', (entry) => {
68+
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId;
69+
opts.recordedLogs![workflowId] ??= [];
70+
opts.recordedLogs![workflowId].push(entry);
71+
})
72+
: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel);
6373
Runtime.install({
64-
logger: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel),
74+
logger,
6575
telemetryOptions: {
6676
logging: {
6777
filter: makeTelemetryFilterString({
@@ -107,6 +117,7 @@ export interface Helpers {
107117
): Promise<WorkflowHandle<T>>;
108118
assertWorkflowUpdateFailed(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
109119
assertWorkflowFailedError(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
120+
updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean>;
110121
}
111122

112123
export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
@@ -172,5 +183,22 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
172183
t.is(err.cause?.message, message);
173184
}
174185
},
186+
async updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean> {
187+
try {
188+
await testEnv.client.workflowService.pollWorkflowExecutionUpdate({
189+
namespace: testEnv.client.options.namespace,
190+
updateRef: {
191+
workflowExecution: { workflowId: handle.workflowId },
192+
updateId,
193+
},
194+
});
195+
return true;
196+
} catch (err) {
197+
if (isGrpcServiceError(err) && err.code === grpcStatus.NOT_FOUND) {
198+
return false;
199+
}
200+
throw err;
201+
}
202+
},
175203
};
176204
}

packages/test/src/helpers.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,23 @@ export async function sleep(ms: number): Promise<void> {
4545
return new Promise((resolve) => setTimeout(resolve, ms));
4646
}
4747

48+
export async function waitUntil(
49+
condition: () => Promise<boolean>,
50+
timeoutMs: number,
51+
intervalMs: number = 100
52+
): Promise<void> {
53+
const endTime = Date.now() + timeoutMs;
54+
for (;;) {
55+
if (await condition()) {
56+
return;
57+
} else if (Date.now() >= endTime) {
58+
throw new Error('timed out waiting for condition');
59+
} else {
60+
await sleep(intervalMs);
61+
}
62+
}
63+
}
64+
4865
export function cleanOptionalStackTrace(stackTrace: string | undefined | null): string | undefined {
4966
return stackTrace ? cleanStackTrace(stackTrace) : undefined;
5067
}

0 commit comments

Comments
 (0)