Skip to content

Commit a2c67bd

Browse files
authored
fix(workflow): Fix isReplaying incorrectly true in signal-evict-query scenario (#1234)
1 parent aab4149 commit a2c67bd

File tree

6 files changed

+109
-4
lines changed

6 files changed

+109
-4
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import crypto from 'node:crypto';
2+
import test from 'ava';
3+
import * as wf from '@temporalio/workflow';
4+
import { TestWorkflowEnvironment } from '@temporalio/testing';
5+
import { Worker } from './helpers';
6+
import * as workflows from './workflows/signal-query-patch-pre-patch';
7+
8+
test('Signal+Query+Patch does not cause non-determinism error on replay', async (t) => {
9+
const env = await TestWorkflowEnvironment.createLocal();
10+
try {
11+
const workflowId = crypto.randomUUID();
12+
13+
// Create the first worker with pre-patched version of the workflow
14+
const worker1 = await Worker.create({
15+
connection: env.nativeConnection,
16+
taskQueue: 'signal-query-patch',
17+
workflowsPath: require.resolve('./workflows/signal-query-patch-pre-patch'),
18+
19+
// Avoid waiting for sticky execution timeout on worker transition
20+
stickyQueueScheduleToStartTimeout: '1s',
21+
});
22+
23+
// Start the workflow, wait for the first task to be processed, then send it a signal and wait for it to be completed
24+
const handle = await worker1.runUntil(async () => {
25+
const handle = await env.client.workflow.start(workflows.patchQuerySignal, {
26+
taskQueue: 'signal-query-patch',
27+
workflowId,
28+
});
29+
await handle.signal(wf.defineSignal('signal'));
30+
31+
// Make sure the signal got processed before we shutdown the worker
32+
await handle.query('query');
33+
return handle;
34+
});
35+
36+
// Create the second worker with post-patched version of the workflow
37+
const worker2 = await Worker.create({
38+
connection: env.nativeConnection,
39+
taskQueue: 'signal-query-patch',
40+
workflowsPath: require.resolve('./workflows/signal-query-patch-post-patch'),
41+
});
42+
43+
// Trigger a query and wait for it to be processed
44+
const enteredPatchBlock = await worker2.runUntil(async () => {
45+
await handle.query('query');
46+
await handle.signal('unblock');
47+
48+
return await handle.result();
49+
});
50+
51+
t.false(enteredPatchBlock);
52+
} finally {
53+
await env.teardown();
54+
}
55+
});

packages/test/src/test-sinks.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,11 +444,14 @@ if (RUN_INTEGRATION_TESTS) {
444444
stickyQueueScheduleToStartTimeout: 1,
445445
};
446446

447-
await (await Worker.create(workerOptions)).runUntil(new Promise((resolve) => setTimeout(resolve, 1000)));
447+
// Start the first worker and wait for the first task to complete before shutdown that worker
448+
await (await Worker.create(workerOptions)).runUntil(handle.query('q'));
449+
450+
// Start the second worker
448451
await (
449452
await Worker.create(workerOptions)
450453
).runUntil(async () => {
451-
await handle.query('q').catch(() => undefined);
454+
await handle.query('q');
452455
await handle.signal(workflows.unblockSignal);
453456
await handle.result();
454457
});
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Post-patched version of a workflow used to reproduce an issue with signal+query+patch
3+
*
4+
* @module
5+
*/
6+
import * as wf from '@temporalio/workflow';
7+
8+
export async function patchQuerySignal(): Promise<boolean> {
9+
let enteredPatchBlock = false;
10+
11+
wf.setHandler(wf.defineQuery<boolean, [string]>('query'), () => true);
12+
wf.setHandler(wf.defineSignal('signal'), () => {
13+
// This block should not execute, since the patch did not get registered when the signal was first handled.
14+
if (wf.patched('should_never_be_set')) {
15+
enteredPatchBlock = true;
16+
}
17+
});
18+
19+
let blocked = true;
20+
wf.setHandler(wf.defineSignal('unblock'), () => {
21+
blocked = false;
22+
});
23+
await wf.condition(() => !blocked);
24+
25+
return enteredPatchBlock;
26+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* Pre-patched version of a workflow used to reproduce an issue with signal+query+patch
3+
*
4+
* @module
5+
*/
6+
import * as wf from '@temporalio/workflow';
7+
8+
export async function patchQuerySignal(): Promise<boolean> {
9+
wf.setHandler(wf.defineQuery<boolean, [string]>('query'), () => true);
10+
wf.setHandler(wf.defineSignal('signal'), () => {
11+
// Nothing. In post-patch version, there will be a block here, surrounded by a patch check.
12+
});
13+
14+
let blocked = true;
15+
wf.setHandler(wf.defineSignal('unblock'), () => {
16+
blocked = false;
17+
});
18+
await wf.condition(() => !blocked);
19+
20+
throw new Error('Execution should not reach this point in pre-patch version');
21+
}

packages/workflow/src/workflow.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ export function proxyLocalActivities<A = UntypedActivities>(options: LocalActivi
573573

574574
// TODO: deprecate this patch after "enough" time has passed
575575
const EXTERNAL_WF_CANCEL_PATCH = '__temporal_internal_connect_external_handle_cancel_to_scope';
576-
// This generic name of this patch comes from an attempt to build a generic internal patching mechanism.
576+
// The name of this patch comes from an attempt to build a generic internal patching mechanism.
577577
// That effort has been abandoned in favor of a newer WorkflowTaskCompletedMetadata based mechanism.
578578
const CONDITION_0_PATCH = '__sdk_internal_patch_number:1';
579579

0 commit comments

Comments
 (0)