Skip to content

Commit 844e3f0

Browse files
authored
getUpdateHandle (#1312)
Add `getUpdateHandle` API. Fixes #1306.
1 parent 1f30153 commit 844e3f0

File tree

2 files changed

+70
-8
lines changed

2 files changed

+70
-8
lines changed

packages/client/src/workflow-client.ts

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,15 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
164164
options?: WorkflowUpdateOptions & { args?: Args }
165165
): Promise<WorkflowUpdateHandle<Ret>>;
166166

167+
/**
168+
* Get a handle to an Update.
169+
*/
170+
getUpdateHandle<Ret>(
171+
updateId: string,
172+
workflowId: string,
173+
options?: GetWorkflowUpdateHandleOptions
174+
): WorkflowUpdateHandle<Ret>;
175+
167176
/**
168177
* Query a running or completed Workflow.
169178
*
@@ -296,6 +305,9 @@ export interface WorkflowResultOptions {
296305
followRuns?: boolean;
297306
}
298307

308+
/**
309+
* Options for {@link WorkflowClient.getHandle}
310+
*/
299311
export interface GetWorkflowHandleOptions extends WorkflowResultOptions {
300312
/**
301313
* ID of the first execution in the Workflow execution chain.
@@ -351,7 +363,7 @@ export interface WorkflowUpdateHandle<Ret> {
351363
/**
352364
* The ID of the Run of the Workflow being targeted by this Update request.
353365
*/
354-
workflowRunId: string;
366+
workflowRunId?: string;
355367

356368
/**
357369
* Return the result of the Update.
@@ -360,6 +372,16 @@ export interface WorkflowUpdateHandle<Ret> {
360372
result(): Promise<Ret>;
361373
}
362374

375+
/**
376+
* Options for {@link WorkflowHandle.getUpdateHandle}
377+
*/
378+
export interface GetWorkflowUpdateHandleOptions {
379+
/**
380+
* The ID of the Run of the Workflow targeted by the Update.
381+
*/
382+
workflowRunId?: string;
383+
}
384+
363385
/**
364386
* Options for {@link WorkflowClient.list}
365387
*/
@@ -775,15 +797,19 @@ export class WorkflowClient extends BaseClient {
775797
}
776798

777799
protected createWorkflowUpdateHandle<Ret>(
778-
input: WorkflowStartUpdateInput,
779-
{ updateId, workflowRunId, outcome }: WorkflowStartUpdateOutput
800+
updateId: string,
801+
workflowId: string,
802+
options?: GetWorkflowUpdateHandleOptions,
803+
outcome?: temporal.api.update.v1.IOutcome
780804
): WorkflowUpdateHandle<Ret> {
805+
const workflowRunId = options?.workflowRunId;
781806
return {
782807
updateId,
783-
workflowId: input.workflowExecution.workflowId,
808+
workflowId,
784809
workflowRunId,
785810
result: async () => {
786-
const completedOutcome = outcome ?? (await this._pollForUpdateOutcome(updateId, input.workflowExecution));
811+
const completedOutcome =
812+
outcome ?? (await this._pollForUpdateOutcome(updateId, { workflowId, runId: workflowRunId }));
787813
if (completedOutcome.failure) {
788814
throw new WorkflowUpdateFailedError(
789815
'Workflow Update failed',
@@ -1048,7 +1074,12 @@ export class WorkflowClient extends BaseClient {
10481074
options: opts,
10491075
};
10501076
const output = await fn(input);
1051-
return this.createWorkflowUpdateHandle<Ret>(input, output);
1077+
return this.createWorkflowUpdateHandle<Ret>(
1078+
output.updateId,
1079+
input.workflowExecution.workflowId,
1080+
{ workflowRunId: output.workflowRunId },
1081+
output.outcome
1082+
);
10521083
};
10531084

10541085
return {
@@ -1113,7 +1144,6 @@ export class WorkflowClient extends BaseClient {
11131144
options
11141145
);
11151146
},
1116-
11171147
async executeUpdate<Ret, Args extends any[]>(
11181148
def: UpdateDefinition<Ret, Args> | string,
11191149
options?: WorkflowUpdateOptions & { args?: Args }
@@ -1126,7 +1156,13 @@ export class WorkflowClient extends BaseClient {
11261156
);
11271157
return await handle.result();
11281158
},
1129-
1159+
getUpdateHandle<Ret>(
1160+
updateId: string,
1161+
workflowId: string,
1162+
options?: GetWorkflowUpdateHandleOptions
1163+
): WorkflowUpdateHandle<Ret> {
1164+
return this.client.createWorkflowUpdateHandle(updateId, workflowId, options);
1165+
},
11301166
async signal<Args extends any[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {
11311167
const next = this.client._signalWorkflowHandler.bind(this.client);
11321168
const fn = composeInterceptors(interceptors, 'signal', next);

packages/test/src/test-integration-update.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { status as grpcStatus } from '@grpc/grpc-js';
2+
import { isGrpcServiceError } from '@temporalio/client';
13
import * as wf from '@temporalio/workflow';
24
import { helpers, makeTestFunction } from './helpers-integration';
35

@@ -59,6 +61,30 @@ test('Update can be executed via startUpdate() and handle.result()', async (t) =
5961
});
6062
});
6163

64+
test('Update handle can be created from identifiers and used to obtain result', async (t) => {
65+
const { createWorker, startWorkflow } = helpers(t);
66+
const worker = await createWorker();
67+
await worker.runUntil(async () => {
68+
const updateId = 'my-update-id';
69+
const wfHandle = await startWorkflow(workflowWithUpdates);
70+
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, { args: ['1'], updateId });
71+
72+
const updateHandle = wfHandle.getUpdateHandle(updateId, wfHandle.workflowId);
73+
t.deepEqual(await updateHandle.result(), ['1']);
74+
75+
t.truthy(updateHandleFromStartUpdate.workflowRunId);
76+
const updateHandle2 = wfHandle.getUpdateHandle(updateId, wfHandle.workflowId, {
77+
workflowRunId: updateHandleFromStartUpdate.workflowRunId,
78+
});
79+
t.deepEqual(await updateHandle2.result(), ['1']);
80+
81+
const incorrectRunId = wf.uuid4();
82+
const updateHandle3 = wfHandle.getUpdateHandle(updateId, wfHandle.workflowId, { workflowRunId: incorrectRunId });
83+
const err = await t.throwsAsync(updateHandle3.result());
84+
t.true(isGrpcServiceError(err) && err.code === grpcStatus.NOT_FOUND);
85+
});
86+
});
87+
6288
const activities = {
6389
async myActivity(): Promise<number> {
6490
return 3;

0 commit comments

Comments
 (0)