Skip to content

Commit c553b61

Browse files
mjameswhbergundy
andauthored
fix(workflow): Fix internal patches NDE on replay (#1106)
Co-authored-by: Roey Berman <roey.berman@gmail.com>
1 parent 0b63bd9 commit c553b61

File tree

7 files changed

+19
-156
lines changed

7 files changed

+19
-156
lines changed

packages/test/src/helpers.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ export const bundlerOptions = {
8181
'@temporalio/common/lib/internal-non-workflow',
8282
'@temporalio/activity',
8383
'@temporalio/client',
84-
'@temporalio/proto',
8584
'@temporalio/testing',
8685
'@temporalio/worker',
8786
'ava',

packages/test/src/integration-tests.ts

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import { cleanOptionalStackTrace, registerDefaultCustomSearchAttributes, u8, Wor
4545
import * as workflows from './workflows';
4646
import { withZeroesHTTPServer } from './zeroes-http-server';
4747

48-
const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED, EVENT_TYPE_MARKER_RECORDED } =
48+
const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED } =
4949
iface.temporal.api.enums.v1.EventType;
5050

5151
const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]);
@@ -1385,27 +1385,4 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
13851385
}
13861386
t.pass();
13871387
});
1388-
1389-
test('Condition with timeout 0 does not block indefinitely', async (t) => {
1390-
const { client } = t.context;
1391-
const workflowId = uuid4();
1392-
const handle = await client.start(workflows.conditionTimeout0Simple, {
1393-
taskQueue: 'test',
1394-
workflowId,
1395-
});
1396-
1397-
t.false(await handle.result());
1398-
1399-
const history = await handle.fetchHistory();
1400-
const timerStartedEvents = history.events!.filter(({ eventType }) => eventType! === EVENT_TYPE_TIMER_STARTED);
1401-
t.is(timerStartedEvents.length, 2);
1402-
t.is(timerStartedEvents[0].timerStartedEventAttributes!.timerId, '1');
1403-
t.is(tsToMs(timerStartedEvents[0].timerStartedEventAttributes!.startToFireTimeout), 1000);
1404-
t.is(timerStartedEvents[1].timerStartedEventAttributes!.timerId, '2');
1405-
t.is(tsToMs(timerStartedEvents[1].timerStartedEventAttributes!.startToFireTimeout), 1);
1406-
1407-
const markersEvents = history.events!.filter(({ eventType }) => eventType! === EVENT_TYPE_MARKER_RECORDED);
1408-
t.is(markersEvents.length, 1);
1409-
t.is(markersEvents[0].markerRecordedEventAttributes!.markerName, CHANGE_MARKER_NAME);
1410-
});
14111388
}

packages/test/src/test-integration-workflows.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,13 @@ test('Worker allows heartbeating activities after shutdown has been requested',
204204
await worker.run();
205205
t.is(cancelReason, 'CANCELLED');
206206
});
207+
208+
export async function conditionTimeout0(): Promise<boolean | undefined> {
209+
return await workflow.condition(() => false, 0);
210+
}
211+
212+
test('Condition 0 patch sets a timer', async (t) => {
213+
const { createWorker, executeWorkflow } = helpers(t);
214+
const worker = await createWorker();
215+
t.false(await worker.runUntil(executeWorkflow(conditionTimeout0)));
216+
});

packages/test/src/test-workflows.ts

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,86 +1889,6 @@ test('query not found - successString', async (t) => {
18891889
}
18901890
});
18911891

1892-
test('condition with timeout 0 maintain pre 1.5.0 compatibility - conditionTimeout0', async (t) => {
1893-
const { workflowType } = t.context;
1894-
{
1895-
const act: coresdk.workflow_activation.IWorkflowActivation = {
1896-
runId: 'test-runId',
1897-
timestamp: msToTs(Date.now()),
1898-
isReplaying: true,
1899-
jobs: [makeStartWorkflowJob(workflowType)],
1900-
};
1901-
const completion = await activate(t, act);
1902-
compareCompletion(t, completion, makeSuccess([]));
1903-
}
1904-
{
1905-
const completion = await activate(t, await makeSignalWorkflow('a', []));
1906-
compareCompletion(
1907-
t,
1908-
completion,
1909-
makeSuccess([
1910-
makeSetPatchMarker('__sdk_internal_patch_number:1', false),
1911-
makeStartTimerCommand({
1912-
seq: 1,
1913-
startToFireTimeout: msToTs(1),
1914-
}),
1915-
])
1916-
);
1917-
}
1918-
{
1919-
const completion = await activate(t, makeFireTimer(1));
1920-
compareCompletion(
1921-
t,
1922-
completion,
1923-
makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(1))])
1924-
);
1925-
}
1926-
});
1927-
1928-
test('condition with timeout 0 in >1.5.0 - conditionTimeout0', async (t) => {
1929-
const { workflowType } = t.context;
1930-
{
1931-
const act: coresdk.workflow_activation.IWorkflowActivation = {
1932-
runId: 'test-runId',
1933-
timestamp: msToTs(Date.now()),
1934-
jobs: [makeStartWorkflowJob(workflowType)],
1935-
};
1936-
const completion = await activate(t, act);
1937-
compareCompletion(
1938-
t,
1939-
completion,
1940-
makeSuccess([
1941-
makeSetPatchMarker('__sdk_internal_patch_number:1', false),
1942-
makeStartTimerCommand({
1943-
seq: 1,
1944-
startToFireTimeout: msToTs(1),
1945-
}),
1946-
])
1947-
);
1948-
}
1949-
{
1950-
const completion = await activate(t, makeFireTimer(1));
1951-
compareCompletion(
1952-
t,
1953-
completion,
1954-
makeSuccess([
1955-
makeStartTimerCommand({
1956-
seq: 2,
1957-
startToFireTimeout: msToTs(1),
1958-
}),
1959-
])
1960-
);
1961-
}
1962-
{
1963-
const completion = await activate(t, makeFireTimer(2));
1964-
compareCompletion(
1965-
t,
1966-
completion,
1967-
makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload(0))])
1968-
);
1969-
}
1970-
});
1971-
19721892
test('Buffered signals are dispatched to correct handler and in correct order - signalsOrdering', async (t) => {
19731893
const { workflowType } = t.context;
19741894
{

packages/test/src/workflows/condition-timeout-0.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* which means that the condition would block indefinitely and would return undefined once
44
* fn evaluates to true, rather than returning true or false.
55
*/
6-
import { condition, setHandler, defineSignal, sleep } from '@temporalio/workflow';
6+
import { condition, setHandler, defineSignal } from '@temporalio/workflow';
77

88
export const aSignal = defineSignal('a');
99
export const bSignal = defineSignal('b');
@@ -30,12 +30,3 @@ export async function conditionTimeout0(): Promise<number> {
3030

3131
return counter;
3232
}
33-
34-
export async function conditionTimeout0Simple(): Promise<boolean | undefined> {
35-
let validationTimerFired = false;
36-
sleep(1000)
37-
.then(() => (validationTimerFired = true))
38-
.catch((e) => console.log(e));
39-
40-
return await condition(() => validationTimerFired, 0);
41-
}

packages/workflow/src/internals.ts

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,6 @@ export type ActivationHandler = {
8686
[P in keyof coresdk.workflow_activation.IWorkflowActivationJob]: ActivationHandlerFunction<P>;
8787
};
8888

89-
/**
90-
* SDK Internal Patches are created by the SDK to avoid breaking history when behaviour
91-
* of existing API need to be modified. This is the patch number supported by the current
92-
* version of the SDK.
93-
*
94-
* History:
95-
* 1: Fix `condition(..., 0)` is not the same as `condition(..., undefined)`
96-
*/
97-
export const LATEST_INTERNAL_PATCH_NUMBER = 1;
98-
9989
/**
10090
* Keeps all of the Workflow runtime state like pending completions for activities and timers.
10191
*
@@ -281,12 +271,6 @@ export class Activator implements ActivationHandler {
281271
*/
282272
public readonly sentPatches = new Set<string>();
283273

284-
/**
285-
* SDK Internal Patches are created by the SDK to avoid breaking history when behaviour
286-
* of existing API need to be modified.
287-
*/
288-
public internalPatchNumber = 0;
289-
290274
sinkCalls = Array<SinkCall>();
291275

292276
constructor({
@@ -304,8 +288,8 @@ export class Activator implements ActivationHandler {
304288
this.random = alea(randomnessSeed);
305289

306290
if (info.unsafe.isReplaying) {
307-
for (const patch of patches) {
308-
this.knownPresentPatches.add(patch);
291+
for (const patchId of patches) {
292+
this.notifyHasPatch({ patchId });
309293
}
310294
}
311295
}
@@ -611,28 +595,7 @@ export class Activator implements ActivationHandler {
611595
if (!activation.patchId) {
612596
throw new TypeError('Notify has patch missing patch name');
613597
}
614-
if (activation.patchId.startsWith('__sdk_internal_patch_number:')) {
615-
const internalPatchNumber = parseInt(activation.patchId.substring('__sdk_internal_patch_number:'.length));
616-
if (internalPatchNumber > LATEST_INTERNAL_PATCH_NUMBER)
617-
throw new IllegalStateError(
618-
`Unsupported internal patch number: ${internalPatchNumber} > ${LATEST_INTERNAL_PATCH_NUMBER}`
619-
);
620-
if (this.internalPatchNumber < internalPatchNumber) this.internalPatchNumber = internalPatchNumber;
621-
} else {
622-
this.knownPresentPatches.add(activation.patchId);
623-
}
624-
}
625-
626-
public checkInternalPatchAtLeast(minimumPatchNumber: number): boolean {
627-
if (this.internalPatchNumber >= minimumPatchNumber) return true;
628-
if (!this.info.unsafe.isReplaying) {
629-
this.internalPatchNumber = minimumPatchNumber;
630-
this.pushCommand({
631-
setPatchMarker: { patchId: `__sdk_internal_patch_number:${LATEST_INTERNAL_PATCH_NUMBER}`, deprecated: false },
632-
});
633-
return true;
634-
}
635-
return false;
598+
this.knownPresentPatches.add(activation.patchId);
636599
}
637600

638601
public removeFromCache(): void {

packages/workflow/src/workflow.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,9 @@ export function proxyLocalActivities<A = UntypedActivities>(options: LocalActivi
565565

566566
// TODO: deprecate this patch after "enough" time has passed
567567
const EXTERNAL_WF_CANCEL_PATCH = '__temporal_internal_connect_external_handle_cancel_to_scope';
568+
// This generic name of this patch comes from an attempt to build a generic internal patching mechanism.
569+
// That effort has been abandoned in favor of a newer WorkflowTaskCompletedMetadata based mechanism.
570+
const CONDITION_0_PATCH = '__sdk_internal_patch_number:1';
568571

569572
/**
570573
* Returns a client-side handle that can be used to signal and cancel an existing Workflow execution.
@@ -1072,7 +1075,7 @@ export function condition(fn: () => boolean): Promise<void>;
10721075

10731076
export async function condition(fn: () => boolean, timeout?: number | string): Promise<void | boolean> {
10741077
// Prior to 1.5.0, `condition(fn, 0)` was treated as equivalent to `condition(fn, undefined)`
1075-
if (timeout === 0 && !getActivator().checkInternalPatchAtLeast(1)) {
1078+
if (timeout === 0 && !patched(CONDITION_0_PATCH)) {
10761079
return conditionInner(fn);
10771080
}
10781081
if (typeof timeout === 'number' || typeof timeout === 'string') {

0 commit comments

Comments
 (0)