Skip to content

Commit 0239b51

Browse files
authored
fix(workflow): Fix non-determinism on replay when using a patched statement in a condition (#859)
1 parent 9d0fb5c commit 0239b51

File tree

7 files changed

+120
-10
lines changed

7 files changed

+120
-10
lines changed

packages/test/src/load/worker.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ async function main() {
155155
activityInbound: [() => new ConnectionInjectorInterceptor(clientConnection)],
156156
},
157157
});
158-
console.log('Created worker with options', worker.options);
159158

160159
await withOptionalStatusServer(worker, statusPort, async () => {
161160
await worker.run();
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import crypto from 'node:crypto';
2+
import test from 'ava';
3+
import { Worker } from '@temporalio/worker';
4+
import { WorkflowClient } from '@temporalio/client';
5+
import { RUN_INTEGRATION_TESTS } from './helpers';
6+
import * as workflows from './workflows/patch-and-condition-pre-patch';
7+
8+
if (RUN_INTEGRATION_TESTS) {
9+
test('Patch in condition does not cause non-determinism error on replay', async (t) => {
10+
const client = new WorkflowClient();
11+
const workflowId = crypto.randomUUID();
12+
13+
// Create the first worker with pre-patched version of the workflow
14+
const worker1 = await Worker.create({
15+
taskQueue: 'patch-in-condition',
16+
workflowsPath: require.resolve('./workflows/patch-and-condition-pre-patch'),
17+
});
18+
19+
// Start the workflow and wait for the first task to be processed
20+
const handle = await worker1.runUntil(async () => {
21+
const handle = await client.start(workflows.patchInCondition, {
22+
taskQueue: 'patch-in-condition',
23+
workflowId,
24+
workflowTaskTimeout: '1m', // Give our local activities enough time to run in CI
25+
});
26+
await handle.query('__stack_trace');
27+
return handle;
28+
});
29+
30+
// Create the second worker with post-patched version of the workflow
31+
const worker2 = await Worker.create({
32+
taskQueue: 'patch-in-condition',
33+
workflowsPath: require.resolve('./workflows/patch-and-condition-post-patch'),
34+
});
35+
36+
// Trigger a signal and wait for it to be processed
37+
await worker2.runUntil(async () => {
38+
await handle.signal(workflows.generateCommandSignal);
39+
await handle.query('__stack_trace');
40+
});
41+
42+
// Create the third worker that is identical to the second one
43+
const worker3 = await Worker.create({
44+
taskQueue: 'patch-in-condition',
45+
workflowsPath: require.resolve('./workflows/patch-and-condition-post-patch'),
46+
});
47+
48+
// Trigger a workflow task that will cause replay.
49+
await worker3.runUntil(async () => {
50+
await handle.signal(workflows.generateCommandSignal);
51+
await handle.result();
52+
});
53+
54+
// If the workflow completes, commands are generated in the right order and it is safe to use a patched statement
55+
// inside a condition.
56+
t.pass();
57+
});
58+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Post-patched version of a workflow used to reproduce an issue with patch inside a condition.
3+
*
4+
* Uses a patched statement inside a condition to replicate a bug where the runtime could generate out of order
5+
* commands on replay.
6+
*
7+
* @module
8+
*/
9+
import * as wf from '@temporalio/workflow';
10+
import { generateCommandSignal } from './patch-and-condition-pre-patch';
11+
12+
/**
13+
* Patches the workflow from ./patch-and-condition-pre-patch by adding a patched statement inside a condition.
14+
*
15+
*/
16+
export async function patchInCondition(): Promise<void> {
17+
// The signal handler here is important for the repro.
18+
// We use it so the workflow generates a command that will conflict with the patch.
19+
wf.setHandler(generateCommandSignal, async () => {
20+
// Ignore completion, it's irrelevant, just generate a command
21+
await wf.sleep('1s');
22+
});
23+
24+
// Note that the condition always returns false; we don't want it to resolve the promise, we are just using it to test the
25+
// edge case.
26+
await Promise.race([wf.sleep('5s'), wf.condition(() => wf.patched('irrelevant') && false)]);
27+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Pre-patched version of a workflow used to reproduce an issue with patch inside a condition.
3+
*
4+
* @module
5+
*/
6+
import * as wf from '@temporalio/workflow';
7+
8+
export const generateCommandSignal = wf.defineSignal('generate-command');
9+
10+
/**
11+
* Unpatched version of the workflow - just sleep and set up our signal handler
12+
*/
13+
export async function patchInCondition(): Promise<void> {
14+
// The signal handler here is important for the repro.
15+
// We use it so the workflow generates a command that will conflict with the patch.
16+
wf.setHandler(generateCommandSignal, async () => {
17+
// Ignore completion, it's irrelevant, just generate a command
18+
await wf.sleep('1s');
19+
});
20+
21+
await wf.sleep('5s');
22+
}

packages/worker/src/workflow/bundler.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import crypto from 'crypto';
21
import * as realFS from 'fs';
32
import * as memfs from 'memfs';
43
import { builtinModules } from 'module';
@@ -121,11 +120,11 @@ export class WorkflowCodeBundler {
121120
if (stat.isFile()) {
122121
// workflowsPath is a file; make the entrypoint a sibling of that file
123122
const { root, dir, name } = path.parse(workflowsPath);
124-
return path.format({ root, dir, base: `${name}-entrypoint-${crypto.randomBytes(8).toString('hex')}.js` });
123+
return path.format({ root, dir, base: `${name}-autogenerated-entrypoint.js` });
125124
} else {
126125
// workflowsPath is a directory; make the entrypoint a sibling of that directory
127126
const { root, dir, base } = path.parse(workflowsPath);
128-
return path.format({ root, dir, base: `${base}-entrypoint-${crypto.randomBytes(8).toString('hex')}.js` });
127+
return path.format({ root, dir, base: `${base}-autogenerated-entrypoint.js` });
129128
}
130129
}
131130

@@ -232,7 +231,7 @@ export { api };
232231
devtool: 'inline-source-map',
233232
output: {
234233
path: distDir,
235-
filename: 'workflow-isolate-[fullhash].js',
234+
filename: 'workflow-bundle-[fullhash].js',
236235
devtoolModuleFilenameTemplate: '[absolute-resource-path]',
237236
library: '__TEMPORAL__',
238237
},

packages/worker/src/workflow/vm.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { cutoffStackTrace, IllegalStateError } from '@temporalio/common';
22
import { coresdk } from '@temporalio/proto';
33
import { WorkflowInfo, FileLocation } from '@temporalio/workflow';
44
import { SinkCall } from '@temporalio/workflow/lib/sinks';
5-
import type * as internals from '@temporalio/workflow/lib/worker-interface';
5+
import * as internals from '@temporalio/workflow/lib/worker-interface';
66
import assert from 'assert';
77
import { AsyncLocalStorage } from 'async_hooks';
88
import semver from 'semver';
@@ -397,8 +397,7 @@ export class VMWorkflow implements Workflow {
397397
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }),
398398
batchIndex++
399399
);
400-
// Only trigger conditions for non-query jobs
401-
if (!jobs[0].queryWorkflow) {
400+
if (internals.showUnblockConditions(jobs[0])) {
402401
await this.tryUnblockConditions();
403402
}
404403
}

packages/workflow/src/worker-interface.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,7 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
222222
return;
223223
}
224224
state.activator[job.variant](variant as any /* TS can't infer this type */);
225-
// Only trigger conditions for non-query jobs
226-
if (!job.queryWorkflow) {
225+
if (showUnblockConditions(job)) {
227226
tryUnblockConditions();
228227
}
229228
}
@@ -279,6 +278,13 @@ export function tryUnblockConditions(): number {
279278
return numUnblocked;
280279
}
281280

281+
/**
282+
* Predicate used to prevent triggering conditions for query and patch jobs; returns true only for jobs that are neither.
283+
*/
284+
export function showUnblockConditions(job: coresdk.workflow_activation.IWorkflowActivationJob): boolean {
285+
return !job.queryWorkflow && !job.notifyHasPatch;
286+
}
287+
282288
export async function dispose(): Promise<void> {
283289
const dispose = composeInterceptors(state.interceptors.internals, 'dispose', async () => {
284290
storage.disable();

0 commit comments

Comments
 (0)