Skip to content

Commit 450c713

Browse files
authored
fix: Fail workflow task if local activity not registered with worker (#1152)
Previously the worker would needlessly retry these local activities. With this change, the worker fails the workflow task allowing a user to deploy a fix. Replaces #1150 with better behavior. Matches Java behavior temporalio/sdk-java#1628.
1 parent 3bcb769 commit 450c713

File tree

13 files changed

+112
-55
lines changed

13 files changed

+112
-55
lines changed

packages/common/src/converter/payload-converter.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import type { temporal } from '@temporalio/proto';
21
import { decode, encode } from '../encoding';
32
import { PayloadConverterError, ValueError } from '../errors';
43
import { Payload } from '../interfaces';

packages/test/src/test-local-activities.ts

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ export async function runMyLocalActivityWithOption(
141141
return await workflow.proxyLocalActivities(opts).myLocalActivity();
142142
}
143143

144-
test.serial('Local activity with ', async (t) => {
144+
test.serial('Local activity with various timeouts', async (t) => {
145145
const { executeWorkflow, createWorker } = helpers(t);
146146
const worker = await createWorker({
147147
activities: {
@@ -523,34 +523,43 @@ test.serial('Local activity can be intercepted', async (t) => {
523523
});
524524
});
525525

526-
export async function runANonExisitingLocalActivity(): Promise<void> {
527-
// TODO: default behavior should be to not retry activities that are not found
526+
export async function runNonExisitingLocalActivity(): Promise<void> {
528527
const { activityNotFound } = workflow.proxyLocalActivities({
529528
startToCloseTimeout: '1m',
530-
retry: { maximumAttempts: 1 },
531529
});
532530

533-
await activityNotFound();
531+
try {
532+
await activityNotFound();
533+
} catch (err) {
534+
if (err instanceof ReferenceError) {
535+
return;
536+
}
537+
throw err;
538+
}
539+
throw ApplicationFailure.nonRetryable('Unreachable');
534540
}
535541

536-
test.serial('Local activity fails if not registered on Worker', async (t) => {
542+
test.serial('Local activity not registered on Worker throws ReferenceError in workflow context', async (t) => {
537543
const { executeWorkflow, createWorker } = helpers(t);
544+
const worker = await createWorker();
545+
await worker.runUntil(executeWorkflow(runNonExisitingLocalActivity));
546+
t.pass();
547+
});
548+
549+
test.serial('Local activity not registered on replay Worker does not throw', async (t) => {
550+
const { startWorkflow, createWorker } = helpers(t);
538551
const worker = await createWorker({
539552
activities: {
540-
// We need at least one activity, so that the Worker start its activity poller
541-
async dummy(): Promise<string> {
542-
return 'dummy';
553+
async echo(input: string) {
554+
return input;
543555
},
544556
},
545557
});
546-
await worker.runUntil(async () => {
547-
const err: WorkflowFailedError | undefined = await t.throwsAsync(
548-
executeWorkflow(runANonExisitingLocalActivity, {}),
549-
{ instanceOf: WorkflowFailedError }
550-
);
551-
t.true(err?.cause instanceof ApplicationFailure && !err.cause.nonRetryable);
552-
t.truthy(err?.cause?.message?.startsWith('Activity function activityNotFound is not registered on this Worker'));
553-
});
558+
const handle = await startWorkflow(runOneLocalActivity, { args: ['hello'] });
559+
await worker.runUntil(() => handle.result());
560+
const history = await handle.fetchHistory();
561+
await Worker.runReplayHistory({ workflowBundle: t.context.workflowBundle }, history, handle.workflowId);
562+
t.pass();
554563
});
555564

556565
/**

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ test.before(async (t) => {
6464
const bundler = new WorkflowCodeBundler({ workflowsPath });
6565
const workflowBundle = parseWorkflowCode((await bundler.createBundle()).code);
6666
t.context.workflowCreator = REUSE_V8_CONTEXT
67-
? await TestReusableVMWorkflowCreator.create(workflowBundle, 200)
68-
: await TestVMWorkflowCreator.create(workflowBundle, 200);
67+
? await TestReusableVMWorkflowCreator.create(workflowBundle, 200, new Set())
68+
: await TestVMWorkflowCreator.create(workflowBundle, 200, new Set());
6969
});
7070

7171
test.after.always(async (t) => {

packages/worker/src/runtime.ts

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import * as v8 from 'node:v8';
33
import * as fs from 'node:fs';
44
import * as os from 'node:os';
55
import { Heap } from 'heap-js';
6-
import { Subject, firstValueFrom } from 'rxjs';
6+
import { BehaviorSubject, lastValueFrom, of } from 'rxjs';
7+
import { concatMap, delay, map, repeat, takeWhile } from 'rxjs/operators';
78
import * as native from '@temporalio/core-bridge';
89
import {
910
pollLogs,
@@ -134,7 +135,7 @@ export class Runtime {
134135
protected pendingCreations = 0;
135136
/** Track the registered native objects to automatically shutdown when all have been deregistered */
136137
protected readonly backRefs = new Set<TrackedNativeObject>();
137-
protected readonly stopPollingForLogs = new Subject<void>();
138+
protected readonly shouldPollForLogs = new BehaviorSubject<boolean>(false);
138139
protected readonly logPollPromise: Promise<void>;
139140
public readonly logger: Logger;
140141
protected readonly shutdownSignalCallbacks = new Set<() => void>();
@@ -265,30 +266,30 @@ export class Runtime {
265266
}
266267

267268
protected async initLogPolling(logger: CoreLogger): Promise<void> {
269+
this.shouldPollForLogs.next(true);
270+
268271
if (!this.isForwardingLogs()) {
269272
return;
270273
}
271-
272-
const stopPollingForLogs = firstValueFrom(this.stopPollingForLogs);
273-
274274
const poll = promisify(pollLogs);
275275
try {
276-
for (;;) {
277-
const logs = await poll(this.native);
278-
for (const log of logs) {
279-
logger.log(log.level, log.message, {
280-
[LogTimestamp]: timeOfDayToBigint(log.timestamp),
281-
});
282-
}
283-
logger.flush();
284-
const stop = await Promise.race([
285-
stopPollingForLogs.then(() => true),
286-
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), 3)),
287-
]);
288-
if (stop) {
289-
break;
290-
}
291-
}
276+
await lastValueFrom(
277+
of(this.shouldPollForLogs).pipe(
278+
map((subject) => subject.getValue()),
279+
takeWhile((shouldPoll) => shouldPoll),
280+
concatMap(() => poll(this.native)),
281+
map((logs) => {
282+
for (const log of logs) {
283+
logger.log(log.level, log.message, {
284+
[LogTimestamp]: timeOfDayToBigint(log.timestamp),
285+
});
286+
}
287+
logger.flush();
288+
}),
289+
delay(3), // Don't go wild polling as fast as possible
290+
repeat()
291+
)
292+
);
292293
} catch (error) {
293294
// Log using the original logger instead of buffering
294295
this.options.logger.warn('Error gathering forwarded logs from core', { error });
@@ -473,7 +474,7 @@ export class Runtime {
473474
public async shutdown(): Promise<void> {
474475
delete Runtime._instance;
475476
this.teardownShutdownHook();
476-
this.stopPollingForLogs.next();
477+
this.shouldPollForLogs.next(false);
477478
// This will effectively drain all logs
478479
await this.logPollPromise;
479480
await promisify(runtimeShutdown)(this.native);

packages/worker/src/worker.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,19 +472,33 @@ export class Worker {
472472
workflowBundle: WorkflowBundleWithSourceMapAndFilename,
473473
compiledOptions: CompiledWorkerOptions
474474
): Promise<WorkflowCreator> {
475+
const registeredActivityNames = new Set(
476+
Object.entries(compiledOptions.activities ?? {})
477+
.filter(([_, v]) => typeof v === 'function')
478+
.map(([k]) => k)
479+
);
475480
// This isn't required for vscode, only for Chrome Dev Tools which doesn't support debugging worker threads.
476481
// We also rely on this in debug-replayer where we inject a global variable to be read from workflow context.
477482
if (compiledOptions.debugMode) {
478483
if (compiledOptions.reuseV8Context) {
479-
return await ReusableVMWorkflowCreator.create(workflowBundle, compiledOptions.isolateExecutionTimeoutMs);
484+
return await ReusableVMWorkflowCreator.create(
485+
workflowBundle,
486+
compiledOptions.isolateExecutionTimeoutMs,
487+
registeredActivityNames
488+
);
480489
}
481-
return await VMWorkflowCreator.create(workflowBundle, compiledOptions.isolateExecutionTimeoutMs);
490+
return await VMWorkflowCreator.create(
491+
workflowBundle,
492+
compiledOptions.isolateExecutionTimeoutMs,
493+
registeredActivityNames
494+
);
482495
} else {
483496
return await ThreadedVMWorkflowCreator.create({
484497
workflowBundle,
485498
threadPoolSize: compiledOptions.workflowThreadPoolSize,
486499
isolateExecutionTimeoutMs: compiledOptions.isolateExecutionTimeoutMs,
487500
reuseV8Context: compiledOptions.reuseV8Context ?? false,
501+
registeredActivityNames,
488502
});
489503
}
490504
}

packages/worker/src/workflow/reusable-vm.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
5555

5656
constructor(
5757
script: vm.Script,
58-
public readonly workflowBundle: WorkflowBundleWithSourceMapAndFilename,
59-
public readonly isolateExecutionTimeoutMs: number
58+
protected readonly workflowBundle: WorkflowBundleWithSourceMapAndFilename,
59+
protected readonly isolateExecutionTimeoutMs: number,
60+
/** Known activity names registered on the executing worker */
61+
protected readonly registeredActivityNames: Set<string>
6062
) {
6163
if (!ReusableVMWorkflowCreator.unhandledRejectionHandlerHasBeenSet) {
6264
setUnhandledRejectionHandler((runId) => ReusableVMWorkflowCreator.workflowByRunId.get(runId));
@@ -166,6 +168,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
166168
...options,
167169
sourceMap: this.workflowBundle.sourceMap,
168170
getTimeOfDay: () => timeOfDayToBigint(getTimeOfDay()),
171+
registeredActivityNames: this.registeredActivityNames,
169172
});
170173
const activator = bag.__TEMPORAL_ACTIVATOR__ as any;
171174

@@ -182,12 +185,13 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
182185
public static async create<T extends typeof ReusableVMWorkflowCreator>(
183186
this: T,
184187
workflowBundle: WorkflowBundleWithSourceMapAndFilename,
185-
isolateExecutionTimeoutMs: number
188+
isolateExecutionTimeoutMs: number,
189+
registeredActivityNames: Set<string>
186190
): Promise<InstanceType<T>> {
187191
globalHandlers.install(); // Call is idempotent
188192
await globalHandlers.addWorkflowBundle(workflowBundle);
189193
const script = new vm.Script(workflowBundle.code, { filename: workflowBundle.filename });
190-
return new this(script, workflowBundle, isolateExecutionTimeoutMs) as InstanceType<T>;
194+
return new this(script, workflowBundle, isolateExecutionTimeoutMs, registeredActivityNames) as InstanceType<T>;
191195
}
192196

193197
/**

packages/worker/src/workflow/threaded-vm.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ export interface ThreadedVMWorkflowCreatorOptions {
136136
threadPoolSize: number;
137137
isolateExecutionTimeoutMs: number;
138138
reuseV8Context: boolean;
139+
registeredActivityNames: Set<string>;
139140
}
140141

141142
/**
@@ -152,13 +153,20 @@ export class ThreadedVMWorkflowCreator implements WorkflowCreator {
152153
workflowBundle,
153154
isolateExecutionTimeoutMs,
154155
reuseV8Context,
156+
registeredActivityNames,
155157
}: ThreadedVMWorkflowCreatorOptions): Promise<ThreadedVMWorkflowCreator> {
156158
const workerThreadClients = Array(threadPoolSize)
157159
.fill(0)
158160
.map(() => new WorkerThreadClient(new NodeWorker(require.resolve('./workflow-worker-thread'))));
159161
await Promise.all(
160162
workerThreadClients.map((client) =>
161-
client.send({ type: 'init', workflowBundle, isolateExecutionTimeoutMs, reuseV8Context })
163+
client.send({
164+
type: 'init',
165+
workflowBundle,
166+
isolateExecutionTimeoutMs,
167+
reuseV8Context,
168+
registeredActivityNames,
169+
})
162170
)
163171
);
164172
return new this(workerThreadClients);

packages/worker/src/workflow/vm.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ export class VMWorkflowCreator implements WorkflowCreator {
2525

2626
constructor(
2727
script: vm.Script,
28-
public readonly workflowBundle: WorkflowBundleWithSourceMapAndFilename,
29-
public readonly isolateExecutionTimeoutMs: number
28+
protected readonly workflowBundle: WorkflowBundleWithSourceMapAndFilename,
29+
protected readonly isolateExecutionTimeoutMs: number,
30+
protected readonly registeredActivityNames: Set<string>
3031
) {
3132
if (!VMWorkflowCreator.unhandledRejectionHandlerHasBeenSet) {
3233
setUnhandledRejectionHandler((runId) => VMWorkflowCreator.workflowByRunId.get(runId));
@@ -62,6 +63,7 @@ export class VMWorkflowCreator implements WorkflowCreator {
6263
...options,
6364
sourceMap: this.workflowBundle.sourceMap,
6465
getTimeOfDay: () => timeOfDayToBigint(getTimeOfDay()),
66+
registeredActivityNames: this.registeredActivityNames,
6567
});
6668
const activator = context.__TEMPORAL_ACTIVATOR__ as any;
6769

@@ -97,12 +99,13 @@ export class VMWorkflowCreator implements WorkflowCreator {
9799
public static async create<T extends typeof VMWorkflowCreator>(
98100
this: T,
99101
workflowBundle: WorkflowBundleWithSourceMapAndFilename,
100-
isolateExecutionTimeoutMs: number
102+
isolateExecutionTimeoutMs: number,
103+
registeredActivityNames: Set<string>
101104
): Promise<InstanceType<T>> {
102105
globalHandlers.install();
103106
await globalHandlers.addWorkflowBundle(workflowBundle);
104107
const script = new vm.Script(workflowBundle.code, { filename: workflowBundle.filename });
105-
return new this(script, workflowBundle, isolateExecutionTimeoutMs) as InstanceType<T>;
108+
return new this(script, workflowBundle, isolateExecutionTimeoutMs, registeredActivityNames) as InstanceType<T>;
106109
}
107110

108111
/**

packages/worker/src/workflow/workflow-worker-thread.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,18 @@ async function handleRequest({ requestId, input }: WorkerThreadRequest): Promise
3131
switch (input.type) {
3232
case 'init':
3333
if (input.reuseV8Context) {
34-
workflowCreator = await ReusableVMWorkflowCreator.create(input.workflowBundle, input.isolateExecutionTimeoutMs);
34+
workflowCreator = await ReusableVMWorkflowCreator.create(
35+
input.workflowBundle,
36+
input.isolateExecutionTimeoutMs,
37+
input.registeredActivityNames
38+
);
3539
workflowGetter = (runId) => ReusableVMWorkflowCreator.workflowByRunId.get(runId);
3640
} else {
37-
workflowCreator = await VMWorkflowCreator.create(input.workflowBundle, input.isolateExecutionTimeoutMs);
41+
workflowCreator = await VMWorkflowCreator.create(
42+
input.workflowBundle,
43+
input.isolateExecutionTimeoutMs,
44+
input.registeredActivityNames
45+
);
3846
workflowGetter = (runId) => VMWorkflowCreator.workflowByRunId.get(runId);
3947
}
4048
return ok(requestId);

packages/worker/src/workflow/workflow-worker-thread/input.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export interface Init {
1515
type: 'init';
1616
isolateExecutionTimeoutMs: number;
1717
workflowBundle: WorkflowBundleWithSourceMapAndFilename;
18+
registeredActivityNames: Set<string>;
1819
reuseV8Context: boolean;
1920
}
2021

packages/workflow/src/interfaces.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ export interface WorkflowCreateOptions {
401401

402402
export interface WorkflowCreateOptionsInternal extends WorkflowCreateOptions {
403403
sourceMap: RawSourceMap;
404+
registeredActivityNames: Set<string>;
404405
getTimeOfDay(): bigint;
405406
}
406407

packages/workflow/src/internals.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ export class Activator implements ActivationHandler {
281281
*/
282282
public readonly getTimeOfDay: () => bigint;
283283

284+
public readonly registeredActivityNames: Set<string>;
285+
284286
constructor({
285287
info,
286288
now,
@@ -289,13 +291,15 @@ export class Activator implements ActivationHandler {
289291
getTimeOfDay,
290292
randomnessSeed,
291293
patches,
294+
registeredActivityNames,
292295
}: WorkflowCreateOptionsInternal) {
293296
this.getTimeOfDay = getTimeOfDay;
294297
this.info = info;
295298
this.now = now;
296299
this.showStackTraceSources = showStackTraceSources;
297300
this.sourceMap = sourceMap;
298301
this.random = alea(randomnessSeed);
302+
this.registeredActivityNames = registeredActivityNames;
299303

300304
if (info.unsafe.isReplaying) {
301305
for (const patchId of patches) {

packages/workflow/src/workflow.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,11 @@ async function scheduleLocalActivityNextHandler({
199199
originalScheduleTime,
200200
}: LocalActivityInput): Promise<unknown> {
201201
const activator = getActivator();
202+
// Eagerly fail the local activity (which will in turn fail the workflow task.
203+
// Do not fail on replay where the local activities may not be registered on the replay worker.
204+
if (!workflowInfo().unsafe.isReplaying && !activator.registeredActivityNames.has(activityType)) {
205+
throw new ReferenceError(`Local activity of type '${activityType}' not registered on worker`);
206+
}
202207
validateLocalActivityOptions(options);
203208

204209
return new Promise((resolve, reject) => {

0 commit comments

Comments
 (0)