Skip to content

Commit 2d9b7d1

Browse files
authored
refactor(workflow)!: Move TaskInfo to WorkflowInfo (#761)
1 parent a261054 commit 2d9b7d1

File tree

10 files changed

+76
-79
lines changed

10 files changed

+76
-79
lines changed

packages/test/src/integration-tests.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
681681
searchAttributes: {},
682682
workflowType: 'returnWorkflowInfo',
683683
workflowId,
684+
historyLength: 3,
685+
unsafe: { isReplaying: false },
684686
});
685687
});
686688

packages/test/src/test-sinks.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ if (RUN_INTEGRATION_TESTS) {
100100
memo: undefined,
101101
parent: undefined,
102102
searchAttributes: {},
103+
historyLength: 3,
104+
unsafe: { isReplaying: false },
103105
};
104106

105107
t.deepEqual(recordedCalls, [

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,12 @@ async function createWorkflow(
9191
taskTimeoutMs: 1000,
9292
taskQueue: 'test',
9393
searchAttributes: {},
94+
historyLength: 3,
95+
unsafe: { isReplaying: false },
9496
},
9597
randomnessSeed: Long.fromInt(1337).toBytes(),
9698
now: startTime,
9799
patches: [],
98-
isReplaying: false,
99-
historyLength: 3,
100100
})) as VMWorkflow;
101101
return workflow;
102102
}

packages/test/src/workflows/log-sink-tester.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@ const { logger } = wf.proxySinks<LoggerSinks>();
1313

1414
export async function logSinkTester(): Promise<void> {
1515
logger.info(
16-
`Workflow execution started, replaying: ${wf.taskInfo().unsafe.isReplaying}, hl: ${wf.taskInfo().historyLength}`
16+
`Workflow execution started, replaying: ${wf.workflowInfo().unsafe.isReplaying}, hl: ${
17+
wf.workflowInfo().historyLength
18+
}`
1719
);
1820
// We rely on the test to run with max cached workflows of 1.
1921
// Executing this child will flush the current workflow from the cache
2022
// causing replay or the first sink call.
2123
await wf.executeChild(successString);
2224
logger.info(
23-
`Workflow execution completed, replaying: ${wf.taskInfo().unsafe.isReplaying}, hl: ${wf.taskInfo().historyLength}`
25+
`Workflow execution completed, replaying: ${wf.workflowInfo().unsafe.isReplaying}, hl: ${
26+
wf.workflowInfo().historyLength
27+
}`
2428
);
2529
}

packages/worker/src/worker.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,10 @@ export class Worker {
10861086
cronSchedule: cronSchedule || undefined,
10871087
// 0 is the default, and not a valid value, since crons are at least a minute apart
10881088
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
1089+
historyLength: activation.historyLength,
1090+
unsafe: {
1091+
isReplaying: activation.isReplaying,
1092+
},
10891093
};
10901094
this.log.trace('Creating workflow', workflowLogAttributes(workflowInfo));
10911095
const patchJobs = activation.jobs.filter((j): j is PatchJob => j.notifyHasPatch != null);
@@ -1103,8 +1107,6 @@ export class Worker {
11031107
randomnessSeed: randomnessSeed.toBytes(),
11041108
now: tsToMs(activation.timestamp),
11051109
patches,
1106-
isReplaying: activation.isReplaying,
1107-
historyLength: activation.historyLength,
11081110
});
11091111
});
11101112

@@ -1521,7 +1523,7 @@ export class Worker {
15211523
}
15221524

15231525
/**
1524-
* {@link run | Run} the Worker until `fnOrPromise` completes.
1526+
* Run the Worker until `fnOrPromise` completes. Then {@link shutdown} and wait for {@link run} to complete.
15251527
*
15261528
* @returns the result of `fnOrPromise`
15271529
*
@@ -1544,11 +1546,14 @@ export class Worker {
15441546
}
15451547

15461548
/**
1547-
* Start polling on tasks, completes after graceful shutdown.
1549+
* Start polling on the Task Queue for tasks. Completes after graceful {@link shutdown}, once the Worker reaches the
1550+
* `'STOPPED'` state.
1551+
*
15481552
* Throws on a fatal error or failure to shutdown gracefully.
1553+
*
15491554
* @see {@link errors}
15501555
*
1551-
* To stop polling call {@link shutdown} or send one of {@link Runtime.options.shutdownSignals}.
1556+
* To stop polling, call {@link shutdown} or send one of {@link Runtime.options.shutdownSignals}.
15521557
*/
15531558
async run(): Promise<void> {
15541559
if (this.state !== 'INITIALIZED') {

packages/worker/src/workflow/vm.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export class VMWorkflowCreator implements WorkflowCreator {
7676
*/
7777
async createWorkflow(options: WorkflowCreateOptions): Promise<Workflow> {
7878
const context = await this.getContext();
79-
const activationContext = { isReplaying: options.isReplaying };
79+
const activationContext = { isReplaying: options.info.unsafe.isReplaying };
8080
this.injectConsole(context, options.info, activationContext);
8181
const { hasSeparateMicrotaskQueue, isolateExecutionTimeoutMs } = this;
8282
const workflowModule: WorkflowModule = new Proxy(

packages/workflow/src/interfaces.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ export interface WorkflowInfo {
2424

2525
/**
2626
* Indexed information attached to the Workflow Execution
27+
*
28+
* This value may change during the lifetime of an Execution.
2729
*/
2830
searchAttributes: SearchAttributes;
2931

@@ -49,6 +51,15 @@ export interface WorkflowInfo {
4951
*/
5052
lastFailure?: TemporalFailure;
5153

54+
/**
55+
* Length of Workflow history up until the current Workflow Task.
56+
*
57+
* This value changes during the lifetime of an Execution.
58+
*
59+
* You may safely use this information to decide when to {@link continueAsNew}.
60+
*/
61+
historyLength: number;
62+
5263
/**
5364
* Task queue this Workflow is executing on
5465
*/
@@ -114,6 +125,17 @@ export interface WorkflowInfo {
114125
* Milliseconds between Cron Runs
115126
*/
116127
cronScheduleToScheduleInterval?: number;
128+
129+
unsafe: UnsafeWorkflowInfo;
130+
}
131+
132+
/**
133+
* Unsafe information about the current Workflow Execution.
134+
*
135+
* Never rely on this information in Workflow logic as it will cause non-deterministic behavior.
136+
*/
137+
export interface UnsafeWorkflowInfo {
138+
isReplaying: boolean;
117139
}
118140

119141
export interface ParentWorkflowInfo {

packages/workflow/src/internals.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -476,16 +476,6 @@ export class State {
476476
*/
477477
public info?: WorkflowInfo;
478478

479-
/**
480-
* Whether a Workflow is replaying history or processing new events
481-
*/
482-
isReplaying?: boolean;
483-
484-
/**
485-
* ID of last WorkflowTaskStarted event
486-
*/
487-
historyLength?: number;
488-
489479
/**
490480
* A deterministic RNG, used by the isolate's overridden Math.random
491481
*/

packages/workflow/src/worker-interface.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ export interface WorkflowCreateOptions {
2222
randomnessSeed: number[];
2323
now: number;
2424
patches: string[];
25-
isReplaying: boolean;
26-
historyLength: number;
2725
}
2826

2927
export interface ImportFunctions {
@@ -108,14 +106,7 @@ export function overrideGlobals(): void {
108106
*
109107
* Sets required internal state and instantiates the workflow and interceptors.
110108
*/
111-
export async function initRuntime({
112-
info,
113-
randomnessSeed,
114-
now,
115-
patches,
116-
isReplaying,
117-
historyLength,
118-
}: WorkflowCreateOptions): Promise<void> {
109+
export async function initRuntime({ info, randomnessSeed, now, patches }: WorkflowCreateOptions): Promise<void> {
119110
const global = globalThis as any;
120111
// Set the runId globally on the context so it can be retrieved in the case
121112
// of an unhandled promise rejection.
@@ -129,9 +120,8 @@ export async function initRuntime({
129120
state.info = info;
130121
state.now = now;
131122
state.random = alea(randomnessSeed);
132-
state.historyLength = historyLength;
133123

134-
if (isReplaying) {
124+
if (info.unsafe.isReplaying) {
135125
for (const patch of patches) {
136126
state.knownPresentPatches.add(patch);
137127
}
@@ -193,8 +183,8 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
193183
if (activation.historyLength == null) {
194184
throw new TypeError('Got activation with no historyLength');
195185
}
196-
state.isReplaying = activation.isReplaying ?? false;
197-
state.historyLength = activation.historyLength;
186+
state.info.unsafe.isReplaying = activation.isReplaying ?? false;
187+
state.info.historyLength = activation.historyLength;
198188
}
199189

200190
// Cast from the interface to the class which has the `variant` attribute.

packages/workflow/src/workflow.ts

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,29 @@ export async function executeChild<T extends Workflow>(
817817
}
818818

819819
/**
820-
* Get information about the current Workflow
820+
* Get information about the current Workflow.
821+
*
822+
* ⚠️ We recommend calling `workflowInfo()` whenever accessing {@link WorkflowInfo} fields. Some WorkflowInfo fields
823+
* change during the lifetime of an Execution—like {@link WorkflowInfo.historyLength} and
824+
* {@link WorkflowInfo.searchAttributes}—and some may be changeable in the future—like {@link WorkflowInfo.taskQueue}.
825+
*
826+
* ```ts
827+
* // GOOD
828+
* function myWorkflow() {
829+
* doSomething(workflowInfo().searchAttributes)
830+
* ...
831+
* doSomethingElse(workflowInfo().searchAttributes)
832+
* }
833+
* ```
834+
*
835+
* ```ts
836+
* // BAD
837+
* function myWorkflow() {
838+
* const attributes = workflowInfo().searchAttributes
839+
* doSomething(attributes)
840+
* ...
841+
* doSomethingElse(attributes)
842+
* }
821843
*/
822844
export function workflowInfo(): WorkflowInfo {
823845
if (state.info === undefined) {
@@ -1037,7 +1059,10 @@ function patchInternal(patchId: string, deprecated: boolean): boolean {
10371059
if (state.workflow === undefined) {
10381060
throw new IllegalStateError('Patches cannot be used before Workflow starts');
10391061
}
1040-
const usePatch = !state.isReplaying || state.knownPresentPatches.has(patchId);
1062+
if (state.info === undefined) {
1063+
throw new IllegalStateError('Workflow uninitialized');
1064+
}
1065+
const usePatch = !state.info.unsafe.isReplaying || state.knownPresentPatches.has(patchId);
10411066
// Avoid sending commands for patches core already knows about.
10421067
// This optimization enables development of automatic patching tools.
10431068
if (usePatch && !state.sentPatches.has(patchId)) {
@@ -1219,47 +1244,4 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void
12191244
state.info.searchAttributes = mergedSearchAttributes;
12201245
}
12211246

1222-
/**
1223-
* Unsafe information about the currently executing Workflow Task.
1224-
*
1225-
* Never rely on this information in Workflow logic as it will cause non-deterministic behavior.
1226-
*/
1227-
export interface UnsafeTaskInfo {
1228-
isReplaying: boolean;
1229-
}
1230-
1231-
/**
1232-
* Information about the currently executing Workflow Task.
1233-
*
1234-
* Meant for advanced usage.
1235-
*/
1236-
export interface TaskInfo {
1237-
/**
1238-
* Length of Workflow history up until the current Workflow Task.
1239-
*
1240-
* You may safely use this information to decide when to {@link continueAsNew}.
1241-
*/
1242-
historyLength: number;
1243-
unsafe: UnsafeTaskInfo;
1244-
}
1245-
1246-
/**
1247-
* Get information about the currently executing Workflow Task.
1248-
*
1249-
* See {@link TaskInfo}
1250-
*/
1251-
export function taskInfo(): TaskInfo {
1252-
const { isReplaying, historyLength } = state;
1253-
if (isReplaying == null || historyLength == null) {
1254-
throw new IllegalStateError('Workflow uninitialized');
1255-
}
1256-
1257-
return {
1258-
historyLength,
1259-
unsafe: {
1260-
isReplaying,
1261-
},
1262-
};
1263-
}
1264-
12651247
export const stackTraceQuery = defineQuery<string>('__stack_trace');

0 commit comments

Comments
 (0)