Skip to content

Commit a9098ee

Browse files
authored
fix(workflow)!: Make breakpoints work inside workflow isolate context (#819)
1 parent a57df99 commit a9098ee

File tree

11 files changed

+107
-72
lines changed

11 files changed

+107
-72
lines changed

packages/test/src/test-bundler.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,13 @@ if (RUN_INTEGRATION_TESTS) {
3737

3838
test('Worker can be created from bundle path', async (t) => {
3939
const taskQueue = `${t.title}-${uuid4()}`;
40-
const { code, sourceMap } = await bundleWorkflowCode({
40+
const { code } = await bundleWorkflowCode({
4141
workflowsPath: require.resolve('./workflows'),
4242
});
4343
const uid = uuid4();
4444
const codePath = pathJoin(os.tmpdir(), `workflow-bundle-${uid}.js`);
45-
const sourceMapPath = pathJoin(os.tmpdir(), `workflow-bundle-${uid}.map.js`);
46-
await Promise.all([writeFile(codePath, code), writeFile(sourceMapPath, sourceMap)]);
47-
const workflowBundle = { codePath, sourceMapPath };
45+
await writeFile(codePath, code);
46+
const workflowBundle = { codePath };
4847
const worker = await Worker.create({
4948
taskQueue,
5049
workflowBundle,

packages/test/src/test-local-activities.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ApplicationFailure, defaultPayloadConverter, WorkflowClient, WorkflowFailedError } from '@temporalio/client';
22
import { temporal } from '@temporalio/proto';
3-
import { bundleWorkflowCode, Worker, WorkflowBundleWithSourceMap } from '@temporalio/worker';
3+
import { bundleWorkflowCode, Worker, WorkflowBundle } from '@temporalio/worker';
44
import { isCancellation } from '@temporalio/workflow';
55
import anyTest, { TestInterface } from 'ava';
66
import { firstValueFrom, Subject } from 'rxjs';
@@ -10,7 +10,7 @@ import { RUN_INTEGRATION_TESTS } from './helpers';
1010
import * as workflows from './workflows/local-activity-testers';
1111

1212
interface Context {
13-
workflowBundle: WorkflowBundleWithSourceMap;
13+
workflowBundle: WorkflowBundle;
1414
taskQueue: string;
1515
client: WorkflowClient;
1616
getWorker: () => Promise<Worker>;

packages/test/src/test-workflows.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { msToTs } from '@temporalio/internal-workflow-common';
1010
import { coresdk } from '@temporalio/proto';
1111
import { WorkflowCodeBundler } from '@temporalio/worker/lib/workflow/bundler';
1212
import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/vm';
13+
import { parseWorkflowCode } from '@temporalio/worker/lib/worker';
1314
import { WorkflowInfo } from '@temporalio/workflow';
1415
import anyTest, { ExecutionContext, TestInterface } from 'ava';
1516
import dedent from 'dedent';
@@ -46,8 +47,8 @@ const test = anyTest as TestInterface<Context>;
4647
test.before(async (t) => {
4748
const workflowsPath = path.join(__dirname, 'workflows');
4849
const bundler = new WorkflowCodeBundler({ workflowsPath });
49-
const { code, sourceMap } = await bundler.createBundle();
50-
t.context.workflowCreator = await TestVMWorkflowCreator.create(code, sourceMap, 100);
50+
const workflowBundle = parseWorkflowCode((await bundler.createBundle()).code);
51+
t.context.workflowCreator = await TestVMWorkflowCreator.create(workflowBundle, 100);
5152
});
5253

5354
test.after.always(async (t) => {

packages/worker/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export {
3434
ReplayWorkerOptions,
3535
WorkerOptions,
3636
WorkflowBundleOption,
37-
WorkflowBundlePathWithSourceMap,
37+
WorkflowBundlePath,
3838
} from './worker-options';
3939
export { WorkflowInboundLogInterceptor, workflowLogAttributes } from './workflow-log-interceptor';
40-
export { BundleOptions, bundleWorkflowCode, WorkflowBundleWithSourceMap } from './workflow/bundler';
40+
export { BundleOptions, bundleWorkflowCode, WorkflowBundle } from './workflow/bundler';

packages/worker/src/worker-options.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,25 @@ import { Runtime } from './runtime';
99
import { InjectedSinks } from './sinks';
1010
import { GiB } from './utils';
1111
import { LoggerSinks } from './workflow-log-interceptor';
12-
import { WorkflowBundleWithSourceMap } from './workflow/bundler';
12+
import { WorkflowBundle } from './workflow/bundler';
1313
import * as v8 from 'v8';
1414
import * as os from 'os';
1515

1616
export type { WebpackConfiguration };
1717

18-
export interface WorkflowBundlePathWithSourceMap {
18+
export interface WorkflowBundlePath {
1919
codePath: string;
20-
sourceMapPath: string;
2120
}
22-
export type WorkflowBundleOption = WorkflowBundleWithSourceMap | WorkflowBundlePathWithSourceMap;
21+
export type WorkflowBundleOption = WorkflowBundle | WorkflowBundlePath;
2322

24-
export function isCodeBundleOption(bundleOpt: WorkflowBundleOption): bundleOpt is WorkflowBundleWithSourceMap {
23+
export function isCodeBundleOption(bundleOpt: WorkflowBundleOption): bundleOpt is WorkflowBundle {
2524
const opt = bundleOpt as any; // Cast to access properties without TS complaining
26-
return typeof opt.code === 'string' && typeof opt.sourceMap === 'string';
25+
return typeof opt.code === 'string';
2726
}
2827

29-
export function isPathBundleOption(bundleOpt: WorkflowBundleOption): bundleOpt is WorkflowBundlePathWithSourceMap {
28+
export function isPathBundleOption(bundleOpt: WorkflowBundleOption): bundleOpt is WorkflowBundlePath {
3029
const opt = bundleOpt as any; // Cast to access properties without TS complaining
31-
return typeof opt.codePath === 'string' && opt.sourceMapPath;
30+
return typeof opt.codePath === 'string';
3231
}
3332

3433
/**

packages/worker/src/worker.ts

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,13 @@ import {
7575
WorkerOptions,
7676
} from './worker-options';
7777
import { WorkflowCodecRunner } from './workflow-codec-runner';
78-
import { WorkflowBundleWithSourceMap, WorkflowCodeBundler } from './workflow/bundler';
78+
import { WorkflowBundle, WorkflowCodeBundler } from './workflow/bundler';
7979
import { Workflow, WorkflowCreator } from './workflow/interface';
8080
import { ThreadedVMWorkflowCreator } from './workflow/threaded-vm';
8181
import { VMWorkflowCreator } from './workflow/vm';
82+
import type { RawSourceMap } from 'source-map';
83+
import * as vm from 'node:vm';
84+
import * as path from 'node:path';
8285

8386
import IWorkflowActivationJob = coresdk.workflow_activation.IWorkflowActivationJob;
8487
import EvictionReason = coresdk.workflow_activation.RemoveFromCache.EvictionReason;
@@ -145,13 +148,9 @@ export interface WorkerConstructor {
145148
create(
146149
connection: NativeConnection,
147150
options: CompiledWorkerOptions,
148-
bundle?: WorkflowBundleWithSourceMap
149-
): Promise<NativeWorkerLike>;
150-
createReplay(
151-
options: CompiledWorkerOptions,
152-
history: History,
153-
bundle: WorkflowBundleWithSourceMap
151+
bundle?: WorkflowBundle
154152
): Promise<NativeWorkerLike>;
153+
createReplay(options: CompiledWorkerOptions, history: History, bundle: WorkflowBundle): Promise<NativeWorkerLike>;
155154
}
156155

157156
function isOptionsWithBuildId<T extends CompiledWorkerOptions>(options: T): options is T & { buildId: string } {
@@ -180,7 +179,7 @@ export class NativeWorker implements NativeWorkerLike {
180179
public static async create(
181180
connection: NativeConnection,
182181
options: CompiledWorkerOptions,
183-
bundle?: WorkflowBundleWithSourceMap
182+
bundle?: WorkflowBundle
184183
): Promise<NativeWorkerLike> {
185184
const runtime = Runtime.instance();
186185
const nativeWorker = await runtime.registerWorker(
@@ -193,7 +192,7 @@ export class NativeWorker implements NativeWorkerLike {
193192
public static async createReplay(
194193
options: CompiledWorkerOptions,
195194
history: History,
196-
bundle: WorkflowBundleWithSourceMap
195+
bundle: WorkflowBundle
197196
): Promise<NativeWorkerLike> {
198197
const runtime = Runtime.instance();
199198
const nativeWorker = await runtime.createReplayWorker(addBuildIdIfMissing(options, bundle.code), history);
@@ -410,9 +409,8 @@ export class Worker {
410409
...(compiledOptions.workflowBundle && isCodeBundleOption(compiledOptions.workflowBundle)
411410
? {
412411
// Avoid dumping workflow bundle code to the console
413-
workflowBundle: <WorkflowBundleWithSourceMap>{
412+
workflowBundle: <WorkflowBundle>{
414413
code: `<string of length ${compiledOptions.workflowBundle.code.length}>`,
415-
sourceMap: `<string of length ${compiledOptions.workflowBundle.sourceMap.length}>`,
416414
},
417415
}
418416
: {}),
@@ -441,14 +439,14 @@ export class Worker {
441439
}
442440

443441
protected static async createWorkflowCreator(
444-
bundle: WorkflowBundleWithSourceMap,
442+
workflowBundle: WorkflowBundleWithSourceMap,
445443
compiledOptions: CompiledWorkerOptions
446444
): Promise<WorkflowCreator> {
447445
if (compiledOptions.debugMode) {
448-
return await VMWorkflowCreator.create(bundle.code, bundle.sourceMap, compiledOptions.isolateExecutionTimeoutMs);
446+
return await VMWorkflowCreator.create(workflowBundle, compiledOptions.isolateExecutionTimeoutMs);
449447
} else {
450448
return await ThreadedVMWorkflowCreator.create({
451-
...bundle,
449+
workflowBundle,
452450
threadPoolSize: compiledOptions.workflowThreadPoolSize,
453451
isolateExecutionTimeoutMs: compiledOptions.isolateExecutionTimeoutMs,
454452
});
@@ -560,20 +558,17 @@ export class Worker {
560558
});
561559
const bundle = await bundler.createBundle();
562560
logger.info('Workflow bundle created', { size: `${toMB(bundle.code.length)}MB` });
563-
return bundle;
561+
return parseWorkflowCode(bundle.code);
564562
} else if (compiledOptions.workflowBundle) {
565563
if (compiledOptions.bundlerOptions) {
566564
throw new ValueError(`You cannot set both WorkerOptions.workflowBundle and .bundlerOptions`);
567565
}
568566

569567
if (isCodeBundleOption(compiledOptions.workflowBundle)) {
570-
return compiledOptions.workflowBundle;
568+
return parseWorkflowCode(compiledOptions.workflowBundle.code);
571569
} else if (isPathBundleOption(compiledOptions.workflowBundle)) {
572-
const [code, sourceMap] = await Promise.all([
573-
fs.readFile(compiledOptions.workflowBundle.codePath, 'utf8'),
574-
fs.readFile(compiledOptions.workflowBundle.sourceMapPath, 'utf8'),
575-
]);
576-
return { code, sourceMap };
570+
const code = await fs.readFile(compiledOptions.workflowBundle.codePath, 'utf8');
571+
return parseWorkflowCode(code, compiledOptions.workflowBundle.codePath);
577572
} else {
578573
throw new TypeError('Invalid WorkflowOptions.workflowBundle');
579574
}
@@ -1649,6 +1644,51 @@ export class Worker {
16491644
}
16501645
}
16511646

1647+
export interface WorkflowBundleWithSourceMap {
1648+
code: string;
1649+
sourceMap: RawSourceMap;
1650+
filename: string;
1651+
}
1652+
1653+
export function parseWorkflowCode(code: string, codePath?: string): WorkflowBundleWithSourceMap {
1654+
const sourceMappingUrlDataRegex = /\s*\n[/][/][#]\s+sourceMappingURL=data:(?:[^,]*;)base64,([0-9A-Za-z+/=]+)\s*$/;
1655+
const sourceMapMatcher = code.match(sourceMappingUrlDataRegex);
1656+
if (!sourceMapMatcher) throw new Error("Can't extract inlined source map from the provided Workflow Bundle");
1657+
1658+
const sourceMapJson = Buffer.from(sourceMapMatcher[1], 'base64').toString();
1659+
const sourceMap: RawSourceMap = JSON.parse(sourceMapJson);
1660+
1661+
// JS debuggers (at least VSCode's) have a few requirements regarding the script and its source map, notably:
1662+
// - The script file name's must look like an absolute path (relative paths are treated as node internals scripts)
1663+
// - If the script contains a sourceMapURL directive, the executable 'file' indicated by the source map must match the
1664+
// filename of the script itself. If the source map's file is a relative path, then it gets resolved relative to cwd
1665+
const filename = path.resolve(process.cwd(), codePath ?? sourceMap.file);
1666+
if (filename !== codePath) {
1667+
sourceMap.file = filename;
1668+
const patchedSourceMapJson = Buffer.from(JSON.stringify(sourceMap)).toString('base64');
1669+
const fixedSourceMappingUrl = `\n//# sourceMappingURL=data:application/json;base64,${patchedSourceMapJson}`;
1670+
code = code.slice(0, -sourceMapMatcher[1].length) + fixedSourceMappingUrl;
1671+
}
1672+
1673+
// Preloading the script makes breakpoints significantly more reliable and more responsive
1674+
let script: vm.Script | undefined = new vm.Script(code, { filename });
1675+
let context: any = vm.createContext({});
1676+
try {
1677+
script.runInContext(context);
1678+
} catch (e) {
1679+
// Context has not been properly configured, so eventual errors are possible. Just ignore at this point
1680+
}
1681+
1682+
// Keep these objects from GC long enough for debugger to complete parsing the source map and reporting locations
1683+
// to the node process. Otherwise, the debugger risks source mapping resolution errors, meaning breakpoints wont work.
1684+
setTimeout(() => {
1685+
script = undefined;
1686+
context = undefined;
1687+
}, 10000);
1688+
1689+
return { code, sourceMap, filename };
1690+
}
1691+
16521692
type NonNullableObject<T> = { [P in keyof T]-?: NonNullable<T[P]> };
16531693

16541694
/**

packages/worker/src/workflow/bundler.ts

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ export function moduleMatches(userModule: string, modules: string[]): boolean {
2626
return modules.some((module) => userModule === module || userModule.startsWith(`${module}/`));
2727
}
2828

29-
export interface WorkflowBundleWithSourceMap {
29+
export interface WorkflowBundle {
3030
code: string;
31-
sourceMap: string;
3231
}
3332

3433
/**
@@ -64,9 +63,9 @@ export class WorkflowCodeBundler {
6463
}
6564

6665
/**
67-
* @return a {@link WorkflowBundleWithSourceMap} containing bundled code and source map
66+
* @return a {@link WorkflowBundle} containing bundled code, including inlined source map
6867
*/
69-
public async createBundle(): Promise<WorkflowBundleWithSourceMap> {
68+
public async createBundle(): Promise<WorkflowBundle> {
7069
const vol = new memfs.Volume();
7170
const ufs = new unionfs.Union();
7271

@@ -99,12 +98,11 @@ export class WorkflowCodeBundler {
9998
const entrypointPath = this.makeEntrypointPath(ufs, this.workflowsPath);
10099

101100
this.genEntrypoint(vol, entrypointPath);
102-
await this.bundle(ufs, memoryFs, entrypointPath, distDir);
101+
const bundleFilePath = await this.bundle(ufs, memoryFs, entrypointPath, distDir);
103102

104103
// Cast because the type definitions are inaccurate
105104
return {
106-
code: memoryFs.readFileSync(path.join(distDir, 'main.js'), 'utf8') as string,
107-
sourceMap: memoryFs.readFileSync(path.join(distDir, 'main.source.js'), 'utf8') as string,
105+
code: memoryFs.readFileSync(bundleFilePath, 'utf8') as string,
108106
};
109107
}
110108

@@ -165,7 +163,7 @@ export { api };
165163
outputFilesystem: memfs.IFs,
166164
entry: string,
167165
distDir: string
168-
): Promise<void> {
166+
): Promise<string> {
169167
const captureProblematicModules: webpack.Configuration['externals'] = async (
170168
data,
171169
_callback
@@ -221,11 +219,10 @@ export { api };
221219
},
222220
entry: [entry],
223221
mode: 'development',
224-
devtool: 'source-map',
222+
devtool: 'inline-source-map',
225223
output: {
226224
path: distDir,
227-
filename: 'main.js',
228-
sourceMapFilename: 'main.source.js',
225+
filename: 'workflow-isolate-[fullhash].js',
229226
devtoolModuleFilenameTemplate: '[absolute-resource-path]',
230227
library: '__TEMPORAL__',
231228
},
@@ -241,7 +238,7 @@ export { api };
241238
compiler.outputFileSystem = outputFilesystem as any;
242239

243240
try {
244-
await new Promise<void>((resolve, reject) => {
241+
return await new Promise<string>((resolve, reject) => {
245242
compiler.run((err, stats) => {
246243
if (stats !== undefined) {
247244
const hasError = stats.hasErrors();
@@ -274,12 +271,13 @@ export { api };
274271

275272
reject(err);
276273
}
274+
275+
const outputFilename = Object.keys(stats.compilation.assets)[0];
276+
if (!err) {
277+
resolve(path.join(distDir, outputFilename));
278+
}
277279
}
278-
if (err) {
279-
reject(err);
280-
} else {
281-
resolve();
282-
}
280+
reject(err);
283281
});
284282
});
285283
} finally {
@@ -335,7 +333,7 @@ export interface BundleOptions {
335333
* When using with {@link Worker.runReplayHistory}, make sure to pass the same interceptors and payload converter used
336334
* when the history was generated.
337335
*/
338-
export async function bundleWorkflowCode(options: BundleOptions): Promise<WorkflowBundleWithSourceMap> {
336+
export async function bundleWorkflowCode(options: BundleOptions): Promise<WorkflowBundle> {
339337
const bundler = new WorkflowCodeBundler(options);
340338
return await bundler.createBundle();
341339
}

packages/worker/src/workflow/threaded-vm.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { coresdk } from '@temporalio/proto';
1313
import { IllegalStateError, SinkCall } from '@temporalio/workflow';
1414
import { Worker as NodeWorker } from 'worker_threads';
1515
import { UnexpectedError } from '../errors';
16+
import { WorkflowBundleWithSourceMap } from '../worker';
1617
import { Workflow, WorkflowCreateOptions, WorkflowCreator } from './interface';
1718
import { WorkerThreadInput, WorkerThreadRequest } from './workflow-worker-thread/input';
1819
import { WorkerThreadOutput, WorkerThreadResponse } from './workflow-worker-thread/output';
@@ -118,8 +119,7 @@ export class WorkerThreadClient {
118119
}
119120

120121
export interface ThreadedVMWorkflowCreatorOptions {
121-
code: string;
122-
sourceMap: string;
122+
workflowBundle: WorkflowBundleWithSourceMap;
123123
threadPoolSize: number;
124124
isolateExecutionTimeoutMs: number;
125125
}
@@ -137,15 +137,14 @@ export class ThreadedVMWorkflowCreator implements WorkflowCreator {
137137
*/
138138
static async create({
139139
threadPoolSize,
140-
code,
141-
sourceMap,
140+
workflowBundle,
142141
isolateExecutionTimeoutMs,
143142
}: ThreadedVMWorkflowCreatorOptions): Promise<ThreadedVMWorkflowCreator> {
144143
const workerThreadClients = Array(threadPoolSize)
145144
.fill(0)
146145
.map(() => new WorkerThreadClient(new NodeWorker(require.resolve('./workflow-worker-thread'))));
147146
await Promise.all(
148-
workerThreadClients.map((client) => client.send({ type: 'init', code, sourceMap, isolateExecutionTimeoutMs }))
147+
workerThreadClients.map((client) => client.send({ type: 'init', workflowBundle, isolateExecutionTimeoutMs }))
149148
);
150149
return new this(workerThreadClients);
151150
}

0 commit comments

Comments
 (0)