Skip to content

Commit 2e7c6f3

Browse files
authored
fix(worker): Disable Eager Activities if Worker has no activities (#1326)
## What changed and why - Force `WorkerOptions.enableNonLocalActivities` to false if no activities are registered on a Worker, which automatically implies that Core will not request Eager Activity Dispatch for activities scheduled by workflows running on that Worker. Until now, Core would request Eager Activity Dispatch for activities scheduled on the same task queue as the Workflow, even when the Worker was being run without any activities registered (splitting Workflow and Activity execution to distinct Workers is a diagnostic strategy we often suggest). In those cases, the activity execution would time out on the first attempt, adding delay before the activity finally get executed. - Enable several server features when running dev server for internal integration tests (applicable to tests using the new test helper). - Fixed an issue in bridge that was preventing passing `extraArgs` to dev server. - In `test-npm-init` CI tests, copy the current NPM version specifier from the SDK repo to the created sample repo. This is required in case packages get published to Verdaccio using a pre-release specifier (e.g. `1.9.0-rc.0`) as NPM will not consider a pre-release version to be an acceptable substitute for syntaxes such as `^1.8.0`, thus causing failure to install dependencies during sample testing.
1 parent 758cf3c commit 2e7c6f3

File tree

7 files changed

+130
-17
lines changed

7 files changed

+130
-17
lines changed

packages/core-bridge/src/conversions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ impl ArrayHandleConversionsExt for Handle<'_, JsArray> {
3737
let len = js_vec.len();
3838
let mut ret_vec = Vec::<String>::with_capacity(len);
3939

40-
for i in 0..len - 1 {
41-
ret_vec[i] = js_vec[i].downcast_or_throw::<JsString, _>(cx)?.value(cx);
40+
for i in 0..len {
41+
ret_vec.push(js_vec[i].downcast_or_throw::<JsString, _>(cx)?.value(cx));
4242
}
4343
Ok(ret_vec)
4444
}

packages/test/src/helpers-integration.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ export interface Context {
2828
workflowBundle: WorkflowBundle;
2929
}
3030

31+
const defaultDynamicConfigOptions = [
32+
'frontend.enableUpdateWorkflowExecution=true',
33+
'frontend.enableUpdateWorkflowExecutionAsyncAccepted=true',
34+
'frontend.workerVersioningDataAPIs=true',
35+
'frontend.workerVersioningWorkflowAPIs=true',
36+
'system.enableActivityEagerExecution=true',
37+
'system.enableEagerWorkflowStart=true',
38+
'system.forceSearchAttributesCacheRefreshOnRead=true',
39+
'worker.buildIdScavengerEnabled=true',
40+
'worker.removableBuildIdDurationSinceDefault=1',
41+
];
42+
3143
export function makeTestFunction(opts: {
3244
workflowsPath: string;
3345
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
@@ -37,7 +49,16 @@ export function makeTestFunction(opts: {
3749
test.before(async (t) => {
3850
// Ignore invalid log levels
3951
Runtime.install({ logger: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel) });
40-
const env = await TestWorkflowEnvironment.createLocal(opts.workflowEnvironmentOpts);
52+
const env = await TestWorkflowEnvironment.createLocal({
53+
...opts.workflowEnvironmentOpts,
54+
server: {
55+
...opts.workflowEnvironmentOpts?.server,
56+
extraArgs: [
57+
...defaultDynamicConfigOptions.flatMap((opt) => ['--dynamic-config-value', opt]),
58+
...(opts.workflowEnvironmentOpts?.server?.extraArgs ?? []),
59+
],
60+
},
61+
});
4162
const workflowBundle = await bundleWorkflowCode({
4263
...bundlerOptions,
4364
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(opts.workflowInterceptorModules ?? [])],

packages/test/src/test-integration-workflows.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,86 @@ test('Start of workflow with signal is delayed', async (t) => {
222222
const startDelay = workflowExecutionStartedEvent?.workflowExecutionStartedEventAttributes?.firstWorkflowTaskBackoff;
223223
t.is(tsToMs(startDelay), 4678000);
224224
});
225+
226+
export async function executeEagerActivity(): Promise<void> {
227+
const scheduleActivity = () =>
228+
workflow
229+
.proxyActivities({ scheduleToCloseTimeout: '5s', allowEagerDispatch: true })
230+
.testActivity()
231+
.then((res) => {
232+
if (res !== 'workflow-and-activity-worker')
233+
throw workflow.ApplicationFailure.nonRetryable('Activity was not eagerly dispatched');
234+
});
235+
236+
for (let i = 0; i < 10; i++) {
237+
// Schedule 3 activities at a time (`MAX_EAGER_ACTIVITY_RESERVATIONS_PER_WORKFLOW_TASK`)
238+
await Promise.all([scheduleActivity(), scheduleActivity(), scheduleActivity()]);
239+
}
240+
}
241+
242+
test('Worker requests Eager Activity Dispatch if possible', async (t) => {
243+
const { createWorker, startWorkflow } = helpers(t);
244+
245+
// If eager activity dispatch is working, then the task will always be dispatched to the workflow
246+
// worker. Otherwise, chances are 50%-50% for either workers. The test workflow schedule the
247+
// activity 30 times to make sure that the workflow worker is really getting the task thanks to
248+
// eager activity dispatch, and not out of pure luck.
249+
250+
const activityWorker = await createWorker({
251+
activities: {
252+
testActivity: () => 'activity-only-worker',
253+
},
254+
// Override the default workflow bundle, to make this an activity-only worker
255+
workflowBundle: undefined,
256+
});
257+
const workflowWorker = await createWorker({
258+
activities: {
259+
testActivity: () => 'workflow-and-activity-worker',
260+
},
261+
});
262+
const handle = await startWorkflow(executeEagerActivity);
263+
await activityWorker.runUntil(workflowWorker.runUntil(handle.result()));
264+
const { events } = await handle.fetchHistory();
265+
266+
t.false(events?.some?.((ev) => ev.activityTaskTimedOutEventAttributes));
267+
const activityTaskStarted = events?.filter?.((ev) => ev.activityTaskStartedEventAttributes);
268+
t.is(activityTaskStarted?.length, 30);
269+
t.true(activityTaskStarted?.every((ev) => ev.activityTaskStartedEventAttributes?.attempt === 1));
270+
});
271+
272+
export async function dontExecuteEagerActivity(): Promise<string> {
273+
return (await workflow
274+
.proxyActivities({ scheduleToCloseTimeout: '5s', allowEagerDispatch: true })
275+
.testActivity()
276+
.catch(() => 'failed')) as string;
277+
}
278+
279+
test("Worker doesn't request Eager Activity Dispatch if no activities are registered", async (t) => {
280+
const { createWorker, startWorkflow } = helpers(t);
281+
282+
// If the activity was eagerly dispatched to the Workflow worker even though it is a Workflow-only
283+
// worker, then the activity execution will timeout (because tasks are not being polled) or
284+
// otherwise fail (because no activity is registered under that name). Therefore, if the history
285+
// shows only one attempt for that activity and no timeout, that can only mean that the activity
286+
// was not eagerly dispatched.
287+
288+
const activityWorker = await createWorker({
289+
activities: {
290+
testActivity: () => 'success',
291+
},
292+
// Override the default workflow bundle, to make this an activity-only worker
293+
workflowBundle: undefined,
294+
});
295+
const workflowWorker = await createWorker({
296+
activities: {},
297+
});
298+
const handle = await startWorkflow(dontExecuteEagerActivity);
299+
const result = await activityWorker.runUntil(workflowWorker.runUntil(handle.result()));
300+
const { events } = await handle.fetchHistory();
301+
302+
t.is(result, 'success');
303+
t.false(events?.some?.((ev) => ev.activityTaskTimedOutEventAttributes));
304+
const activityTaskStarted = events?.filter?.((ev) => ev.activityTaskStartedEventAttributes);
305+
t.is(activityTaskStarted?.length, 1);
306+
t.is(activityTaskStarted?.[0]?.activityTaskStartedEventAttributes?.attempt, 1);
307+
});

packages/test/src/test-worker-versioning.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ if (RUN_INTEGRATION_TESTS) {
176176
await asyncRetry(
177177
async () => {
178178
const reachResp = await conn.taskQueue.getReachability({ buildIds: ['2.0', '1.0', '1.1'] });
179-
assert.deepEqual(reachResp.buildIdReachability['2.0']?.taskQueueReachability[taskQueue], ['NewWorkflows']);
179+
assert.deepEqual(reachResp.buildIdReachability['2.0']?.taskQueueReachability[taskQueue], ['NEW_WORKFLOWS']);
180180
assert.deepEqual(reachResp.buildIdReachability['1.1']?.taskQueueReachability[taskQueue], []);
181181
assert.deepEqual(reachResp.buildIdReachability['1.0']?.taskQueueReachability[taskQueue], []);
182182
},
@@ -188,7 +188,9 @@ if (RUN_INTEGRATION_TESTS) {
188188
buildIds: [UnversionedBuildId],
189189
taskQueues: [taskQueue],
190190
});
191-
assert.deepEqual(reachResp.buildIdReachability[UnversionedBuildId]?.taskQueueReachability[taskQueue], []);
191+
assert.deepEqual(reachResp.buildIdReachability[UnversionedBuildId]?.taskQueueReachability[taskQueue], [
192+
'NEW_WORKFLOWS',
193+
]);
192194
},
193195
{ maxTimeout: 1000 }
194196
);

packages/worker/src/worker-options.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as os from 'node:os';
22
import * as v8 from 'node:v8';
33
import type { Configuration as WebpackConfiguration } from 'webpack';
4-
import { DataConverter, LoadedDataConverter } from '@temporalio/common';
4+
import { ActivityFunction, DataConverter, LoadedDataConverter } from '@temporalio/common';
55
import { Duration, msOptionalToNumber, msToNumber } from '@temporalio/common/lib/time';
66
import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow';
77
import { LoggerSinks } from '@temporalio/workflow';
@@ -200,6 +200,7 @@ export interface WorkerOptions {
200200
* Whether or not to poll on the Activity task queue.
201201
*
202202
* If disabled and activities are registered on the Worker, it will run only local Activities.
203+
* This setting is ignored if no activity is registed on the Worker.
203204
*
204205
* @default true
205206
*/
@@ -559,7 +560,8 @@ export type WorkerOptionsWithDefaults = WorkerOptions &
559560
* {@link WorkerOptions} where the attributes the Worker requires are required and time units are converted from ms
560561
* formatted strings to numbers.
561562
*/
562-
export interface CompiledWorkerOptions extends Omit<WorkerOptionsWithDefaults, 'serverOptions' | 'interceptors'> {
563+
export interface CompiledWorkerOptions
564+
extends Omit<WorkerOptionsWithDefaults, 'serverOptions' | 'interceptors' | 'activities'> {
563565
interceptors: CompiledWorkerInterceptors;
564566
shutdownGraceTimeMs: number;
565567
shutdownForceTimeMs?: number;
@@ -568,6 +570,7 @@ export interface CompiledWorkerOptions extends Omit<WorkerOptionsWithDefaults, '
568570
maxHeartbeatThrottleIntervalMs: number;
569571
defaultHeartbeatThrottleIntervalMs: number;
570572
loadedDataConverter: LoadedDataConverter;
573+
activities: Map<string, ActivityFunction>;
571574
}
572575

573576
/**
@@ -752,6 +755,8 @@ export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledW
752755
opts.maxConcurrentWorkflowTaskExecutions = 2;
753756
}
754757

758+
const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function'));
759+
755760
return {
756761
...opts,
757762
interceptors: compileWorkerInterceptors(opts.interceptors),
@@ -762,5 +767,7 @@ export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledW
762767
maxHeartbeatThrottleIntervalMs: msToNumber(opts.maxHeartbeatThrottleInterval),
763768
defaultHeartbeatThrottleIntervalMs: msToNumber(opts.defaultHeartbeatThrottleInterval),
764769
loadedDataConverter: loadDataConverter(opts.dataConverter),
770+
activities,
771+
enableNonLocalActivities: opts.enableNonLocalActivities && activities.size > 0,
765772
};
766773
}

packages/worker/src/worker.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -464,11 +464,7 @@ export class Worker {
464464
workflowBundle: WorkflowBundleWithSourceMapAndFilename,
465465
compiledOptions: CompiledWorkerOptions
466466
): Promise<WorkflowCreator> {
467-
const registeredActivityNames = new Set(
468-
Object.entries(compiledOptions.activities ?? {})
469-
.filter(([_, v]) => typeof v === 'function')
470-
.map(([k]) => k)
471-
);
467+
const registeredActivityNames = new Set(compiledOptions.activities.keys());
472468
// This isn't required for vscode, only for Chrome Dev Tools which doesn't support debugging worker threads.
473469
// We also rely on this in debug-replayer where we inject a global variable to be read from workflow context.
474470
if (compiledOptions.debugMode) {
@@ -858,16 +854,15 @@ export class Worker {
858854
);
859855

860856
const { activityType } = info;
861-
// activities is of type "object" which does not support string indexes
862-
const fn = (this.options.activities as any)?.[activityType];
857+
const fn = this.options.activities.get(activityType);
863858
if (typeof fn !== 'function') {
864859
output = {
865860
type: 'result',
866861
result: {
867862
failed: {
868863
failure: {
869864
message: `Activity function ${activityType} is not registered on this Worker, available activities: ${JSON.stringify(
870-
Object.keys(this.options.activities ?? {})
865+
[...this.options.activities.keys()]
871866
)}`,
872867
applicationFailureInfo: {
873868
type: 'NotFoundError',
@@ -1158,6 +1153,7 @@ export class Worker {
11581153
activation: coresdk.workflow_activation.WorkflowActivation,
11591154
startWorkflow: coresdk.workflow_activation.IStartWorkflow
11601155
): Promise<WorkflowWithLogAttributes> {
1156+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
11611157
const workflowCreator = this.workflowCreator!;
11621158
if (
11631159
!(
@@ -1531,7 +1527,7 @@ export class Worker {
15311527

15321528
protected activity$(): Observable<void> {
15331529
// This Worker did not register any activities, return early
1534-
if (this.options.activities === undefined || Object.keys(this.options.activities).length === 0) {
1530+
if (!this.options.activities?.size) {
15351531
if (!this.isReplayWorker) this.log.warn('No activities registered, not polling for activity tasks');
15361532
this.activityPollerStateSubject.next('SHUTDOWN');
15371533
return EMPTY;

scripts/init-from-verdaccio.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ const { spawnNpx } = require('./utils');
55

66
async function main() {
77
const { registryDir, targetDir, initArgs } = await getArgs();
8+
// Force samples to use the same version of @temporalio/* packages as the one
9+
// we are testing. This is required when testing against a pre-release version,
10+
// which would not be otherwise matched by specifier like ^1.8.0.
11+
const { version } = require('../lerna.json');
812

913
await withRegistry(registryDir, async () => {
1014
console.log('spawning npx @temporalio/create with args:', initArgs);
@@ -14,7 +18,7 @@ async function main() {
1418
writeFileSync(npmConfigFile, npmConfig, { encoding: 'utf-8' });
1519

1620
await spawnNpx(
17-
['@temporalio/create', targetDir, '--no-git-init', '--temporalio-version', 'latest', ...initArgs],
21+
[`@temporalio/create@${version}`, targetDir, '--no-git-init', '--sdk-version', version, ...initArgs],
1822
{
1923
stdio: 'inherit',
2024
stdout: 'inherit',

0 commit comments

Comments
 (0)