Skip to content

Commit 510c533

Browse files
authored
fix(workflow): Correctly handle condition with timeout of 0 (#985)
1 parent 42eb50e commit 510c533

File tree

7 files changed

+191
-4
lines changed

7 files changed

+191
-4
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,12 @@ jobs:
292292

293293
# Runs the sdk features repo tests with this repo's current SDK code
294294
sdk-features-tests:
295-
uses: temporalio/sdk-features/.github/workflows/typescript.yaml@main
295+
uses: temporalio/sdk-features/.github/workflows/typescript.yaml@ts-next
296296
with:
297297
typescript-repo-path: ${{github.event.pull_request.head.repo.full_name}}
298298
version: ${{github.event.pull_request.head.ref}}
299299
version-is-repo-ref: true
300+
sdk-features-repo-ref: ts-next
300301

301302
stress-tests:
302303
uses: ./.github/workflows/stress.yml

packages/test/src/integration-tests.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import { cleanOptionalStackTrace, u8 } from './helpers';
5252
import * as workflows from './workflows';
5353
import { withZeroesHTTPServer } from './zeroes-http-server';
5454

55-
const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED } =
55+
const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED, EVENT_TYPE_MARKER_RECORDED } =
5656
iface.temporal.api.enums.v1.EventType;
5757

5858
const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]);
@@ -1421,4 +1421,27 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
14211421
);
14221422
t.pass();
14231423
});
1424+
1425+
test('Condition with timeout 0 does not block indefinitely', async (t) => {
1426+
const { client } = t.context;
1427+
const workflowId = uuid4();
1428+
const handle = await client.start(workflows.conditionTimeout0Simple, {
1429+
taskQueue: 'test',
1430+
workflowId,
1431+
});
1432+
1433+
t.false(await handle.result());
1434+
1435+
const history = await handle.fetchHistory();
1436+
const timerStartedEvents = history.events!.filter(({ eventType }) => eventType! === EVENT_TYPE_TIMER_STARTED);
1437+
t.is(timerStartedEvents.length, 2);
1438+
t.is(timerStartedEvents[0].timerStartedEventAttributes!.timerId, '1');
1439+
t.is(tsToMs(timerStartedEvents[0].timerStartedEventAttributes!.startToFireTimeout), 1000);
1440+
t.is(timerStartedEvents[1].timerStartedEventAttributes!.timerId, '2');
1441+
t.is(tsToMs(timerStartedEvents[1].timerStartedEventAttributes!.startToFireTimeout), 1);
1442+
1443+
const markersEvents = history.events!.filter(({ eventType }) => eventType! === EVENT_TYPE_MARKER_RECORDED);
1444+
t.is(markersEvents.length, 1);
1445+
t.is(markersEvents[0].markerRecordedEventAttributes!.markerName, CHANGE_MARKER_NAME);
1446+
});
14241447
}

packages/test/src/test-workflows.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1872,3 +1872,83 @@ test('query not found - successString', async (t) => {
18721872
);
18731873
}
18741874
});
1875+
1876+
test('condition with timeout 0 maintain pre 1.5.0 compatibility - conditionTimeout0', async (t) => {
1877+
const { workflowType } = t.context;
1878+
{
1879+
const act: coresdk.workflow_activation.IWorkflowActivation = {
1880+
runId: 'test-runId',
1881+
timestamp: msToTs(Date.now()),
1882+
isReplaying: true,
1883+
jobs: [makeStartWorkflowJob(workflowType)],
1884+
};
1885+
const completion = await activate(t, act);
1886+
compareCompletion(t, completion, makeSuccess([]));
1887+
}
1888+
{
1889+
const completion = await activate(t, await makeSignalWorkflow('a', []));
1890+
compareCompletion(
1891+
t,
1892+
completion,
1893+
makeSuccess([
1894+
makeSetPatchMarker('__sdk_internal_patch_number:1', false),
1895+
makeStartTimerCommand({
1896+
seq: 1,
1897+
startToFireTimeout: msToTs(1),
1898+
}),
1899+
])
1900+
);
1901+
}
1902+
{
1903+
const completion = await activate(t, await makeFireTimer(1));
1904+
compareCompletion(
1905+
t,
1906+
completion,
1907+
makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(1))])
1908+
);
1909+
}
1910+
});
1911+
1912+
test('condition with timeout 0 in >1.5.0 - conditionTimeout0', async (t) => {
1913+
const { workflowType } = t.context;
1914+
{
1915+
const act: coresdk.workflow_activation.IWorkflowActivation = {
1916+
runId: 'test-runId',
1917+
timestamp: msToTs(Date.now()),
1918+
jobs: [makeStartWorkflowJob(workflowType)],
1919+
};
1920+
const completion = await activate(t, act);
1921+
compareCompletion(
1922+
t,
1923+
completion,
1924+
makeSuccess([
1925+
makeSetPatchMarker('__sdk_internal_patch_number:1', false),
1926+
makeStartTimerCommand({
1927+
seq: 1,
1928+
startToFireTimeout: msToTs(1),
1929+
}),
1930+
])
1931+
);
1932+
}
1933+
{
1934+
const completion = await activate(t, await makeFireTimer(1));
1935+
compareCompletion(
1936+
t,
1937+
completion,
1938+
makeSuccess([
1939+
makeStartTimerCommand({
1940+
seq: 2,
1941+
startToFireTimeout: msToTs(1),
1942+
}),
1943+
])
1944+
);
1945+
}
1946+
{
1947+
const completion = await activate(t, await makeFireTimer(2));
1948+
compareCompletion(
1949+
t,
1950+
completion,
1951+
makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(0))])
1952+
);
1953+
}
1954+
});
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Prior to 1.5.0, `condition(fn, 0)` was treated the same as `condition(..., undefined)`,
3+
* which means that the condition would block indefinitely and would return undefined once
4+
* fn evaluates to true, rather than returning true or false.
5+
*/
6+
import { condition, setHandler, defineSignal, sleep, ApplicationFailure } from '@temporalio/workflow';
7+
8+
export const aSignal = defineSignal('a');
9+
export const bSignal = defineSignal('b');
10+
11+
export async function conditionTimeout0(): Promise<number> {
12+
let counter = 0;
13+
14+
let aSignalReceived = false;
15+
setHandler(aSignal, () => {
16+
aSignalReceived = true;
17+
});
18+
19+
let bSignalReceived = false;
20+
setHandler(bSignal, () => {
21+
bSignalReceived = true;
22+
});
23+
24+
const aResult = await condition(() => aSignalReceived, 0);
25+
if (aResult === true || aResult === undefined) counter += 1;
26+
27+
// Do it a second time, so that we can validate that patching logic works
28+
const bResult = await condition(() => bSignalReceived, 0);
29+
if (bResult === true || bResult === undefined) counter += 10;
30+
31+
return counter;
32+
}
33+
34+
export async function conditionTimeout0Simple(): Promise<boolean | undefined> {
35+
let validationTimerFired = false;
36+
sleep(1000)
37+
.then(() => (validationTimerFired = true))
38+
.catch((e) => console.log(e));
39+
40+
return await condition(() => validationTimerFired, 0);
41+
}

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export * from './child-workflow-termination';
2626
export * from './child-workflow-timeout';
2727
export * from './condition';
2828
export * from './condition-completion-race';
29+
export * from './condition-timeout-0';
2930
export * from './continue-as-new-same-workflow';
3031
export * from './continue-as-new-to-different-workflow';
3132
export * from './date';

packages/workflow/src/internals.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ export type ActivationHandler = {
8484
[P in keyof coresdk.workflow_activation.IWorkflowActivationJob]: ActivationHandlerFunction<P>;
8585
};
8686

87+
/**
88+
* SDK Internal Patches are created by the SDK to avoid breaking history when behaviour
89+
* of existing API need to be modified. This is the patch number supported by the current
90+
* version of the SDK.
91+
*
92+
* History:
93+
* 1: Fix `condition(..., 0)` is not the same as `condition(..., undefined)`
94+
*/
95+
export const LATEST_INTERNAL_PATCH_NUMBER = 1;
96+
8797
/**
8898
* Keeps all of the Workflow runtime state like pending completions for activities and timers.
8999
*
@@ -260,6 +270,12 @@ export class Activator implements ActivationHandler {
260270
*/
261271
public readonly sentPatches = new Set<string>();
262272

273+
/**
274+
* SDK Internal Patches are created by the SDK to avoid breaking history when behaviour
275+
* of existing API need to be modified.
276+
*/
277+
public internalPatchNumber = 0;
278+
263279
sinkCalls = Array<SinkCall>();
264280

265281
constructor({
@@ -571,7 +587,28 @@ export class Activator implements ActivationHandler {
571587
if (!activation.patchId) {
572588
throw new TypeError('Notify has patch missing patch name');
573589
}
574-
this.knownPresentPatches.add(activation.patchId);
590+
if (activation.patchId.startsWith('__sdk_internal_patch_number:')) {
591+
const internalPatchNumber = parseInt(activation.patchId.substring('__sdk_internal_patch_number:'.length));
592+
if (internalPatchNumber > LATEST_INTERNAL_PATCH_NUMBER)
593+
throw new IllegalStateError(
594+
`Unsupported internal patch number: ${internalPatchNumber} > ${LATEST_INTERNAL_PATCH_NUMBER}`
595+
);
596+
if (this.internalPatchNumber < internalPatchNumber) this.internalPatchNumber = internalPatchNumber;
597+
} else {
598+
this.knownPresentPatches.add(activation.patchId);
599+
}
600+
}
601+
602+
public checkInternalPatchAtLeast(minimumPatchNumber: number): boolean {
603+
if (this.internalPatchNumber >= minimumPatchNumber) return true;
604+
if (!this.info.unsafe.isReplaying) {
605+
this.internalPatchNumber = minimumPatchNumber;
606+
this.pushCommand({
607+
setPatchMarker: { patchId: `__sdk_internal_patch_number:${LATEST_INTERNAL_PATCH_NUMBER}`, deprecated: false },
608+
});
609+
return true;
610+
}
611+
return false;
575612
}
576613

577614
public removeFromCache(): void {

packages/workflow/src/workflow.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,11 @@ export function condition(fn: () => boolean, timeout: number | string): Promise<
10801080
export function condition(fn: () => boolean): Promise<void>;
10811081

10821082
export async function condition(fn: () => boolean, timeout?: number | string): Promise<void | boolean> {
1083-
if (timeout) {
1083+
// Prior to 1.5.0, `condition(fn, 0)` was treated as equivalent to `condition(fn, undefined)`
1084+
if (timeout === 0 && !getActivator().checkInternalPatchAtLeast(1)) {
1085+
return conditionInner(fn);
1086+
}
1087+
if (typeof timeout === 'number' || typeof timeout === 'string') {
10841088
return CancellationScope.cancellable(async () => {
10851089
try {
10861090
return await Promise.race([sleep(timeout).then(() => false), conditionInner(fn).then(() => true)]);

0 commit comments

Comments
 (0)