Skip to content

Commit 6ccfd51

Browse files
Add versioning intent to commands (#1156)
Co-authored-by: James Watkins-Harvey <james.watkinsharvey@temporal.io>
1 parent 93a4d9c commit 6ccfd51

File tree

10 files changed

+115
-38
lines changed

10 files changed

+115
-38
lines changed

packages/client/src/task-queue-client.ts

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import { WorkflowService } from './types';
77
import { BuildIdOperation, versionSetsFromProto, WorkerBuildIdVersionSets } from './build-id-types';
88
import { isGrpcServiceError, ServiceError } from './errors';
99
import { rethrowKnownErrorTypes } from './helpers';
10-
import IUpdateWorkerBuildIdCompatibilityRequest = temporal.api.workflowservice.v1.IUpdateWorkerBuildIdCompatibilityRequest;
11-
import GetWorkerTaskReachabilityResponse = temporal.api.workflowservice.v1.GetWorkerTaskReachabilityResponse;
10+
11+
type IUpdateWorkerBuildIdCompatibilityRequest =
12+
temporal.api.workflowservice.v1.IUpdateWorkerBuildIdCompatibilityRequest;
13+
type GetWorkerTaskReachabilityResponse = temporal.api.workflowservice.v1.GetWorkerTaskReachabilityResponse;
1214

1315
/**
1416
* @experimental
@@ -173,12 +175,12 @@ export type ReachabilityOptions = RequireAtLeastOne<BaseReachabilityOptions, 'bu
173175

174176
/**
175177
* There are different types of reachability:
176-
* - `NewWorkflows`: The Build Id might be used by new workflows
177-
* - `ExistingWorkflows` The Build Id might be used by open workflows and/or closed workflows.
178-
* - `OpenWorkflows` The Build Id might be used by open workflows
179-
* - `ClosedWorkflows` The Build Id might be used by closed workflows
178+
* - `NEW_WORKFLOWS`: The Build Id might be used by new workflows
179+
* - `EXISTING_WORKFLOWS` The Build Id might be used by open workflows and/or closed workflows.
180+
* - `OPEN_WORKFLOWS` The Build Id might be used by open workflows
181+
* - `CLOSED_WORKFLOWS` The Build Id might be used by closed workflows
180182
*/
181-
export type ReachabilityType = 'NewWorkflows' | 'ExistingWorkflows' | 'OpenWorkflows' | 'ClosedWorkflows';
183+
export type ReachabilityType = 'NEW_WORKFLOWS' | 'EXISTING_WORKFLOWS' | 'OPEN_WORKFLOWS' | 'CLOSED_WORKFLOWS';
182184

183185
/**
184186
* See {@link ReachabilityOptions}
@@ -203,7 +205,7 @@ export interface ReachabilityResponse {
203205
buildIdReachability: Record<string | UnversionedBuildIdType, BuildIdReachability>;
204206
}
205207

206-
export type ReachabilityTypeResponse = ReachabilityType | 'NotFetched';
208+
export type ReachabilityTypeResponse = ReachabilityType | 'NOT_FETCHED';
207209

208210
export interface BuildIdReachability {
209211
/**
@@ -218,13 +220,13 @@ function reachabilityTypeToProto(type: ReachabilityType | undefined | null): tem
218220
case null:
219221
case undefined:
220222
return temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_UNSPECIFIED;
221-
case 'NewWorkflows':
223+
case 'NEW_WORKFLOWS':
222224
return temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_NEW_WORKFLOWS;
223-
case 'ExistingWorkflows':
225+
case 'EXISTING_WORKFLOWS':
224226
return temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_EXISTING_WORKFLOWS;
225-
case 'OpenWorkflows':
227+
case 'OPEN_WORKFLOWS':
226228
return temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_OPEN_WORKFLOWS;
227-
case 'ClosedWorkflows':
229+
case 'CLOSED_WORKFLOWS':
228230
return temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_CLOSED_WORKFLOWS;
229231
default:
230232
assertNever('Unknown Build Id reachability operation', type);
@@ -263,15 +265,15 @@ export function reachabilityResponseFromProto(resp: GetWorkerTaskReachabilityRes
263265
function reachabilityTypeFromProto(rtype: temporal.api.enums.v1.TaskReachability): ReachabilityTypeResponse {
264266
switch (rtype) {
265267
case temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_UNSPECIFIED:
266-
return 'NotFetched';
268+
return 'NOT_FETCHED';
267269
case temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_NEW_WORKFLOWS:
268-
return 'NewWorkflows';
270+
return 'NEW_WORKFLOWS';
269271
case temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_EXISTING_WORKFLOWS:
270-
return 'ExistingWorkflows';
272+
return 'EXISTING_WORKFLOWS';
271273
case temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_OPEN_WORKFLOWS:
272-
return 'OpenWorkflows';
274+
return 'OPEN_WORKFLOWS';
273275
case temporal.api.enums.v1.TaskReachability.TASK_REACHABILITY_CLOSED_WORKFLOWS:
274-
return 'ClosedWorkflows';
276+
return 'CLOSED_WORKFLOWS';
275277
default:
276278
return assertNever('Unknown Build Id reachability operation', rtype);
277279
}

packages/common/src/activity-options.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { coresdk } from '@temporalio/proto';
22
import { RetryPolicy } from './retry-policy';
33
import { checkExtends } from './type-helpers';
44
import { Duration } from './time';
5+
import { VersioningIntent } from './versioning-intent';
56

67
// Avoid importing the proto implementation to reduce workflow bundle size
78
// Copied from coresdk.workflow_commands.ActivityCancellationType
@@ -100,6 +101,14 @@ export interface ActivityOptions {
100101
* @default true
101102
*/
102103
allowEagerDispatch?: boolean;
104+
105+
/**
106+
* When using the Worker Versioning feature, specifies whether this Activity should run on a
107+
* worker with a compatible Build Id or not. See {@link VersioningIntent}.
108+
*
109+
* @experimental
110+
*/
111+
versioningIntent?: VersioningIntent;
103112
}
104113

105114
/**

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export * from './retry-policy';
2323
export { type Timestamp, Duration, StringValue } from './time';
2424
export * from './workflow-handle';
2525
export * from './workflow-options';
26+
export * from './versioning-intent';
2627

2728
/**
2829
* Encode a UTF-8 string into a Uint8Array
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
2+
* Indicates whether the user intends certain commands to be run on a compatible worker Build Id
3+
* version or not.
4+
*
5+
* `COMPATIBLE` indicates that the command should run on a worker with compatible version if
6+
* possible. It may not be possible if the target task queue does not also have knowledge of the
7+
* current worker's Build Id.
8+
*
9+
* `DEFAULT` indicates that the command should run on the target task queue's current
10+
* overall-default Build Id.
11+
*
12+
* Where this type is accepted optionally, an unset value indicates that the SDK should choose the
13+
* most sensible default behavior for the type of command, accounting for whether the command will
14+
* be run on the same task queue as the current worker.
15+
*
16+
* @experimental
17+
*/
18+
export type VersioningIntent = 'COMPATIBLE' | 'DEFAULT';

packages/test/src/test-replay.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { temporal } from '@temporalio/proto';
77
import { bundleWorkflowCode, ReplayError, WorkflowBundle } from '@temporalio/worker';
88
import { DeterminismViolationError } from '@temporalio/workflow';
99
import { Worker } from './helpers';
10-
import History = temporal.api.history.v1.History;
1110

1211
async function gen2array<T>(gen: AsyncIterable<T>): Promise<T[]> {
1312
const out: T[] = [];
@@ -21,19 +20,19 @@ export interface Context {
2120
bundle: WorkflowBundle;
2221
}
2322

24-
async function getHistories(fname: string): Promise<History> {
23+
async function getHistories(fname: string): Promise<temporal.api.history.v1.History> {
2524
const isJson = fname.endsWith('json');
2625
const fpath = path.resolve(__dirname, `../history_files/${fname}`);
2726
if (isJson) {
2827
const hist = await fs.promises.readFile(fpath, 'utf8');
2928
return JSON.parse(hist);
3029
} else {
3130
const hist = await fs.promises.readFile(fpath);
32-
return History.decode(hist);
31+
return temporal.api.history.v1.History.decode(hist);
3332
}
3433
}
3534

36-
function historator(histories: Array<History>) {
35+
function historator(histories: Array<temporal.api.history.v1.History>) {
3736
return (async function* () {
3837
for (const history of histories) {
3938
yield { workflowId: 'fake', history };

packages/test/src/test-worker-versioning-unit.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import test from 'ava';
22
import { reachabilityResponseFromProto, UnversionedBuildId } from '@temporalio/client/lib/task-queue-client';
33
import { temporal } from '@temporalio/proto';
4-
import TaskReachability = temporal.api.enums.v1.TaskReachability;
5-
import GetWorkerTaskReachabilityResponse = temporal.api.workflowservice.v1.GetWorkerTaskReachabilityResponse;
4+
5+
const TaskReachability = temporal.api.enums.v1.TaskReachability;
6+
const GetWorkerTaskReachabilityResponse = temporal.api.workflowservice.v1.GetWorkerTaskReachabilityResponse;
67

78
test('Worker versioning workers get appropriate tasks', async (t) => {
89
const res = reachabilityResponseFromProto(
@@ -79,12 +80,12 @@ test('Worker versioning workers get appropriate tasks', async (t) => {
7980
);
8081

8182
console.warn(res.buildIdReachability);
82-
t.deepEqual(res.buildIdReachability['2.0'].taskQueueReachability.foo, ['NewWorkflows']);
83-
t.deepEqual(res.buildIdReachability['1.0'].taskQueueReachability.foo, ['OpenWorkflows']);
84-
t.deepEqual(res.buildIdReachability['1.1'].taskQueueReachability.foo, ['ExistingWorkflows', 'NewWorkflows']);
85-
t.deepEqual(res.buildIdReachability['0.1'].taskQueueReachability.foo, ['ClosedWorkflows']);
83+
t.deepEqual(res.buildIdReachability['2.0'].taskQueueReachability.foo, ['NEW_WORKFLOWS']);
84+
t.deepEqual(res.buildIdReachability['1.0'].taskQueueReachability.foo, ['OPEN_WORKFLOWS']);
85+
t.deepEqual(res.buildIdReachability['1.1'].taskQueueReachability.foo, ['EXISTING_WORKFLOWS', 'NEW_WORKFLOWS']);
86+
t.deepEqual(res.buildIdReachability['0.1'].taskQueueReachability.foo, ['CLOSED_WORKFLOWS']);
8687
t.deepEqual(res.buildIdReachability['unreachable'].taskQueueReachability.foo, []);
87-
t.deepEqual(res.buildIdReachability['badboi'].taskQueueReachability.foo, ['NotFetched']);
88+
t.deepEqual(res.buildIdReachability['badboi'].taskQueueReachability.foo, ['NOT_FETCHED']);
8889
t.deepEqual(res.buildIdReachability[UnversionedBuildId].taskQueueReachability.foo, []);
8990

9091
t.pass();

packages/test/src/test-workflows.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import dedent from 'dedent';
66
import Long from 'long'; // eslint-disable-line import/no-named-as-default
77
import {
88
ApplicationFailure,
9-
defaultPayloadConverter,
109
defaultFailureConverter,
10+
defaultPayloadConverter,
1111
Payload,
1212
RetryState,
1313
toPayloads,
@@ -757,6 +757,7 @@ test('cancelWorkflow', async (t) => {
757757
startToCloseTimeout: msToTs('10m'),
758758
taskQueue: 'test',
759759
doNotEagerlyExecute: false,
760+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
760761
}),
761762
])
762763
);
@@ -791,6 +792,7 @@ test('cancelWorkflow', async (t) => {
791792
startToCloseTimeout: msToTs('10m'),
792793
taskQueue: 'test',
793794
doNotEagerlyExecute: false,
795+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
794796
}),
795797
])
796798
);
@@ -909,6 +911,7 @@ test('nonCancellable', async (t) => {
909911
startToCloseTimeout: msToTs('10m'),
910912
taskQueue: 'test',
911913
doNotEagerlyExecute: false,
914+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
912915
}),
913916
])
914917
);
@@ -937,6 +940,7 @@ test('resumeAfterCancellation', async (t) => {
937940
startToCloseTimeout: msToTs('10m'),
938941
taskQueue: 'test',
939942
doNotEagerlyExecute: false,
943+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
940944
}),
941945
])
942946
);
@@ -973,6 +977,7 @@ test('handleExternalWorkflowCancellationWhileActivityRunning', async (t) => {
973977
startToCloseTimeout: msToTs('10m'),
974978
taskQueue: 'test',
975979
doNotEagerlyExecute: false,
980+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
976981
}),
977982
])
978983
);
@@ -998,6 +1003,7 @@ test('handleExternalWorkflowCancellationWhileActivityRunning', async (t) => {
9981003
startToCloseTimeout: msToTs('10m'),
9991004
taskQueue: 'test',
10001005
doNotEagerlyExecute: false,
1006+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
10011007
}),
10021008
])
10031009
);
@@ -1028,6 +1034,7 @@ test('nestedCancellation', async (t) => {
10281034
startToCloseTimeout: msToTs('10m'),
10291035
taskQueue: 'test',
10301036
doNotEagerlyExecute: false,
1037+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
10311038
}),
10321039
])
10331040
);
@@ -1051,6 +1058,7 @@ test('nestedCancellation', async (t) => {
10511058
startToCloseTimeout: msToTs('10m'),
10521059
taskQueue: 'test',
10531060
doNotEagerlyExecute: false,
1061+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
10541062
}),
10551063
])
10561064
);
@@ -1074,6 +1082,7 @@ test('nestedCancellation', async (t) => {
10741082
startToCloseTimeout: msToTs('10m'),
10751083
taskQueue: 'test',
10761084
doNotEagerlyExecute: false,
1085+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
10771086
}),
10781087
])
10791088
);
@@ -1114,6 +1123,7 @@ test('sharedScopes', async (t) => {
11141123
startToCloseTimeout: msToTs('10m'),
11151124
taskQueue: 'test',
11161125
doNotEagerlyExecute: false,
1126+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
11171127
})
11181128
)
11191129
)
@@ -1150,6 +1160,7 @@ test('shieldAwaitedInRootScope', async (t) => {
11501160
startToCloseTimeout: msToTs('10m'),
11511161
taskQueue: 'test',
11521162
doNotEagerlyExecute: false,
1163+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
11531164
}),
11541165
])
11551166
);
@@ -1343,6 +1354,7 @@ test('cancelActivityAfterFirstCompletion', async (t) => {
13431354
startToCloseTimeout: msToTs('10m'),
13441355
taskQueue: 'test',
13451356
doNotEagerlyExecute: false,
1357+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
13461358
}),
13471359
])
13481360
);
@@ -1364,6 +1376,7 @@ test('cancelActivityAfterFirstCompletion', async (t) => {
13641376
startToCloseTimeout: msToTs('10m'),
13651377
taskQueue: 'test',
13661378
doNotEagerlyExecute: false,
1379+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
13671380
}),
13681381
])
13691382
);
@@ -1409,6 +1422,7 @@ test('multipleActivitiesSingleTimeout', async (t) => {
14091422
startToCloseTimeout: msToTs('1s'),
14101423
taskQueue: 'test',
14111424
doNotEagerlyExecute: false,
1425+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
14121426
})
14131427
)
14141428
)),
@@ -1450,6 +1464,7 @@ test('resolve activity with result - http', async (t) => {
14501464
startToCloseTimeout: msToTs('1 minute'),
14511465
taskQueue: 'test',
14521466
doNotEagerlyExecute: false,
1467+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
14531468
}),
14541469
])
14551470
);
@@ -1485,6 +1500,7 @@ test('resolve activity with failure - http', async (t) => {
14851500
startToCloseTimeout: msToTs('1 minute'),
14861501
taskQueue: 'test',
14871502
doNotEagerlyExecute: false,
1503+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
14881504
}),
14891505
])
14901506
);
@@ -1566,6 +1582,7 @@ test('continueAsNewSameWorkflow', async (t) => {
15661582
workflowType,
15671583
taskQueue: 'test',
15681584
arguments: toPayloads(defaultPayloadConverter, 'signal'),
1585+
versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED,
15691586
},
15701587
},
15711588
])

packages/worker/src/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ import { VMWorkflowCreator } from './workflow/vm';
9292
import { WorkflowBundleWithSourceMapAndFilename } from './workflow/workflow-worker-thread/input';
9393
import { GracefulShutdownPeriodExpiredError } from './errors';
9494

95-
import IWorkflowActivationJob = coresdk.workflow_activation.IWorkflowActivationJob;
95+
type IWorkflowActivationJob = coresdk.workflow_activation.IWorkflowActivationJob;
9696

9797
export { DataConverter, defaultPayloadConverter };
9898

packages/workflow/src/interfaces.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
SignalDefinition,
88
QueryDefinition,
99
Duration,
10+
VersioningIntent,
1011
} from '@temporalio/common';
1112
import { checkExtends } from '@temporalio/common/lib/type-helpers';
1213
import type { coresdk } from '@temporalio/proto';
@@ -220,6 +221,13 @@ export interface ContinueAsNewOptions {
220221
* Searchable attributes to attach to next Workflow run
221222
*/
222223
searchAttributes?: SearchAttributes;
224+
/**
225+
* When using the Worker Versioning feature, specifies whether this Workflow should
226+
* Continue-as-New onto a worker with a compatible Build Id or not. See {@link VersioningIntent}.
227+
*
228+
* @experimental
229+
*/
230+
versioningIntent?: VersioningIntent;
223231
}
224232

225233
/**
@@ -322,6 +330,14 @@ export interface ChildWorkflowOptions extends CommonWorkflowOptions {
322330
* @default {@link ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE}
323331
*/
324332
parentClosePolicy?: ParentClosePolicy;
333+
334+
/**
335+
* When using the Worker Versioning feature, specifies whether this Child Workflow should run on
336+
* a worker with a compatible Build Id or not. See {@link VersioningIntent}.
337+
*
338+
* @experimental
339+
*/
340+
versioningIntent?: VersioningIntent;
325341
}
326342

327343
export type RequiredChildWorkflowOptions = Required<Pick<ChildWorkflowOptions, 'workflowId' | 'cancellationType'>> & {

0 commit comments

Comments
 (0)