Skip to content

Commit 6505f8d

Browse files
authored
feat(worker): Enable reuseV8Context by default (#1310)
1 parent 22a8f1d commit 6505f8d

File tree

5 files changed

+84
-33
lines changed

5 files changed

+84
-33
lines changed

packages/test/src/helpers.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ export function u8(s: string): Uint8Array {
1818
return new TextEncoder().encode(s);
1919
}
2020

21-
export function isSet(env: string | undefined): boolean {
22-
if (env === undefined) return false;
21+
function isSet(env: string | undefined, def: boolean): boolean {
22+
if (env === undefined) return def;
2323
env = env.toLocaleLowerCase();
2424
return env === '1' || env === 't' || env === 'true';
2525
}
2626

27-
export const RUN_INTEGRATION_TESTS = inWorkflowContext() || isSet(process.env.RUN_INTEGRATION_TESTS);
28-
export const REUSE_V8_CONTEXT = inWorkflowContext() || isSet(process.env.REUSE_V8_CONTEXT);
27+
export const RUN_INTEGRATION_TESTS = inWorkflowContext() || isSet(process.env.RUN_INTEGRATION_TESTS, false);
28+
export const REUSE_V8_CONTEXT = inWorkflowContext() || isSet(process.env.REUSE_V8_CONTEXT, true);
2929

3030
export async function sleep(ms: number): Promise<void> {
3131
return new Promise((resolve) => setTimeout(resolve, ms));

packages/test/src/load/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ async function main() {
176176
activity: [() => ({ inbound: new ConnectionInjectorInterceptor(clientConnection) })],
177177
},
178178
// Can't reuse the helper because it defines `test` and ava thinks it's an ava test.
179-
reuseV8Context: ['1', 't', 'true'].includes((process.env.REUSE_V8_CONTEXT ?? 'false').toLowerCase()),
179+
reuseV8Context: ['1', 't', 'true'].includes((process.env.REUSE_V8_CONTEXT ?? 'true').toLowerCase()),
180180
});
181181

182182
await withOptionalStatusServer(worker, statusPort, async () => {

packages/test/src/test-isolation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const test = anyTest as TestFn<Context>;
1818

1919
const withReusableContext = test.macro<[ImplementationFn<[], Context>]>(async (t, fn) => {
2020
if (!REUSE_V8_CONTEXT) {
21-
t.pass('Skipped since REUSE_V8_CONTEXT is not set');
21+
t.pass('Skipped since REUSE_V8_CONTEXT is set to false');
2222
return;
2323
}
2424
await fn(t);

packages/worker/src/worker-options.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -487,9 +487,10 @@ export interface WorkerOptions {
487487
* From running basic stress tests we've observed 2/3 reduction in memory usage and 1/3 to 1/2 in CPU usage with this
488488
* feature turned on.
489489
*
490-
* Note that we plan to turn this option on by default starting with 1.9.0.
491-
*
492-
* @default false (will change in the future)
490+
* @default true
491+
* @deprecated There is currently no known reason to disable the Reuse V8 Context execution model.
492+
* The legacy execution model will be completely removed at some point in the future (no earlier than 1.10.0).
493+
* Please report any issue that requires you to disable `reuseV8Context`.
493494
*/
494495
reuseV8Context?: boolean;
495496

@@ -665,7 +666,6 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
665666
maxCachedWorkflows,
666667
showStackTraceSources,
667668
namespace,
668-
reuseV8Context,
669669
sinks,
670670
nonStickyToStickyPollRatio,
671671
interceptors,
@@ -675,6 +675,9 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
675675
const maxConcurrentWorkflowTaskExecutions = options.maxConcurrentWorkflowTaskExecutions ?? 40;
676676
const maxConcurrentActivityTaskExecutions = options.maxConcurrentActivityTaskExecutions ?? 100;
677677

678+
// eslint-disable-next-line deprecation/deprecation
679+
const reuseV8Context = options.reuseV8Context ?? true;
680+
678681
const heapSizeMiB = v8.getHeapStatistics().heap_size_limit / MiB;
679682
const defaultMaxCachedWorkflows = reuseV8Context
680683
? Math.max(Math.floor((Math.max(heapSizeMiB - 200, 0) * 600) / 1024), 10)
@@ -702,7 +705,6 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
702705
workflowThreadPoolSize: reuseV8Context ? 1 : 2,
703706
maxCachedWorkflows: maxCachedWorkflows ?? defaultMaxCachedWorkflows,
704707
showStackTraceSources: showStackTraceSources ?? false,
705-
reuseV8Context: reuseV8Context ?? false,
706708
debugMode: debugMode ?? false,
707709
interceptors: {
708710
activity: interceptors?.activity ?? [],
@@ -718,6 +720,7 @@ export function addDefaultWorkerOptions(options: WorkerOptions): WorkerOptionsWi
718720
...sinks,
719721
},
720722
...rest,
723+
reuseV8Context,
721724
maxConcurrentWorkflowTaskExecutions,
722725
maxConcurrentActivityTaskExecutions,
723726
};

packages/worker/src/worker.ts

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ export type ActivityTaskWithBase64Token = {
118118
base64TaskToken: string;
119119
};
120120

121-
type CompiledWorkerOptionsWithBuildId = CompiledWorkerOptions & { buildId: string };
121+
type CompiledWorkerOptionsWithBuildId = CompiledWorkerOptions & {
122+
buildId: string;
123+
};
122124

123125
/**
124126
* Combined error information for {@link Worker.runUntil}
@@ -208,7 +210,10 @@ export class NativeWorker implements NativeWorkerLike {
208210
public static async createReplay(options: CompiledWorkerOptionsWithBuildId): Promise<NativeReplayHandle> {
209211
const runtime = Runtime.instance();
210212
const replayer = await runtime.createReplayWorker(options);
211-
return { worker: new NativeWorker(runtime, replayer.worker), historyPusher: replayer.pusher };
213+
return {
214+
worker: new NativeWorker(runtime, replayer.worker),
215+
historyPusher: replayer.pusher,
216+
};
212217
}
213218

214219
protected constructor(protected readonly runtime: Runtime, protected readonly nativeWorker: native.Worker) {
@@ -461,6 +466,7 @@ export class Worker {
461466
// This isn't required for vscode, only for Chrome Dev Tools which doesn't support debugging worker threads.
462467
// We also rely on this in debug-replayer where we inject a global variable to be read from workflow context.
463468
if (compiledOptions.debugMode) {
469+
// eslint-disable-next-line deprecation/deprecation
464470
if (compiledOptions.reuseV8Context) {
465471
return await ReusableVMWorkflowCreator.create(
466472
workflowBundle,
@@ -478,7 +484,8 @@ export class Worker {
478484
workflowBundle,
479485
threadPoolSize: compiledOptions.workflowThreadPoolSize,
480486
isolateExecutionTimeoutMs: compiledOptions.isolateExecutionTimeoutMs,
481-
reuseV8Context: compiledOptions.reuseV8Context ?? false,
487+
// eslint-disable-next-line deprecation/deprecation
488+
reuseV8Context: compiledOptions.reuseV8Context ?? true,
482489
registeredActivityNames,
483490
});
484491
}
@@ -828,7 +835,11 @@ export class Worker {
828835
type: 'result';
829836
result: coresdk.activity_result.IActivityExecutionResult;
830837
}
831-
| { type: 'run'; activity: Activity; input: ActivityExecuteInput }
838+
| {
839+
type: 'run';
840+
activity: Activity;
841+
input: ActivityExecuteInput;
842+
}
832843
| { type: 'ignore' };
833844
switch (variant) {
834845
case 'start': {
@@ -854,7 +865,10 @@ export class Worker {
854865
message: `Activity function ${activityType} is not registered on this Worker, available activities: ${JSON.stringify(
855866
Object.keys(this.options.activities ?? {})
856867
)}`,
857-
applicationFailureInfo: { type: 'NotFoundError', nonRetryable: false },
868+
applicationFailureInfo: {
869+
type: 'NotFoundError',
870+
nonRetryable: false,
871+
},
858872
},
859873
},
860874
},
@@ -912,7 +926,9 @@ export class Worker {
912926
case 'cancel': {
913927
output = { type: 'ignore' };
914928
if (activity === undefined) {
915-
this.log.error('Tried to cancel a non-existing activity', { taskToken: base64TaskToken });
929+
this.log.error('Tried to cancel a non-existing activity', {
930+
taskToken: base64TaskToken,
931+
});
916932
break;
917933
}
918934
// NOTE: activity will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure)
@@ -1013,15 +1029,24 @@ export class Worker {
10131029
// Core has indicated that it will not return any more poll results, evict all cached WFs
10141030
filter((state) => state !== 'POLLING'),
10151031
first(),
1016-
map((): { activation: coresdk.workflow_activation.WorkflowActivation; synthetic: true } => {
1017-
return {
1018-
activation: coresdk.workflow_activation.WorkflowActivation.create({
1019-
runId: group$.key,
1020-
jobs: [{ removeFromCache: Worker.SELF_INDUCED_SHUTDOWN_EVICTION }],
1021-
}),
1022-
synthetic: true,
1023-
};
1024-
}),
1032+
map(
1033+
(): {
1034+
activation: coresdk.workflow_activation.WorkflowActivation;
1035+
synthetic: true;
1036+
} => {
1037+
return {
1038+
activation: coresdk.workflow_activation.WorkflowActivation.create({
1039+
runId: group$.key,
1040+
jobs: [
1041+
{
1042+
removeFromCache: Worker.SELF_INDUCED_SHUTDOWN_EVICTION,
1043+
},
1044+
],
1045+
}),
1046+
synthetic: true,
1047+
};
1048+
}
1049+
),
10251050
takeUntil(group$.pipe(last(undefined, null)))
10261051
)
10271052
).pipe(
@@ -1227,7 +1252,10 @@ export class Worker {
12271252
}).finish();
12281253
// We do not dispose of the Workflow yet, wait to be evicted from Core.
12291254
// This is done to simplify the Workflow lifecycle so Core is the sole driver.
1230-
return { state: undefined, output: { close: true, completion } };
1255+
return {
1256+
state: undefined,
1257+
output: { close: true, completion },
1258+
};
12311259
}
12321260
},
12331261
undefined
@@ -1298,7 +1326,10 @@ export class Worker {
12981326
*/
12991327
protected activityHeartbeat$(): Observable<void> {
13001328
function process(state: HeartbeatState, heartbeat: Heartbeat): HeartbeatStateAndOutput {
1301-
return { state: { ...state, processing: true, pending: undefined }, output: { type: 'send', heartbeat } };
1329+
return {
1330+
state: { ...state, processing: true, pending: undefined },
1331+
output: { type: 'send', heartbeat },
1332+
};
13021333
}
13031334

13041335
function storePending(state: HeartbeatState, heartbeat: Heartbeat): HeartbeatStateAndOutput {
@@ -1307,7 +1338,12 @@ export class Worker {
13071338

13081339
function complete(callback: () => void): HeartbeatStateAndOutput {
13091340
return {
1310-
state: { pending: undefined, completionCallback: undefined, processing: false, closed: true },
1341+
state: {
1342+
pending: undefined,
1343+
completionCallback: undefined,
1344+
processing: false,
1345+
closed: true,
1346+
},
13111347
output: { type: 'close', completionCallback: callback },
13121348
};
13131349
}
@@ -1325,7 +1361,10 @@ export class Worker {
13251361
(state: HeartbeatState, input: HeartbeatInput): HeartbeatStateAndOutput => {
13261362
if (input.type === 'create') {
13271363
this.numHeartbeatingActivitiesSubject.next(this.numHeartbeatingActivitiesSubject.value + 1);
1328-
return { state: { processing: false, closed: false }, output: null };
1364+
return {
1365+
state: { processing: false, closed: false },
1366+
output: null,
1367+
};
13291368
}
13301369
// Ignore any input if we've marked this activity heartbeat stream as closed
13311370
// (rogue heartbeat)
@@ -1350,7 +1389,10 @@ export class Worker {
13501389
return complete(state.completionCallback);
13511390
} else {
13521391
// Nothing to do, wait for completion or heartbeat
1353-
return { state: { ...state, processing: false }, output: null };
1392+
return {
1393+
state: { ...state, processing: false },
1394+
output: null,
1395+
};
13541396
}
13551397
case 'completion':
13561398
if (state.processing) {
@@ -1409,7 +1451,10 @@ export class Worker {
14091451
}).finish();
14101452
this.nativeWorker.recordActivityHeartbeat(byteArrayToBuffer(arr));
14111453
} finally {
1412-
this.activityHeartbeatSubject.next({ type: 'flush', base64TaskToken });
1454+
this.activityHeartbeatSubject.next({
1455+
type: 'flush',
1456+
base64TaskToken,
1457+
});
14131458
}
14141459
}),
14151460
tap({ complete: group$.close })
@@ -1484,7 +1529,10 @@ export class Worker {
14841529
const task = coresdk.activity_task.ActivityTask.decode(new Uint8Array(buffer));
14851530
const { taskToken, ...rest } = task;
14861531
const base64TaskToken = formatTaskToken(taskToken);
1487-
this.log.trace('Got activity task', { taskToken: base64TaskToken, ...rest });
1532+
this.log.trace('Got activity task', {
1533+
taskToken: base64TaskToken,
1534+
...rest,
1535+
});
14881536
const { variant } = task;
14891537
if (variant === undefined) {
14901538
throw new TypeError('Got an activity task without a "variant" attribute');

0 commit comments

Comments
 (0)