Skip to content

Commit 12c9e2a

Browse files
authored
Make startUpdate require wait stage (#1448)
1 parent daf1dca commit 12c9e2a

File tree

8 files changed

+200
-51
lines changed

8 files changed

+200
-51
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ jobs:
424424
typescript-repo-path: ${{github.event.pull_request.head.repo.full_name}}
425425
version: ${{github.event.pull_request.head.ref}}
426426
version-is-repo-ref: true
427+
features-repo-ref: sdk-1403-ts-startUpdate-require-wait-stage
427428

428429
stress-tests-no-reuse-context:
429430
name: Stress Tests (No Reuse V8 Context)

packages/client/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export * from './workflow-options';
3939
export * from './schedule-types';
4040
export * from './schedule-client';
4141
export * from './task-queue-client';
42+
export { WorkflowUpdateStage } from './workflow-update-stage';
4243
export {
4344
WorkerBuildIdVersionSets,
4445
BuildIdVersionSet,

packages/client/src/workflow-client.ts

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ import {
8181
WithDefaults,
8282
} from './base-client';
8383
import { mapAsyncIterable } from './iterators-utils';
84+
import { WorkflowUpdateStage } from './workflow-update-stage';
85+
import * as workflowUpdateStage from './workflow-update-stage';
8486

8587
/**
8688
* A client side handle to a single Workflow instance.
@@ -140,8 +142,8 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
140142
): Promise<Ret>;
141143

142144
/**
143-
* Start an Update and receive a handle to the Update.
144-
* The Update validator (if present) is run before the handle is returned.
145+
* Start an Update and receive a handle to the Update. The Update validator (if present) is run
146+
* before the handle is returned.
145147
*
146148
* @experimental Update is an experimental feature.
147149
*
@@ -150,22 +152,41 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
150152
* mean the update itself was timed out or cancelled.
151153
*
152154
* @param def an Update definition as returned from {@link defineUpdate}
153-
* @param options Update arguments
155+
* @param options update arguments, and update lifecycle stage to wait for
156+
*
157+
* Currently, startUpdate always waits until a worker is accepting tasks for the workflow and the
158+
* update is accepted or rejected, and the options object must be at least
159+
* ```ts
160+
* {
161+
* waitForStage: WorkflowUpdateStage.ACCEPTED
162+
* }
163+
* ```
164+
* If the update takes arguments, then the options object must additionally contain an `args`
165+
* property with an array of argument values.
154166
*
155167
* @example
156168
* ```ts
157-
* const updateHandle = await handle.startUpdate(incrementAndGetValueUpdate, { args: [2] });
169+
* const updateHandle = await handle.startUpdate(incrementAndGetValueUpdate, {
170+
* args: [2],
171+
* waitForStage: WorkflowUpdateStage.ACCEPTED,
172+
* });
158173
* const updateResult = await updateHandle.result();
159174
* ```
160175
*/
161176
startUpdate<Ret, Args extends [any, ...any[]], Name extends string = string>(
162177
def: UpdateDefinition<Ret, Args, Name> | string,
163-
options: WorkflowUpdateOptions & { args: Args }
178+
options: WorkflowUpdateOptions & {
179+
args: Args;
180+
waitForStage: WorkflowUpdateStage.ACCEPTED;
181+
}
164182
): Promise<WorkflowUpdateHandle<Ret>>;
165183

166184
startUpdate<Ret, Args extends [], Name extends string = string>(
167185
def: UpdateDefinition<Ret, Args, Name> | string,
168-
options?: WorkflowUpdateOptions & { args?: Args }
186+
options: WorkflowUpdateOptions & {
187+
args?: Args;
188+
waitForStage: WorkflowUpdateStage.ACCEPTED;
189+
}
169190
): Promise<WorkflowUpdateHandle<Ret>>;
170191

171192
/**
@@ -782,15 +803,17 @@ export class WorkflowClient extends BaseClient {
782803
* Used as the final function of the interceptor chain during startUpdate and executeUpdate.
783804
*/
784805
protected async _startUpdateHandler(
785-
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
806+
waitForStage: WorkflowUpdateStage,
786807
input: WorkflowStartUpdateInput
787808
): Promise<WorkflowStartUpdateOutput> {
788809
const updateId = input.options?.updateId ?? uuid4();
789810
const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = {
790811
namespace: this.options.namespace,
791812
workflowExecution: input.workflowExecution,
792813
firstExecutionRunId: input.firstExecutionRunId,
793-
waitPolicy: { lifecycleStage: waitForStage },
814+
waitPolicy: {
815+
lifecycleStage: workflowUpdateStage.toProtoEnum(waitForStage),
816+
},
794817
request: {
795818
meta: {
796819
updateId,
@@ -811,11 +834,7 @@ export class WorkflowClient extends BaseClient {
811834
try {
812835
do {
813836
response = await this.workflowService.updateWorkflowExecution(req);
814-
} while (
815-
response.stage < waitForStage &&
816-
response.stage <
817-
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
818-
);
837+
} while (response.stage < waitForStage && response.stage < WorkflowUpdateStage.ACCEPTED);
819838
} catch (err) {
820839
this.rethrowUpdateGrpcError(err, 'Workflow Update failed', input.workflowExecution);
821840
}
@@ -865,9 +884,7 @@ export class WorkflowClient extends BaseClient {
865884
updateRef: { workflowExecution, updateId },
866885
identity: this.options.identity,
867886
waitPolicy: {
868-
lifecycleStage:
869-
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
870-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
887+
lifecycleStage: workflowUpdateStage.toProtoEnum(WorkflowUpdateStage.COMPLETED),
871888
},
872889
};
873890
for (;;) {
@@ -1076,7 +1093,7 @@ export class WorkflowClient extends BaseClient {
10761093
}: WorkflowHandleOptions): WorkflowHandle<T> {
10771094
const _startUpdate = async <Ret, Args extends unknown[]>(
10781095
def: UpdateDefinition<Ret, Args> | string,
1079-
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
1096+
waitForStage: WorkflowUpdateStage,
10801097
options?: WorkflowUpdateOptions & { args?: Args }
10811098
): Promise<WorkflowUpdateHandle<Ret>> => {
10821099
const next = this._startUpdateHandler.bind(this, waitForStage);
@@ -1098,12 +1115,7 @@ export class WorkflowClient extends BaseClient {
10981115
output.workflowRunId,
10991116
output.outcome
11001117
);
1101-
if (
1102-
!output.outcome &&
1103-
waitForStage ===
1104-
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
1105-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
1106-
) {
1118+
if (!output.outcome && waitForStage === WorkflowUpdateStage.COMPLETED) {
11071119
await this._pollForUpdateOutcome(handle.updateId, input.workflowExecution);
11081120
}
11091121
return handle;
@@ -1162,25 +1174,18 @@ export class WorkflowClient extends BaseClient {
11621174
},
11631175
async startUpdate<Ret, Args extends any[]>(
11641176
def: UpdateDefinition<Ret, Args> | string,
1165-
options?: WorkflowUpdateOptions & { args?: Args }
1177+
options: WorkflowUpdateOptions & {
1178+
args?: Args;
1179+
waitForStage: WorkflowUpdateStage.ACCEPTED;
1180+
}
11661181
): Promise<WorkflowUpdateHandle<Ret>> {
1167-
return await _startUpdate(
1168-
def,
1169-
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
1170-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
1171-
options
1172-
);
1182+
return await _startUpdate(def, options.waitForStage, options);
11731183
},
11741184
async executeUpdate<Ret, Args extends any[]>(
11751185
def: UpdateDefinition<Ret, Args> | string,
11761186
options?: WorkflowUpdateOptions & { args?: Args }
11771187
): Promise<Ret> {
1178-
const handle = await _startUpdate(
1179-
def,
1180-
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
1181-
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
1182-
options
1183-
);
1188+
const handle = await _startUpdate(def, WorkflowUpdateStage.COMPLETED, options);
11841189
return await handle.result();
11851190
},
11861191
getUpdateHandle<Ret>(updateId: string): WorkflowUpdateHandle<Ret> {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { temporal } from '@temporalio/proto';
2+
import { checkExtends } from '@temporalio/common/lib/type-helpers';
3+
4+
export enum WorkflowUpdateStage {
5+
/** This is not an allowed value. */
6+
UNSPECIFIED = 0,
7+
/** Admitted stage. This stage is reached when the server accepts the update request. It is not
8+
* allowed to wait for this stage when using startUpdate, since the update request has not yet
9+
* been durably persisted at this stage. */
10+
ADMITTED = 1,
11+
/** Accepted stage. This stage is reached when a workflow has received the update and either
12+
* accepted it (i.e. it has passed validation, or there was no validator configured on the update
13+
* handler) or rejected it. This is currently the only allowed value when using startUpdate. */
14+
ACCEPTED = 2,
15+
/** Completed stage. This stage is reached when a workflow has completed processing the
16+
* update with either a success or failure. */
17+
COMPLETED = 3,
18+
}
19+
20+
checkExtends<
21+
`UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_${keyof typeof WorkflowUpdateStage}`,
22+
keyof typeof temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
23+
>();
24+
checkExtends<
25+
keyof typeof temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
26+
`UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_${keyof typeof WorkflowUpdateStage}`
27+
>();
28+
29+
export function toProtoEnum(stage: WorkflowUpdateStage): temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage {
30+
return temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage[
31+
`UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_${WorkflowUpdateStage[stage] as keyof typeof WorkflowUpdateStage}`
32+
];
33+
}

packages/test/src/test-integration-update-interceptors.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { WorkflowStartUpdateInput, WorkflowStartUpdateOutput } from '@temporalio/client';
1+
import { WorkflowStartUpdateInput, WorkflowStartUpdateOutput, WorkflowUpdateStage } from '@temporalio/client';
22
import * as wf from '@temporalio/workflow';
33
import { Next, UpdateInput, WorkflowInboundCallsInterceptor, WorkflowInterceptors } from '@temporalio/workflow';
44
import { helpers, makeTestFunction } from './helpers-integration';
@@ -66,7 +66,10 @@ test('Update client and inbound interceptors work for startUpdate', async (t) =>
6666
await worker.runUntil(async () => {
6767
const wfHandle = await startWorkflow(workflowWithUpdate);
6868

69-
const updateHandle = await wfHandle.startUpdate(update, { args: ['1'] });
69+
const updateHandle = await wfHandle.startUpdate(update, {
70+
args: ['1'],
71+
waitForStage: WorkflowUpdateStage.ACCEPTED,
72+
});
7073
const updateResult = await updateHandle.result();
7174
t.deepEqual(updateResult, '1-clientIntercepted-inboundIntercepted');
7275
});

packages/test/src/test-integration-update.ts

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { WorkflowUpdateRPCTimeoutOrCancelledError } from '@temporalio/client';
1+
import { WorkflowUpdateStage, WorkflowUpdateRPCTimeoutOrCancelledError } from '@temporalio/client';
22
import * as wf from '@temporalio/workflow';
33
import { helpers, makeTestFunction } from './helpers-integration';
44

@@ -64,11 +64,16 @@ test('Update can be executed via startUpdate() and handle.result()', async (t) =
6464
await worker.runUntil(async () => {
6565
const wfHandle = await startWorkflow(workflowWithUpdates);
6666

67-
const updateHandle = await wfHandle.startUpdate(update, { args: ['1'] });
67+
const updateHandle = await wfHandle.startUpdate(update, {
68+
args: ['1'],
69+
waitForStage: WorkflowUpdateStage.ACCEPTED,
70+
});
6871
const updateResult = await updateHandle.result();
6972
t.deepEqual(updateResult, ['1']);
7073

71-
const doneUpdateHandle = await wfHandle.startUpdate(doneUpdate);
74+
const doneUpdateHandle = await wfHandle.startUpdate(doneUpdate, {
75+
waitForStage: WorkflowUpdateStage.ACCEPTED,
76+
});
7277
const doneUpdateResult = await doneUpdateHandle.result();
7378
t.is(doneUpdateResult, undefined);
7479

@@ -83,7 +88,11 @@ test('Update handle can be created from identifiers and used to obtain result',
8388
await worker.runUntil(async () => {
8489
const updateId = 'my-update-id';
8590
const wfHandle = await startWorkflow(workflowWithUpdates);
86-
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, { args: ['1'], updateId });
91+
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, {
92+
args: ['1'],
93+
updateId,
94+
waitForStage: WorkflowUpdateStage.ACCEPTED,
95+
});
8796

8897
// Obtain update handle on workflow handle from start update.
8998
const updateHandle = wfHandle.getUpdateHandle(updateId);
@@ -175,9 +184,15 @@ test('Update validator can reject when using handle.result() but handle can be o
175184
const worker = await createWorker();
176185
await worker.runUntil(async () => {
177186
const wfHandle = await startWorkflow(workflowWithUpdateValidator);
178-
let updateHandle = await wfHandle.startUpdate(stringToStringUpdate, { args: ['arg'] });
187+
let updateHandle = await wfHandle.startUpdate(stringToStringUpdate, {
188+
args: ['arg'],
189+
waitForStage: WorkflowUpdateStage.ACCEPTED,
190+
});
179191
t.is(await updateHandle.result(), 'update-result');
180-
updateHandle = await wfHandle.startUpdate(stringToStringUpdate, { args: ['bad-arg'] });
192+
updateHandle = await wfHandle.startUpdate(stringToStringUpdate, {
193+
args: ['bad-arg'],
194+
waitForStage: WorkflowUpdateStage.ACCEPTED,
195+
});
181196
await assertWorkflowUpdateFailed(updateHandle.result(), wf.ApplicationFailure, 'Validation failed');
182197
});
183198
});
@@ -246,7 +261,10 @@ test('Update id can be assigned and is present on returned handle', async (t) =>
246261
const worker = await createWorker();
247262
await worker.runUntil(async () => {
248263
const wfHandle = await startWorkflow(workflowWithUpdates);
249-
const updateHandle = await wfHandle.startUpdate(doneUpdate, { updateId: 'my-update-id' });
264+
const updateHandle = await wfHandle.startUpdate(doneUpdate, {
265+
updateId: 'my-update-id',
266+
waitForStage: WorkflowUpdateStage.ACCEPTED,
267+
});
250268
t.is(updateHandle.updateId, 'my-update-id');
251269
});
252270
});
@@ -425,7 +443,10 @@ test('Update/Signal/Query example in WorkflowHandle docstrings works', async (t)
425443
t.is(queryResult, 4);
426444
const updateResult = await wfHandle.executeUpdate(incrementAndGetValueUpdate, { args: [2] });
427445
t.is(updateResult, 6);
428-
const secondUpdateHandle = await wfHandle.startUpdate(incrementAndGetValueUpdate, { args: [2] });
446+
const secondUpdateHandle = await wfHandle.startUpdate(incrementAndGetValueUpdate, {
447+
args: [2],
448+
waitForStage: WorkflowUpdateStage.ACCEPTED,
449+
});
429450
const secondUpdateResult = await secondUpdateHandle.result();
430451
t.is(secondUpdateResult, 8);
431452
await wfHandle.cancel();
@@ -436,7 +457,12 @@ test('Update/Signal/Query example in WorkflowHandle docstrings works', async (t)
436457
test('startUpdate does not return handle before update has reached requested stage', async (t) => {
437458
const { startWorkflow } = helpers(t);
438459
const wfHandle = await startWorkflow(workflowWithUpdates);
439-
const updatePromise = wfHandle.startUpdate(update, { args: ['1'] }).then(() => 'update');
460+
const updatePromise = wfHandle
461+
.startUpdate(update, {
462+
args: ['1'],
463+
waitForStage: WorkflowUpdateStage.ACCEPTED,
464+
})
465+
.then(() => 'update');
440466
const timeoutPromise = new Promise<string>((f) =>
441467
setTimeout(() => f('timeout'), 500 + LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1000)
442468
);
@@ -536,14 +562,18 @@ test('startUpdate throws WorkflowUpdateRPCTimeoutOrCancelledError with no worker
536562
const { startWorkflow } = helpers(t);
537563
const wfHandle = await startWorkflow(workflowWithUpdates);
538564
await t.context.env.client.withDeadline(Date.now() + 100, async () => {
539-
const err = await t.throwsAsync(wfHandle.startUpdate(update, { args: ['1'] }));
565+
const err = await t.throwsAsync(
566+
wfHandle.startUpdate(update, { args: ['1'], waitForStage: WorkflowUpdateStage.ACCEPTED })
567+
);
540568
t.true(err instanceof WorkflowUpdateRPCTimeoutOrCancelledError);
541569
});
542570

543571
const ctrl = new AbortController();
544572
setTimeout(() => ctrl.abort(), 10);
545573
await t.context.env.client.withAbortSignal(ctrl.signal, async () => {
546-
const err = await t.throwsAsync(wfHandle.startUpdate(update, { args: ['1'] }));
574+
const err = await t.throwsAsync(
575+
wfHandle.startUpdate(update, { args: ['1'], waitForStage: WorkflowUpdateStage.ACCEPTED })
576+
);
547577
t.true(err instanceof WorkflowUpdateRPCTimeoutOrCancelledError);
548578
});
549579
});

0 commit comments

Comments
 (0)