Skip to content

Commit c5b09de

Browse files
feat(workflow): Support workflow ID conflict policy (#1490)
Co-authored-by: James Watkins-Harvey <mjameswh@users.noreply.github.com>
1 parent 5c3955f commit c5b09de

File tree

5 files changed

+186
-9
lines changed

5 files changed

+186
-9
lines changed

packages/client/src/workflow-client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,7 @@ export class WorkflowClient extends BaseClient {
939939
requestId: uuid4(),
940940
workflowId: options.workflowId,
941941
workflowIdReusePolicy: options.workflowIdReusePolicy,
942+
workflowIdConflictPolicy: options.workflowIdConflictPolicy,
942943
workflowType: { name: workflowType },
943944
input: { payloads: await encodeToPayloads(this.dataConverter, ...options.args) },
944945
signalName,
@@ -989,6 +990,7 @@ export class WorkflowClient extends BaseClient {
989990
requestId: uuid4(),
990991
workflowId: opts.workflowId,
991992
workflowIdReusePolicy: opts.workflowIdReusePolicy,
993+
workflowIdConflictPolicy: opts.workflowIdConflictPolicy,
992994
workflowType: { name: workflowType },
993995
input: { payloads: await encodeToPayloads(this.dataConverter, ...opts.args) },
994996
taskQueue: {

packages/client/src/workflow-options.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ export interface WorkflowOptions extends CommonWorkflowOptions {
1515
*
1616
* Assign a meaningful business id.
1717
* This ID can be used to ensure starting Workflows is idempotent.
18-
* Workflow IDs are unique, see also {@link WorkflowOptions.workflowIdReusePolicy}
18+
* Workflow IDs are unique: see {@link WorkflowOptions.workflowIdReusePolicy}
19+
* and {@link WorkflowOptions.workflowIdConflictPolicy}.
1920
*/
2021
workflowId: string;
2122

packages/common/src/failure.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ export class ChildWorkflowFailure extends TemporalFailure {
279279

280280
/**
281281
* This exception is thrown in the following cases:
282-
* - Workflow with the same Workflow Id is currently running
282+
* - Workflow with the same Workflow ID is currently running and the {@link WorkflowOptions.workflowIdConflictPolicy} is `WORKFLOW_ID_CONFLICT_POLICY_FAIL`
283283
* - There is a closed Workflow with the same Workflow Id and the {@link WorkflowOptions.workflowIdReusePolicy}
284284
* is `WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE`
285285
* - There is closed Workflow in the `Completed` state with the same Workflow Id and the {@link WorkflowOptions.workflowIdReusePolicy}

packages/common/src/workflow-options.ts

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ import { checkExtends } from './type-helpers';
77
// Avoid importing the proto implementation to reduce workflow bundle size
88
// Copied from temporal.api.enums.v1.WorkflowIdReusePolicy
99
/**
10+
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
11+
*
12+
* See {@link WorkflowOptions.workflowIdConflictPolicy} for what happens when trying to start a
13+
* Workflow with the same ID as a *Running* Workflow.
14+
*
1015
* Concept: {@link https://docs.temporal.io/concepts/what-is-a-workflow-id-reuse-policy/ | Workflow Id Reuse Policy}
1116
*
12-
* Whether a Workflow can be started with a Workflow Id of a Closed Workflow.
17+
* *Note: It is not possible to have two actively running Workflows with the same ID.*
1318
*
14-
* *Note: A Workflow can never be started with a Workflow Id of a Running Workflow.*
1519
*/
1620
export enum WorkflowIdReusePolicy {
1721
/**
@@ -38,24 +42,78 @@ export enum WorkflowIdReusePolicy {
3842
WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE = 3,
3943

4044
/**
41-
* Terminate the current workflow if one is already running.
45+
* Terminate the current Workflow if one is already running; otherwise allow reusing the Workflow ID.
46+
*
47+
* @deprecated Use {@link WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE} instead, and
48+
* set `WorkflowOptions.workflowIdConflictPolicy` to
49+
* {@link WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING}.
50+
* When using this option, `WorkflowOptions.workflowIdConflictPolicy` must be left unspecified.
4251
*/
4352
WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING = 4,
4453
}
4554

4655
checkExtends<temporal.api.enums.v1.WorkflowIdReusePolicy, WorkflowIdReusePolicy>();
4756
checkExtends<WorkflowIdReusePolicy, temporal.api.enums.v1.WorkflowIdReusePolicy>();
4857

58+
// Avoid importing the proto implementation to reduce workflow bundle size
59+
// Copied from temporal.api.enums.v1.WorkflowIdConflictPolicy
60+
/**
61+
* Defines what happens when trying to start a Workflow with the same ID as a *Running* Workflow.
62+
*
63+
* See {@link WorkflowOptions.workflowIdReusePolicy} for what happens when trying to start a Workflow
64+
* with the same ID as a *Closed* Workflow.
65+
*
66+
* *Note: It is not possible to have two actively running Workflows with the same ID.*
67+
*/
68+
export enum WorkflowIdConflictPolicy {
69+
/**
70+
* This is the default so that we can set the policy
71+
* `WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING` in `WorkflowIdReusePolicy`, which is incompatible
72+
* with setting any other `WorkflowIdConflictPolicy` values.
73+
*
74+
* The actual default behavior is `WORKFLOW_ID_CONFLICT_POLICY_FAIL` for a start Workflow request,
75+
* and `WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING` for a signal with start Workflow request.
76+
*/
77+
WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED = 0,
78+
79+
/**
80+
* Do not start a new Workflow. Instead raise a `WorkflowExecutionAlreadyStartedError`.
81+
*/
82+
WORKFLOW_ID_CONFLICT_POLICY_FAIL = 1,
83+
84+
/**
85+
* Do not start a new Workflow. Instead return a Workflow Handle for the currently Running Workflow.
86+
*/
87+
WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING = 2,
88+
89+
/**
90+
* Start a new Workflow, terminating the current workflow if one is already running.
91+
*/
92+
WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING = 3,
93+
}
94+
95+
checkExtends<temporal.api.enums.v1.WorkflowIdConflictPolicy, WorkflowIdConflictPolicy>();
96+
checkExtends<WorkflowIdConflictPolicy, temporal.api.enums.v1.WorkflowIdConflictPolicy>();
97+
4998
export interface BaseWorkflowOptions {
5099
/**
51-
* Whether a Workflow can be started with a Workflow Id of a Closed Workflow.
100+
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
52101
*
53-
* *Note: A Workflow can never be started with a Workflow Id of a Running Workflow.*
102+
* *Note: It is not possible to have two actively running Workflows with the same ID.*
54103
*
55104
* @default {@link WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE}
56105
*/
57106
workflowIdReusePolicy?: WorkflowIdReusePolicy;
58107

108+
/**
109+
* Defines what happens when trying to start a Workflow with the same ID as a *Running* Workflow.
110+
*
111+
* *Note: It is not possible to have two actively running Workflows with the same ID.*
112+
*
113+
* @default {@link WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED}
114+
*/
115+
workflowIdConflictPolicy?: WorkflowIdConflictPolicy;
116+
59117
/**
60118
* Controls how a Workflow Execution is retried.
61119
*

packages/test/src/test-integration-workflows.ts

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
77
import { TestWorkflowEnvironment } from '@temporalio/testing';
88
import { CancelReason } from '@temporalio/worker/lib/activity';
99
import * as workflow from '@temporalio/workflow';
10-
import { defineQuery, defineSignal } from '@temporalio/workflow';
10+
import { defineQuery, defineSignal, WorkflowIdConflictPolicy } from '@temporalio/workflow';
1111
import { SdkFlags } from '@temporalio/workflow/lib/flags';
12-
import { ActivityCancellationType, ApplicationFailure } from '@temporalio/common';
12+
import { ActivityCancellationType, ApplicationFailure, WorkflowExecutionAlreadyStartedError } from '@temporalio/common';
1313
import { signalSchedulingWorkflow } from './activities/helpers';
1414
import { activityStartedSignal } from './workflows/definitions';
1515
import * as workflows from './workflows';
@@ -216,6 +216,122 @@ test('Start of workflow is delayed', async (t) => {
216216
t.is(tsToMs(startDelay), 5678000);
217217
});
218218

219+
export async function conflictId(): Promise<void> {
220+
await workflow.condition(() => false);
221+
}
222+
223+
test('Start of workflow respects workflow id conflict policy', async (t) => {
224+
const { createWorker, taskQueue } = helpers(t);
225+
const wfid = `${taskQueue}-` + randomUUID();
226+
const client = t.context.env.client;
227+
228+
const worker = await createWorker();
229+
await worker.runUntil(async () => {
230+
const handle = await client.workflow.start(conflictId, {
231+
taskQueue,
232+
workflowId: wfid,
233+
});
234+
const handleWithRunId = client.workflow.getHandle(handle.workflowId, handle.firstExecutionRunId);
235+
236+
// Confirm another fails by default
237+
const err = await t.throwsAsync(
238+
client.workflow.start(conflictId, {
239+
taskQueue,
240+
workflowId: wfid,
241+
}),
242+
{
243+
instanceOf: WorkflowExecutionAlreadyStartedError,
244+
}
245+
);
246+
247+
t.true(err instanceof WorkflowExecutionAlreadyStartedError);
248+
249+
// Confirm fails with explicit option
250+
const err1 = await t.throwsAsync(
251+
client.workflow.start(conflictId, {
252+
taskQueue,
253+
workflowId: wfid,
254+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
255+
}),
256+
{
257+
instanceOf: WorkflowExecutionAlreadyStartedError,
258+
}
259+
);
260+
261+
t.true(err1 instanceof WorkflowExecutionAlreadyStartedError);
262+
263+
// Confirm gives back same handle
264+
const handle2 = await client.workflow.start(conflictId, {
265+
taskQueue,
266+
workflowId: wfid,
267+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
268+
});
269+
270+
const desc = await handleWithRunId.describe();
271+
const desc2 = await handle2.describe();
272+
273+
t.is(desc.runId, desc2.runId);
274+
t.is(desc.status.name, 'RUNNING');
275+
t.is(desc2.status.name, 'RUNNING');
276+
277+
// Confirm terminates and starts new
278+
const handle3 = await client.workflow.start(conflictId, {
279+
taskQueue,
280+
workflowId: wfid,
281+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
282+
});
283+
284+
const descWithRunId = await handleWithRunId.describe();
285+
const desc3 = await handle3.describe();
286+
t.not(descWithRunId.runId, desc3.runId);
287+
t.is(descWithRunId.status.name, 'TERMINATED');
288+
t.is(desc3.status.name, 'RUNNING');
289+
});
290+
});
291+
292+
test('Start of workflow with signal respects conflict id policy', async (t) => {
293+
const { createWorker, taskQueue } = helpers(t);
294+
const wfid = `${taskQueue}-` + randomUUID();
295+
const client = t.context.env.client;
296+
const worker = await createWorker();
297+
await worker.runUntil(async () => {
298+
const handle = await client.workflow.start(workflows.signalTarget, {
299+
taskQueue,
300+
workflowId: wfid,
301+
});
302+
const handleWithRunId = client.workflow.getHandle(handle.workflowId, handle.firstExecutionRunId);
303+
304+
// Confirm gives back same handle is the default policy
305+
const handle2 = await t.context.env.client.workflow.signalWithStart(workflows.signalTarget, {
306+
taskQueue,
307+
workflowId: wfid,
308+
signal: workflows.argsTestSignal,
309+
signalArgs: [123, 'kid'],
310+
});
311+
const desc = await handleWithRunId.describe();
312+
const desc2 = await handle2.describe();
313+
314+
t.deepEqual(desc.runId, desc2.runId);
315+
t.deepEqual(desc.status.name, 'RUNNING');
316+
t.deepEqual(desc2.status.name, 'RUNNING');
317+
318+
// Confirm terminates and starts new
319+
const handle3 = await t.context.env.client.workflow.signalWithStart(workflows.signalTarget, {
320+
taskQueue,
321+
workflowId: wfid,
322+
signal: workflows.argsTestSignal,
323+
signalArgs: [123, 'kid'],
324+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
325+
});
326+
327+
const descWithRunId = await handleWithRunId.describe();
328+
const desc3 = await handle3.describe();
329+
t.true(descWithRunId.runId !== desc3.runId);
330+
t.deepEqual(descWithRunId.status.name, 'TERMINATED');
331+
t.deepEqual(desc3.status.name, 'RUNNING');
332+
});
333+
});
334+
219335
test('Start of workflow with signal is delayed', async (t) => {
220336
const { taskQueue } = helpers(t);
221337
// This workflow never runs

0 commit comments

Comments
 (0)