Skip to content

Commit 51c513a

Browse files
authored
fix(workflow): Fix isReplaying being incorrectly true due to query (#1211)
1 parent 634f2ec commit 51c513a

File tree

4 files changed

+77
-1
lines changed

4 files changed

+77
-1
lines changed

packages/test/src/test-sinks.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
Runtime,
99
LoggerSinks as DefaultLoggerSinks,
1010
InjectedSinkFunction,
11+
WorkerOptions,
1112
} from '@temporalio/worker';
1213
import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow';
1314
import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces';
@@ -406,4 +407,56 @@ if (RUN_INTEGRATION_TESTS) {
406407
},
407408
]);
408409
});
410+
411+
test('Core issue 589', async (t) => {
412+
const taskQueue = `${__filename}-${t.title}`;
413+
414+
const recordedMessages = Array<{ message: string; historyLength: number; isReplaying: boolean }>();
415+
const sinks: InjectedSinks<workflows.CustomLoggerSinks> = {
416+
customLogger: {
417+
info: {
418+
fn: async (info, message) => {
419+
recordedMessages.push({
420+
message,
421+
historyLength: info.historyLength,
422+
isReplaying: info.unsafe.isReplaying,
423+
});
424+
},
425+
callDuringReplay: true,
426+
},
427+
},
428+
};
429+
430+
const client = new WorkflowClient();
431+
const handle = await client.start(workflows.coreIssue589, { taskQueue, workflowId: uuid4() });
432+
433+
const workerOptions: WorkerOptions = {
434+
...defaultOptions,
435+
taskQueue,
436+
sinks,
437+
maxCachedWorkflows: 2,
438+
maxConcurrentWorkflowTaskExecutions: 2,
439+
440+
// Cut down on execution time
441+
stickyQueueScheduleToStartTimeout: 1,
442+
};
443+
444+
await (await Worker.create(workerOptions)).runUntil(new Promise((resolve) => setTimeout(resolve, 1000)));
445+
await (
446+
await Worker.create(workerOptions)
447+
).runUntil(async () => {
448+
await handle.query('q').catch(() => undefined);
449+
await handle.signal(workflows.unblockSignal);
450+
await handle.result();
451+
});
452+
453+
const checkpointEntries = recordedMessages.filter((m) => m.message.startsWith('Checkpoint'));
454+
t.deepEqual(checkpointEntries, [
455+
{
456+
message: 'Checkpoint, replaying: false, hl: 8',
457+
historyLength: 8,
458+
isReplaying: false,
459+
},
460+
]);
461+
});
409462
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import * as wf from '@temporalio/workflow';
2+
import { CustomLoggerSinks } from './log-sink-tester';
3+
import { unblockSignal } from './definitions';
4+
5+
const { customLogger } = wf.proxySinks<CustomLoggerSinks>();
6+
7+
// Demo for https://github.com/temporalio/sdk-core/issues/589
8+
export async function coreIssue589(): Promise<void> {
9+
wf.setHandler(wf.defineQuery('q'), () => {
10+
return 'not important';
11+
});
12+
13+
let unblocked = false;
14+
wf.setHandler(unblockSignal, () => {
15+
unblocked = true;
16+
});
17+
await wf.condition(() => unblocked, 10000);
18+
19+
customLogger.info(
20+
`Checkpoint, replaying: ${wf.workflowInfo().unsafe.isReplaying}, hl: ${wf.workflowInfo().historyLength}`
21+
);
22+
}

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export * from './condition-completion-race';
2929
export * from './condition-timeout-0';
3030
export * from './continue-as-new-same-workflow';
3131
export * from './continue-as-new-to-different-workflow';
32+
export * from './core-issue-589';
3233
export * from './date';
3334
export * from './deferred-resolve';
3435
export * from './definitions';

0 commit comments

Comments
 (0)