Skip to content

Commit 411a6fe

Browse files
authored
fix(workflow): Fix multiple context leaks in reuseV8Context executor (#1605)
1 parent d8925d0 commit 411a6fe

File tree

8 files changed

+619
-198
lines changed

8 files changed

+619
-198
lines changed

packages/common/src/type-helpers.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -218,26 +218,3 @@ export function SymbolBasedInstanceOfError<E extends Error>(markerName: string):
218218
});
219219
};
220220
}
221-
222-
// Thanks MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
223-
export function deepFreeze<T>(object: T): T {
224-
// Retrieve the property names defined on object
225-
const propNames = Object.getOwnPropertyNames(object);
226-
227-
// Freeze properties before freezing self
228-
for (const name of propNames) {
229-
const value = (object as any)[name];
230-
231-
if (value && typeof value === 'object') {
232-
try {
233-
deepFreeze(value);
234-
} catch (_err) {
235-
// This is okay, there are some typed arrays that cannot be frozen (encodingKeys)
236-
}
237-
} else if (typeof value === 'function') {
238-
Object.freeze(value);
239-
}
240-
}
241-
242-
return Object.freeze(object);
243-
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
import { randomUUID } from 'crypto';
2+
import Long from 'long';
3+
import { msToTs } from '@temporalio/common/lib/time';
4+
import { coresdk } from '@temporalio/proto';
5+
import { ReusableVMWorkflowCreator } from '@temporalio/worker/lib/workflow/reusable-vm';
6+
import { WorkflowCodeBundler } from '@temporalio/worker/lib/workflow/bundler';
7+
import { parseWorkflowCode } from '@temporalio/worker/lib/worker';
8+
import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/vm';
9+
import * as wf from '@temporalio/workflow';
10+
11+
// WARNING: This file is a quick and dirty utility to run Workflow Activation performance testing
12+
// localy. It is not part of our regular test suite and hasn't been reviewed.
13+
14+
function isSet(env: string | undefined, def: boolean): boolean {
15+
if (env === undefined) return def;
16+
env = env.toLocaleLowerCase();
17+
return env === '1' || env === 't' || env === 'true';
18+
}
19+
20+
export const REUSE_V8_CONTEXT = wf.inWorkflowContext() || isSet(process.env.REUSE_V8_CONTEXT, true);
21+
22+
export const bundlerOptions = {
23+
// This is a bit ugly but it does the trick, when a test that includes workflow
24+
// code tries to import a forbidden workflow module, add it to this list:
25+
ignoreModules: [
26+
'@temporalio/common/lib/internal-non-workflow',
27+
'@temporalio/activity',
28+
'@temporalio/client',
29+
'@temporalio/testing',
30+
'@temporalio/worker',
31+
'@temporalio/proto',
32+
'inspector',
33+
'ava',
34+
'crypto',
35+
'timers/promises',
36+
'fs',
37+
'module',
38+
'path',
39+
'perf_hooks',
40+
'stack-utils',
41+
'@grpc/grpc-js',
42+
'async-retry',
43+
'uuid',
44+
'net',
45+
'fs/promises',
46+
'@temporalio/worker/lib/workflow/bundler',
47+
require.resolve('./activities'),
48+
],
49+
};
50+
51+
export interface Context {
52+
workflowCreator: VMWorkflowCreator | ReusableVMWorkflowCreator;
53+
}
54+
55+
if (!wf.inWorkflowContext()) {
56+
// eslint-disable-next-line no-inner-declarations
57+
async function runPerfTest() {
58+
const bundler = new WorkflowCodeBundler({
59+
workflowsPath: __filename,
60+
ignoreModules: [...bundlerOptions.ignoreModules],
61+
});
62+
63+
const workflowBundle = parseWorkflowCode((await bundler.createBundle()).code);
64+
65+
const workflowCreator = REUSE_V8_CONTEXT
66+
? await ReusableVMWorkflowCreator.create(workflowBundle, 400, new Set())
67+
: await VMWorkflowCreator.create(workflowBundle, 400, new Set());
68+
69+
async function createWorkflow(workflowType: wf.Workflow): Promise<{ workflow: VMWorkflow; info: wf.WorkflowInfo }> {
70+
const startTime = Date.now();
71+
const runId = randomUUID(); // That one is using a strong entropy; could this slow doen our tests?
72+
73+
const info: wf.WorkflowInfo = {
74+
workflowType: workflowType.name,
75+
runId,
76+
workflowId: 'test-workflowId',
77+
namespace: 'default',
78+
firstExecutionRunId: runId,
79+
attempt: 1,
80+
taskTimeoutMs: 1000,
81+
taskQueue: 'test',
82+
searchAttributes: {},
83+
historyLength: 3,
84+
historySize: 300,
85+
continueAsNewSuggested: false,
86+
unsafe: { isReplaying: false, now: Date.now },
87+
startTime: new Date(),
88+
runStartTime: new Date(),
89+
};
90+
91+
const workflow = (await workflowCreator.createWorkflow({
92+
info,
93+
randomnessSeed: Long.fromInt(1337).toBytes(),
94+
now: startTime,
95+
showStackTraceSources: true,
96+
})) as VMWorkflow;
97+
98+
return { workflow, info };
99+
}
100+
101+
async function activate(workflow: VMWorkflow, activation: coresdk.workflow_activation.IWorkflowActivation) {
102+
// Core guarantees the following jobs ordering:
103+
// initWf -> patches -> update random seed -> signals+update -> others -> Resolve LA
104+
// reference: github.com/temporalio/sdk-core/blob/a8150d5c7c3fc1bfd5a941fd315abff1556cd9dc/core/src/worker/workflow/mod.rs#L1363-L1378
105+
// Tests are likely to fail if we artifically make an activation that does not follow that order
106+
const jobs: coresdk.workflow_activation.IWorkflowActivationJob[] = activation.jobs ?? [];
107+
function getPriority(job: coresdk.workflow_activation.IWorkflowActivationJob) {
108+
if (job.initializeWorkflow) return 0;
109+
if (job.notifyHasPatch) return 1;
110+
if (job.updateRandomSeed) return 2;
111+
if (job.signalWorkflow || job.doUpdate) return 3;
112+
if (job.resolveActivity && job.resolveActivity.isLocal) return 5;
113+
return 4;
114+
}
115+
jobs.reduce((prevPriority: number, currJob) => {
116+
const currPriority = getPriority(currJob);
117+
if (prevPriority > currPriority) {
118+
throw new Error('Jobs are not correctly sorted');
119+
}
120+
return currPriority;
121+
}, 0);
122+
123+
const completion = await workflow.activate(coresdk.workflow_activation.WorkflowActivation.fromObject(activation));
124+
const sinkCalls = await workflow.getAndResetSinkCalls();
125+
126+
return { completion, sinkCalls };
127+
}
128+
129+
function makeActivation(
130+
info: wf.WorkflowInfo,
131+
timestamp: number = Date.now(),
132+
...jobs: coresdk.workflow_activation.IWorkflowActivationJob[]
133+
): coresdk.workflow_activation.IWorkflowActivation {
134+
return {
135+
runId: info.runId,
136+
timestamp: msToTs(timestamp),
137+
jobs,
138+
};
139+
}
140+
function makeStartWorkflow(info: wf.WorkflowInfo): coresdk.workflow_activation.IWorkflowActivation {
141+
const timestamp = Date.now();
142+
return makeActivation(info, timestamp, makeInitializeWorkflowJob(info));
143+
}
144+
145+
function makeInitializeWorkflowJob(info: wf.WorkflowInfo): {
146+
initializeWorkflow: coresdk.workflow_activation.IInitializeWorkflow;
147+
} {
148+
return {
149+
initializeWorkflow: { workflowId: info.workflowId, workflowType: info.workflowType, arguments: [] },
150+
};
151+
}
152+
153+
function makeFireTimer(
154+
info: wf.WorkflowInfo,
155+
seq: number,
156+
timestamp: number = Date.now()
157+
): coresdk.workflow_activation.IWorkflowActivation {
158+
return makeActivation(info, timestamp, makeFireTimerJob(seq));
159+
}
160+
161+
function makeFireTimerJob(seq: number): coresdk.workflow_activation.IWorkflowActivationJob {
162+
return {
163+
fireTimer: { seq },
164+
};
165+
}
166+
167+
const workflows = [];
168+
for (let i = 0; i < 5; i++) {
169+
const { workflow, info } = await createWorkflow(xxxWorkflow);
170+
let lastCompletion = await activate(workflow, makeStartWorkflow(info));
171+
172+
// eslint-disable-next-line no-inner-declarations
173+
function getTimerSeq(): number {
174+
const startTimerCommand = lastCompletion.completion.successful?.commands?.filter((c) => c.startTimer)[0];
175+
return startTimerCommand?.startTimer?.seq || 0;
176+
}
177+
178+
// eslint-disable-next-line no-inner-declarations
179+
async function doActivate() {
180+
lastCompletion = await activate(workflow, makeFireTimer(info, getTimerSeq()));
181+
}
182+
183+
workflows.push({ doActivate });
184+
}
185+
186+
const startTime = Date.now();
187+
for (let i = 1; i <= 50_000; i++) {
188+
await workflows[Math.floor(Math.random() * workflows.length)].doActivate();
189+
if (i % 10_000 === 0) {
190+
console.log(` ${i}: ${Math.round(((Date.now() - startTime) / i) * 1000)}us per activation`);
191+
}
192+
}
193+
}
194+
195+
runPerfTest()
196+
.catch((err) => {
197+
console.error(err);
198+
})
199+
.finally(() => {});
200+
}
201+
202+
export async function xxxWorkflow(): Promise<void> {
203+
// We don't care about history size, as this workflow is only to be used with synthetic activations
204+
for (;;) {
205+
await wf.sleep(1);
206+
}
207+
}

0 commit comments

Comments
 (0)