Skip to content

Commit 4dd83c1

Browse files
authored
fix(worker): Don't fail Worker on Activity Task decode failures (#1473)
1 parent 9a08924 commit 4dd83c1

File tree

7 files changed

+190
-94
lines changed

7 files changed

+190
-94
lines changed

packages/client/src/helpers.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
SearchAttributes,
88
} from '@temporalio/common';
99
import { Replace } from '@temporalio/common/lib/type-helpers';
10-
import { optionalTsToDate, tsToDate } from '@temporalio/common/lib/time';
10+
import { optionalTsToDate, requiredTsToDate } from '@temporalio/common/lib/time';
1111
import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
1212
import { temporal, google } from '@temporalio/proto';
1313
import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types';
@@ -62,7 +62,7 @@ export async function executionInfoFromRaw<T>(
6262
// Exact truncation for multi-petabyte histories
6363
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
6464
historySize: raw.historySizeBytes?.toNumber() || undefined,
65-
startTime: tsToDate(raw.startTime!),
65+
startTime: requiredTsToDate(raw.startTime, 'startTime'),
6666
executionTime: optionalTsToDate(raw.executionTime),
6767
closeTime: optionalTsToDate(raw.closeTime),
6868
memo: await decodeMapFromPayloads(dataConverter, raw.memo?.fields),

packages/client/src/schedule-client.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ import {
88
filterNullAndUndefined,
99
} from '@temporalio/common/lib/internal-non-workflow';
1010
import { temporal } from '@temporalio/proto';
11-
import { optionalDateToTs, optionalTsToDate, optionalTsToMs, tsToDate } from '@temporalio/common/lib/time';
11+
import {
12+
optionalDateToTs,
13+
optionalTsToDate,
14+
optionalTsToMs,
15+
requiredTsToDate,
16+
tsToDate,
17+
} from '@temporalio/common/lib/time';
1218
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
1319
import { CreateScheduleInput, CreateScheduleOutput, ScheduleClientInterceptor } from './interceptors';
1420
import { WorkflowService } from './types';
@@ -430,8 +436,7 @@ export class ScheduleClient extends BaseClient {
430436
info: {
431437
recentActions: decodeScheduleRecentActions(raw.info?.recentActions),
432438
nextActionTimes: raw.info?.futureActionTimes?.map(tsToDate) ?? [],
433-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
434-
createdAt: tsToDate(raw.info!.createTime!),
439+
createdAt: requiredTsToDate(raw.info?.createTime, 'createTime'),
435440
lastUpdatedAt: optionalTsToDate(raw.info?.updateTime),
436441
runningActions: decodeScheduleRunningActions(raw.info?.runningWorkflows),
437442
numActionsMissedCatchupWindow: raw.info?.missedCatchupWindow?.toNumber() ?? 0,

packages/client/src/schedule-helpers.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {
2323
optionalDateToTs,
2424
optionalTsToDate,
2525
optionalTsToMs,
26-
tsToDate,
26+
requiredTsToDate,
2727
} from '@temporalio/common/lib/time';
2828
import {
2929
CalendarSpec,
@@ -405,10 +405,8 @@ export function decodeScheduleRecentActions(
405405
} else throw new TypeError('Unsupported schedule action');
406406

407407
return {
408-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
409-
scheduledAt: tsToDate(executionResult.scheduleTime!),
410-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
411-
takenAt: tsToDate(executionResult.actualTime!),
408+
scheduledAt: requiredTsToDate(executionResult.scheduleTime, 'scheduleTime'),
409+
takenAt: requiredTsToDate(executionResult.actualTime, 'actualTime'),
412410
action,
413411
};
414412
}

packages/common/src/time.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ export function optionalTsToMs(ts: Timestamp | null | undefined): number | undef
2828
return tsToMs(ts);
2929
}
3030

31+
/**
32+
* Lossy conversion function from Timestamp to number due to possible overflow.
33+
* If ts is null or undefined, throws a TypeError, with error message including the name of the field.
34+
*/
35+
export function requiredTsToMs(ts: Timestamp | null | undefined, fieldName: string): number {
36+
if (ts === undefined || ts === null) {
37+
throw new TypeError(`Expected ${fieldName} to be a timestamp, got ${ts}`);
38+
}
39+
return tsToMs(ts);
40+
}
41+
3142
/**
3243
* Lossy conversion function from Timestamp to number due to possible overflow
3344
*/
@@ -83,6 +94,11 @@ export function tsToDate(ts: Timestamp): Date {
8394
return new Date(tsToMs(ts));
8495
}
8596

97+
// ts-prune-ignore-next
98+
export function requiredTsToDate(ts: Timestamp | null | undefined, fieldName: string): Date {
99+
return new Date(requiredTsToMs(ts, fieldName));
100+
}
101+
86102
export function optionalTsToDate(ts: Timestamp | null | undefined): Date | undefined {
87103
if (ts === undefined || ts === null) {
88104
return undefined;

packages/test/src/test-worker-activities.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,66 @@ test('Worker fails activity with proper message when it is not registered', asyn
222222
});
223223
});
224224

225+
test('Worker fails activity with proper message if activity info contains null ScheduledTime', async (t) => {
226+
const worker = isolateFreeWorker({
227+
...defaultOptions,
228+
activities: {
229+
async dummy(): Promise<void> {},
230+
},
231+
});
232+
t.context.worker = worker;
233+
234+
await runWorker(t, async () => {
235+
const taskToken = Buffer.from(uuid4());
236+
const { result } = await worker.native.runActivityTask({
237+
taskToken,
238+
start: {
239+
activityType: 'dummy',
240+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
241+
input: toPayloads(defaultPayloadConverter),
242+
scheduledTime: null,
243+
},
244+
});
245+
t.is(worker.getState(), 'RUNNING');
246+
t.is(result?.failed?.failure?.applicationFailureInfo?.type, 'TypeError');
247+
t.is(result?.failed?.failure?.message, 'Expected scheduledTime to be a timestamp, got null');
248+
t.true(/worker\.[jt]s/.test(result?.failed?.failure?.stackTrace ?? ''));
249+
});
250+
});
251+
252+
test('Worker fails activity task if interceptor factory throws', async (t) => {
253+
const worker = isolateFreeWorker({
254+
...defaultOptions,
255+
activities: {
256+
async dummy(): Promise<void> {},
257+
},
258+
interceptors: {
259+
activity: [
260+
() => {
261+
throw new Error('I am a bad interceptor');
262+
},
263+
],
264+
},
265+
});
266+
t.context.worker = worker;
267+
268+
await runWorker(t, async () => {
269+
const taskToken = Buffer.from(uuid4());
270+
const { result } = await worker.native.runActivityTask({
271+
taskToken,
272+
start: {
273+
activityType: 'dummy',
274+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
275+
input: toPayloads(defaultPayloadConverter),
276+
},
277+
});
278+
t.is(worker.getState(), 'RUNNING');
279+
t.is(result?.failed?.failure?.applicationFailureInfo?.type, 'Error');
280+
t.is(result?.failed?.failure?.message, 'I am a bad interceptor');
281+
t.true(/test-worker-activities\.[tj]s/.test(result?.failed?.failure?.stackTrace ?? ''));
282+
});
283+
});
284+
225285
test('Non ApplicationFailure TemporalFailures thrown from Activity are wrapped with ApplicationFailure', async (t) => {
226286
const worker = isolateFreeWorker({
227287
...defaultOptions,

0 commit comments

Comments
 (0)