Skip to content

Commit aab4149

Browse files
feat(workflow): Add historySizeInBytes and continueAsNewSuggested to WorkflowInfo (#695) (#1223)
## What was changed <!-- Describe what has changed in this PR --> Makes visible to workflows the new info fields historySizeInBytes and continueAsNewSuggested ## Why? Makes it easier for workflow code to decide when to continueAsNew() before history becomes too large ## Checklist <!--- add/delete as needed ---> 1. Closes <!-- add issue number here --> #695 2. How was this tested: A new system test and unit test added 3. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io --> jsdoc of the new fields
1 parent be98c23 commit aab4149

File tree

12 files changed

+119
-14
lines changed

12 files changed

+119
-14
lines changed

packages/client/src/helpers.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ export async function executionInfoFromRaw<T>(
5959
},
6060
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
6161
historyLength: raw.historyLength!.toNumber(),
62+
// Exact truncation for multi-petabyte histories
63+
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
64+
historySize: raw.historySizeBytes?.toNumber() || undefined,
6265
startTime: tsToDate(raw.startTime!),
6366
executionTime: optionalTsToDate(raw.executionTime),
6467
closeTime: optionalTsToDate(raw.closeTime),

packages/client/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ export interface WorkflowExecutionInfo {
3636
taskQueue: string;
3737
status: { code: proto.temporal.api.enums.v1.WorkflowExecutionStatus; name: WorkflowExecutionStatusName };
3838
historyLength: number;
39+
/**
40+
 * Size of Workflow history in bytes.
41+
 *
42+
 * This value is only available in server versions >= 1.20
43+
 */
44+
historySize?: number;
3945
startTime: Date;
4046
executionTime?: Date;
4147
closeTime?: Date;

packages/common/src/time.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
// eslint-disable-next-line import/no-named-as-default
2-
import Long from 'long';
1+
import Long from 'long'; // eslint-disable-line import/no-named-as-default
32
import ms, { StringValue } from 'ms';
43
import type { google } from '@temporalio/proto';
54
import { ValueError } from './errors';

packages/test/src/integration-tests.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
680680
},
681681
});
682682
const result = await workflow.result();
683+
t.assert(result.historySize > 300);
683684
t.deepEqual(result, {
684685
memo: {
685686
nested: { object: true },
@@ -694,6 +695,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
694695
workflowType: 'returnWorkflowInfo',
695696
workflowId,
696697
historyLength: 3,
698+
continueAsNewSuggested: false,
699+
// values ignored for the purpose of comparison
700+
historySize: result.historySize,
697701
startTime: result.startTime,
698702
runStartTime: result.runStartTime,
699703
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast

packages/test/src/test-integration-workflows.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,52 @@ test('Condition 0 patch sets a timer', async (t) => {
216216
t.false(await worker.runUntil(executeWorkflow(conditionTimeout0)));
217217
});
218218

219+
export async function historySizeGrows(): Promise<[number, number]> {
220+
const before = workflow.workflowInfo().historySize;
221+
await workflow.sleep(1);
222+
const after = workflow.workflowInfo().historySize;
223+
return [before, after];
224+
}
225+
226+
test('HistorySize grows with new WFT', async (t) => {
227+
const { createWorker, executeWorkflow } = helpers(t);
228+
const worker = await createWorker();
229+
const [before, after] = await worker.runUntil(executeWorkflow(historySizeGrows));
230+
t.true(after > before && before > 100);
231+
});
232+
233+
test('HistorySize is visible in WorkflowExecutionInfo', async (t) => {
234+
const { createWorker, startWorkflow } = helpers(t);
235+
const worker = await createWorker();
236+
const handle = await startWorkflow(historySizeGrows);
237+
238+
await worker.runUntil(handle.result());
239+
const historySize = (await handle.describe()).historySize;
240+
t.true(historySize && historySize > 100);
241+
});
242+
243+
export async function suggestedCAN(): Promise<boolean> {
244+
const maxEvents = 40_000;
245+
const batchSize = 100;
246+
if (workflow.workflowInfo().continueAsNewSuggested) {
247+
return false;
248+
}
249+
while (workflow.workflowInfo().historyLength < maxEvents) {
250+
await Promise.all(new Array(batchSize).fill(undefined).map((_) => workflow.sleep(1)));
251+
if (workflow.workflowInfo().continueAsNewSuggested) {
252+
return true;
253+
}
254+
}
255+
return false;
256+
}
257+
258+
test('ContinueAsNew is suggested', async (t) => {
259+
const { createWorker, executeWorkflow } = helpers(t);
260+
const worker = await createWorker();
261+
const flaggedCAN = await worker.runUntil(executeWorkflow(suggestedCAN));
262+
t.true(flaggedCAN);
263+
});
264+
219265
test('Activity initialInterval is not getting rounded', async (t) => {
220266
const { createWorker, startWorkflow } = helpers(t);
221267
const worker = await createWorker({

packages/test/src/test-sinks.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,9 @@ if (RUN_INTEGRATION_TESTS) {
5555
fn: string;
5656
}
5757

58-
const dummyDate = new Date(2000, 1, 0, 0, 0, 0);
5958
function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo {
6059
delete (input.unsafe as any).now;
61-
return {
62-
...input,
63-
startTime: dummyDate,
64-
runStartTime: dummyDate,
65-
};
60+
return input;
6661
}
6762

6863
const recordedCalls: RecordedCall[] = [];
@@ -112,6 +107,11 @@ if (RUN_INTEGRATION_TESTS) {
112107
await wf.result();
113108
return wf;
114109
});
110+
111+
// Capture volatile values that are hard to predict
112+
const { historySize, startTime, runStartTime } = recordedCalls[0].info;
113+
t.true(historySize > 300);
114+
115115
const info: WorkflowInfo = {
116116
namespace: 'default',
117117
firstExecutionRunId: wf.firstExecutionRunId,
@@ -134,8 +134,11 @@ if (RUN_INTEGRATION_TESTS) {
134134
parent: undefined,
135135
searchAttributes: {},
136136
historyLength: 3,
137-
startTime: dummyDate,
138-
runStartTime: dummyDate,
137+
continueAsNewSuggested: false,
138+
// values ignored for the purpose of comparison
139+
historySize,
140+
startTime,
141+
runStartTime,
139142
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
140143
unsafe: {
141144
isReplaying: false,

packages/test/src/test-workflows.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ async function createWorkflow(
111111
taskQueue: 'test',
112112
searchAttributes: {},
113113
historyLength: 3,
114+
historySize: 300,
115+
continueAsNewSuggested: false,
114116
unsafe: { isReplaying: false, now: Date.now },
115117
startTime: new Date(),
116118
runStartTime: new Date(),
@@ -353,6 +355,14 @@ test('successString', async (t) => {
353355
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))]));
354356
});
355357

358+
test('continueAsNewSuggested', async (t) => {
359+
const { workflowType } = t.context;
360+
const activation = makeStartWorkflow(workflowType);
361+
activation.continueAsNewSuggested = true;
362+
const req = await activate(t, activation);
363+
compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(true))]));
364+
});
365+
356366
function cleanWorkflowFailureStackTrace(
357367
req: coresdk.workflow_completion.IWorkflowActivationCompletion,
358368
commandIndex = 0
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { workflowInfo } from '@temporalio/workflow';
2+
3+
export async function continueAsNewSuggested(): Promise<boolean> {
4+
return workflowInfo().continueAsNewSuggested;
5+
}

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export * from './condition';
2828
export * from './condition-completion-race';
2929
export * from './condition-timeout-0';
3030
export * from './continue-as-new-same-workflow';
31+
export * from './continue-as-new-suggested';
3132
export * from './continue-as-new-to-different-workflow';
3233
export * from './core-issue-589';
3334
export * from './date';

packages/worker/src/worker.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,6 +1193,10 @@ export class Worker {
11931193
// 0 is the default, and not a valid value, since crons are at least a minute apart
11941194
cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
11951195
historyLength: activation.historyLength,
1196+
// Exact truncation for multi-petabyte histories
1197+
// A zero value means that it was not set by the server
1198+
historySize: activation.historySizeBytes.toNumber(),
1199+
continueAsNewSuggested: activation.continueAsNewSuggested,
11961200
unsafe: {
11971201
now: () => Date.now(), // re-set in initRuntime
11981202
isReplaying: activation.isReplaying,

packages/workflow/src/interfaces.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,27 @@ export interface WorkflowInfo {
7070
*/
7171
historyLength: number;
7272

73+
/**
74+
* Size of Workflow history in bytes until the current Workflow Task.
75+
*
76+
* This value changes during the lifetime of an Execution.
77+
*
78+
* Supported only on Temporal Server 1.20+, always zero on older servers.
79+
*
80+
* You may safely use this information to decide when to {@link continueAsNew}.
81+
*/
82+
historySize: number;
83+
84+
/**
85+
* A hint provided by the current WorkflowTaskStarted event recommending whether to
86+
* {@link continueAsNew}.
87+
*
88+
* This value changes during the lifetime of an Execution.
89+
*
90+
* Supported only on Temporal Server 1.20+, always `false` on older servers.
91+
*/
92+
continueAsNewSuggested: boolean;
93+
7394
/**
7495
* Task queue this Workflow is executing on
7596
*/

packages/workflow/src/worker-interface.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,14 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
186186
// timestamp will not be updated for activation that contain only queries
187187
activator.now = tsToMs(activation.timestamp);
188188
}
189-
if (activation.historyLength == null) {
190-
throw new TypeError('Got activation with no historyLength');
191-
}
189+
190+
// The Rust Core ensures that these activation fields are not null
192191
activator.info.unsafe.isReplaying = activation.isReplaying ?? false;
193-
activator.info.historyLength = activation.historyLength;
192+
activator.info.historyLength = activation.historyLength as number;
193+
// Exact truncation for multi-petabyte histories
194+
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
195+
activator.info.historySize = activation.historySizeBytes?.toNumber() || 0;
196+
activator.info.continueAsNewSuggested = activation.continueAsNewSuggested ?? false;
194197
}
195198

196199
// Cast from the interface to the class which has the `variant` attribute.

0 commit comments

Comments
 (0)