Skip to content

Commit f638039

Browse files
authored
feat(workflow): Add support for default workflow handler and default signal handler (#1038)
1 parent 2fba917 commit f638039

File tree

10 files changed

+335
-43
lines changed

10 files changed

+335
-43
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Test usage of a default workflow handler
3+
*/
4+
import test from 'ava';
5+
import { v4 as uuid4 } from 'uuid';
6+
import { TestWorkflowEnvironment } from '@temporalio/testing';
7+
import { Worker } from './helpers';
8+
import { existing } from './workflows/default-workflow-function';
9+
10+
test('Default workflow handler is used if requested workflow does not exist', async (t) => {
11+
const env = await TestWorkflowEnvironment.createLocal();
12+
try {
13+
const taskQueue = `${t.title}-${uuid4()}`;
14+
const worker = await Worker.create({
15+
connection: env.nativeConnection,
16+
taskQueue,
17+
workflowsPath: require.resolve('./workflows/default-workflow-function'),
18+
});
19+
await worker.runUntil(async () => {
20+
const result = env.client.workflow.execute('non-existing', {
21+
taskQueue,
22+
workflowId: uuid4(),
23+
args: ['test', 'foo', 'bar'],
24+
});
25+
t.is((await result).handler, 'default');
26+
t.is((await result).workflowType, 'non-existing');
27+
t.deepEqual((await result).args, ['test', 'foo', 'bar']);
28+
});
29+
} finally {
30+
await env.teardown();
31+
}
32+
});
33+
34+
test('Default workflow handler is not used if requested workflow exists', async (t) => {
35+
const env = await TestWorkflowEnvironment.createLocal();
36+
try {
37+
const taskQueue = `${t.title}-${uuid4()}`;
38+
const worker = await Worker.create({
39+
connection: env.nativeConnection,
40+
taskQueue,
41+
workflowsPath: require.resolve('./workflows/default-workflow-function'),
42+
});
43+
await worker.runUntil(async () => {
44+
const result = env.client.workflow.execute(existing, {
45+
taskQueue,
46+
workflowId: uuid4(),
47+
args: ['test', 'foo', 'bar'],
48+
});
49+
t.is((await result).handler, 'existing');
50+
t.deepEqual((await result).args, ['test', 'foo', 'bar']);
51+
});
52+
} finally {
53+
await env.teardown();
54+
}
55+
});

packages/test/src/test-workflows.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { ReusableVMWorkflow, ReusableVMWorkflowCreator } from '@temporalio/worke
2020
import { parseWorkflowCode } from '@temporalio/worker/lib/worker';
2121
import * as activityFunctions from './activities';
2222
import { cleanStackTrace, REUSE_V8_CONTEXT, u8 } from './helpers';
23+
import { ProcessedSignal } from './workflows';
2324

2425
export interface Context {
2526
workflow: VMWorkflow | ReusableVMWorkflow;
@@ -1967,3 +1968,86 @@ test('condition with timeout 0 in >1.5.0 - conditionTimeout0', async (t) => {
19671968
);
19681969
}
19691970
});
1971+
1972+
test('Buffered signals are dispatched to correct handler and in correct order - signalsOrdering', async (t) => {
1973+
const { workflowType } = t.context;
1974+
{
1975+
const completion = await activate(
1976+
t,
1977+
makeActivation(
1978+
undefined,
1979+
makeStartWorkflowJob(workflowType),
1980+
{ signalWorkflow: { signalName: 'non-existant', input: toPayloads(defaultPayloadConverter, 1) } },
1981+
{ signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 2) } },
1982+
{ signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 3) } },
1983+
{ signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 4) } },
1984+
{ signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 5) } },
1985+
{ signalWorkflow: { signalName: 'non-existant', input: toPayloads(defaultPayloadConverter, 6) } },
1986+
{ signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 7) } }
1987+
)
1988+
);
1989+
1990+
// Signal handlers will be registered in the following order:
1991+
//
1992+
// Registration of handler A => Processing of signalA#2
1993+
// Deregistration of handler A => No more processing of signalA
1994+
// Registration of handler B => Processing of signalB#5, signalB#7
1995+
// Registration of default handler => Processing of the rest of signals, in numeric order
1996+
// Registration of handler C => No signal pending for handler C
1997+
1998+
compareCompletion(
1999+
t,
2000+
completion,
2001+
makeSuccess([
2002+
makeCompleteWorkflowExecution(
2003+
defaultPayloadConverter.toPayload([
2004+
{ handler: 'signalA', args: [2] },
2005+
{ handler: 'signalB', args: [5] },
2006+
{ handler: 'signalB', args: [7] },
2007+
{ handler: 'default', signalName: 'non-existant', args: [1] },
2008+
{ handler: 'default', signalName: 'signalA', args: [3] },
2009+
{ handler: 'default', signalName: 'signalC', args: [4] },
2010+
{ handler: 'default', signalName: 'non-existant', args: [6] },
2011+
] as ProcessedSignal[])
2012+
),
2013+
])
2014+
);
2015+
}
2016+
});
2017+
2018+
test('Buffered signals dispatch is reentrant - signalsOrdering2', async (t) => {
2019+
const { workflowType } = t.context;
2020+
{
2021+
const completion = await activate(
2022+
t,
2023+
makeActivation(
2024+
undefined,
2025+
makeStartWorkflowJob(workflowType),
2026+
{ signalWorkflow: { signalName: 'non-existant', input: toPayloads(defaultPayloadConverter, 1) } },
2027+
{ signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 2) } },
2028+
{ signalWorkflow: { signalName: 'signalA', input: toPayloads(defaultPayloadConverter, 3) } },
2029+
{ signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 4) } },
2030+
{ signalWorkflow: { signalName: 'signalB', input: toPayloads(defaultPayloadConverter, 5) } },
2031+
{ signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 6) } },
2032+
{ signalWorkflow: { signalName: 'signalC', input: toPayloads(defaultPayloadConverter, 7) } }
2033+
)
2034+
);
2035+
compareCompletion(
2036+
t,
2037+
completion,
2038+
makeSuccess([
2039+
makeCompleteWorkflowExecution(
2040+
defaultPayloadConverter.toPayload([
2041+
{ handler: 'signalA', args: [2] },
2042+
{ handler: 'signalB', args: [4] },
2043+
{ handler: 'signalC', args: [6] },
2044+
{ handler: 'default', signalName: 'non-existant', args: [1] },
2045+
{ handler: 'signalA', args: [3] },
2046+
{ handler: 'signalB', args: [5] },
2047+
{ handler: 'signalC', args: [7] },
2048+
] as ProcessedSignal[])
2049+
),
2050+
])
2051+
);
2052+
}
2053+
});
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { workflowInfo } from '@temporalio/workflow';
2+
3+
export interface WorkflowTypeAndArgs {
4+
handler: string;
5+
workflowType?: string;
6+
args: unknown[];
7+
}
8+
9+
export async function existing(...args: unknown[]): Promise<WorkflowTypeAndArgs> {
10+
return {
11+
handler: 'existing',
12+
args,
13+
};
14+
}
15+
16+
export default async function (...args: unknown[]): Promise<WorkflowTypeAndArgs> {
17+
return {
18+
handler: 'default',
19+
workflowType: workflowInfo().workflowType,
20+
args,
21+
};
22+
}

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ export * from './shield-in-shield';
7272
export * from './signal-handlers-clear';
7373
export * from './signal-target';
7474
export * from './signals-are-always-processed';
75+
export * from './signals-ordering';
7576
export * from './sinks';
7677
export * from './sleep';
7778
export * from './sleep-invalid-duration';
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Workflow used for testing default signal handler and signal ordering
3+
*
4+
* @module
5+
*/
6+
7+
import { setHandler, setDefaultSignalHandler, defineSignal } from '@temporalio/workflow';
8+
9+
const signalA = defineSignal<[number]>('signalA');
10+
const signalB = defineSignal<[number]>('signalB');
11+
const signalC = defineSignal<[number]>('signalC');
12+
13+
export interface ProcessedSignal {
14+
handler: string;
15+
signalName?: string;
16+
args: unknown[];
17+
}
18+
19+
export async function signalsOrdering(): Promise<ProcessedSignal[]> {
20+
const processedSignals: ProcessedSignal[] = [];
21+
22+
setHandler(signalA, (...args: unknown[]) => {
23+
processedSignals.push({ handler: 'signalA', args });
24+
setHandler(signalA, undefined);
25+
});
26+
setHandler(signalB, (...args: unknown[]) => {
27+
processedSignals.push({ handler: 'signalB', args });
28+
});
29+
setDefaultSignalHandler((signalName: string, ...args: unknown[]) => {
30+
processedSignals.push({ handler: 'default', signalName, args });
31+
});
32+
setHandler(signalC, (...args: unknown[]) => {
33+
processedSignals.push({ handler: 'signalC', args });
34+
});
35+
36+
return processedSignals;
37+
}
38+
39+
export async function signalsOrdering2(): Promise<ProcessedSignal[]> {
40+
const processedSignals: ProcessedSignal[] = [];
41+
42+
function handlerA(...args: unknown[]) {
43+
processedSignals.push({ handler: 'signalA', args });
44+
setHandler(signalA, undefined);
45+
setHandler(signalB, handlerB);
46+
}
47+
48+
function handlerB(...args: unknown[]) {
49+
processedSignals.push({ handler: 'signalB', args });
50+
setHandler(signalB, undefined);
51+
setHandler(signalC, handlerC);
52+
}
53+
54+
function handlerC(...args: unknown[]) {
55+
processedSignals.push({ handler: 'signalC', args });
56+
setHandler(signalC, undefined);
57+
setDefaultSignalHandler(handlerDefault);
58+
}
59+
60+
function handlerDefault(signalName: string, ...args: unknown[]) {
61+
processedSignals.push({ handler: 'default', signalName, args });
62+
setDefaultSignalHandler(undefined);
63+
setHandler(signalA, handlerA);
64+
}
65+
66+
setHandler(signalA, handlerA);
67+
68+
return processedSignals;
69+
}

packages/worker/src/workflow/vm-shared.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,6 @@ import { partition } from '../utils';
1111
import { Workflow } from './interface';
1212
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
1313

14-
export interface ActivationContext {
15-
isReplaying: boolean;
16-
}
17-
1814
// Not present in @types/node for some reason
1915
const { promiseHooks } = v8 as any;
2016

@@ -338,7 +334,7 @@ export abstract class BaseVMWorkflow implements Workflow {
338334
await new Promise(setImmediate);
339335
if (this.unhandledRejection) {
340336
return {
341-
runId: activation.runId,
337+
runId: this.activator.info.runId,
342338
failed: { failure: this.activator.errorToFailure(this.unhandledRejection) },
343339
};
344340
}

packages/workflow/src/interfaces.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import type { RawSourceMap } from 'source-map';
2-
import { RetryPolicy, TemporalFailure, CommonWorkflowOptions, SearchAttributes } from '@temporalio/common';
2+
import {
3+
RetryPolicy,
4+
TemporalFailure,
5+
CommonWorkflowOptions,
6+
SearchAttributes,
7+
SignalDefinition,
8+
QueryDefinition,
9+
} from '@temporalio/common';
310
import { checkExtends } from '@temporalio/common/lib/type-helpers';
411
import type { coresdk } from '@temporalio/proto';
512

@@ -380,3 +387,21 @@ export interface WorkflowCreateOptions {
380387
export interface WorkflowCreateOptionsWithSourceMap extends WorkflowCreateOptions {
381388
sourceMap: RawSourceMap;
382389
}
390+
391+
/**
392+
* A handler function capable of accepting the arguments for a given SignalDefinition or QueryDefinition.
393+
*/
394+
export type Handler<
395+
Ret,
396+
Args extends any[],
397+
T extends SignalDefinition<Args> | QueryDefinition<Ret, Args>
398+
> = T extends SignalDefinition<infer A>
399+
? (...args: A) => void | Promise<void>
400+
: T extends QueryDefinition<infer R, infer A>
401+
? (...args: A) => R
402+
: never;
403+
404+
/**
405+
* A handler function accepting signals calls for non-registered signal names.
406+
*/
407+
export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise<void>;

packages/workflow/src/internals.ts

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { DeterminismViolationError, isCancellation } from './errors';
2323
import { QueryInput, SignalInput, WorkflowExecuteInput, WorkflowInterceptors } from './interceptors';
2424
import {
2525
ContinueAsNew,
26+
DefaultSignalHandler,
2627
SDKInfo,
2728
FileSlice,
2829
EnhancedStackTrace,
@@ -120,7 +121,7 @@ export class Activator implements ActivationHandler {
120121
/**
121122
* Holds buffered signal calls until a handler is registered
122123
*/
123-
readonly bufferedSignals = new Map<string, coresdk.workflow_activation.ISignalWorkflow[]>();
124+
readonly bufferedSignals = Array<coresdk.workflow_activation.ISignalWorkflow>();
124125

125126
/**
126127
* Holds buffered query calls until a handler is registered.
@@ -136,6 +137,11 @@ export class Activator implements ActivationHandler {
136137
*/
137138
readonly signalHandlers = new Map<string, WorkflowSignalType>();
138139

140+
/**
141+
* A signal handler that catches calls for non-registered signal names.
142+
*/
143+
defaultSignalHandler?: DefaultSignalHandler;
144+
139145
/**
140146
* Source map file for looking up the source files in response to __enhanced_stack_trace
141147
*/
@@ -526,10 +532,13 @@ export class Activator implements ActivationHandler {
526532

527533
public async signalWorkflowNextHandler({ signalName, args }: SignalInput): Promise<void> {
528534
const fn = this.signalHandlers.get(signalName);
529-
if (fn === undefined) {
535+
if (fn) {
536+
return await fn(...args);
537+
} else if (this.defaultSignalHandler) {
538+
return await this.defaultSignalHandler(signalName, ...args);
539+
} else {
530540
throw new IllegalStateError(`No registered signal handler for signal ${signalName}`);
531541
}
532-
return await fn(...args);
533542
}
534543

535544
public signalWorkflow(activation: coresdk.workflow_activation.ISignalWorkflow): void {
@@ -538,14 +547,8 @@ export class Activator implements ActivationHandler {
538547
throw new TypeError('Missing activation signalName');
539548
}
540549

541-
const fn = this.signalHandlers.get(signalName);
542-
if (fn === undefined) {
543-
let buffer = this.bufferedSignals.get(signalName);
544-
if (buffer === undefined) {
545-
buffer = [];
546-
this.bufferedSignals.set(signalName, buffer);
547-
}
548-
buffer.push(activation);
550+
if (!this.signalHandlers.has(signalName) && !this.defaultSignalHandler) {
551+
this.bufferedSignals.push(activation);
549552
return;
550553
}
551554

@@ -561,6 +564,22 @@ export class Activator implements ActivationHandler {
561564
}).catch(this.handleWorkflowFailure.bind(this));
562565
}
563566

567+
public dispatchBufferedSignals(): void {
568+
const bufferedSignals = this.bufferedSignals;
569+
while (bufferedSignals.length) {
570+
if (this.defaultSignalHandler) {
571+
// We have a default signal handler, so all signals are dispatchable
572+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
573+
this.signalWorkflow(bufferedSignals.shift()!);
574+
} else {
575+
const foundIndex = bufferedSignals.findIndex((signal) => this.signalHandlers.has(signal.signalName ?? ''));
576+
if (foundIndex === -1) break;
577+
const [signal] = bufferedSignals.splice(foundIndex, 1);
578+
this.signalWorkflow(signal);
579+
}
580+
}
581+
}
582+
564583
public resolveSignalExternalWorkflow(activation: coresdk.workflow_activation.IResolveSignalExternalWorkflow): void {
565584
const { resolve, reject } = this.consumeCompletion('signalWorkflow', getSeq(activation));
566585
if (activation.failure) {

0 commit comments

Comments
 (0)