Skip to content

Commit 682003e

Browse files
authored
feat: Implement entrypoint for debug replayer (#848)
Will be used in the VS Code debugger extension
1 parent cda094c commit 682003e

File tree

12 files changed

+367
-2
lines changed

12 files changed

+367
-2
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"temporal.replayerEntrypoint": "packages/test/src/debug-replayer.ts"
3+
}

docs/debug-replay.mermaid

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
sequenceDiagram
2+
participant R as Runner (e.g. VS Code)
3+
participant MT as Node.js Main Thread
4+
participant WF as Workflow Context
5+
participant WT as Worker Thread
6+
7+
R->>MT: Launch
8+
MT->>R: Get history
9+
MT->>MT: Replay workflow
10+
loop
11+
MT->>WF: Activate workflow
12+
WF->>MT: Update runner with current eventId (blocking call)
13+
MT->>WT: Update runner (block using atomics)
14+
WT->>R: Set current event ID
15+
alt has breakpoint?
16+
R->>R: ⏸ Pause execution
17+
end
18+
R->>WT: Respond OK
19+
end

packages/test/src/debug-replayer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { startDebugReplayer } from '@temporalio/worker';
2+
3+
startDebugReplayer({
4+
workflowsPath: require.resolve('./workflows'),
5+
});

packages/test/src/workflows/http.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// @@@SNIPSTART typescript-schedule-activity-workflow
21
import { proxyActivities } from '@temporalio/workflow';
32
import type * as activities from '../activities';
43

@@ -9,4 +8,3 @@ const { httpGet } = proxyActivities<typeof activities>({
98
export async function http(): Promise<string> {
109
return await httpGet('https://temporal.io');
1110
}
12-
// @@@SNIPEND
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import http from 'node:http';
2+
import pkg from '../pkg';
3+
4+
interface ClientOptions {
5+
baseUrl: string;
6+
}
7+
8+
/**
9+
* "High level" HTTP client, used to avoid adding more dependencies to the worker package.
10+
*
11+
* DO NOT use this in a real application, it's meant to only be used for calling a "runner" (e.g. VS Code debugger
12+
* extension).
13+
*/
14+
export class Client {
15+
constructor(public readonly options: ClientOptions) {}
16+
17+
async post(url: string, options: http.RequestOptions, body: Buffer): Promise<http.IncomingMessage> {
18+
const request = http.request(`${this.options.baseUrl}/${url}`, {
19+
...options,
20+
method: 'POST',
21+
headers: {
22+
'Temporal-Client-Name': 'temporal-typescript',
23+
'Temporal-Client-Version': pkg.version,
24+
'Content-Length': body.length,
25+
...options?.headers,
26+
},
27+
});
28+
if (body) {
29+
await new Promise<void>((resolve, reject) => {
30+
request.once('error', reject);
31+
request.write(body, (err) => {
32+
request.off('error', reject);
33+
if (err) {
34+
reject();
35+
} else {
36+
resolve();
37+
}
38+
});
39+
request.end();
40+
});
41+
}
42+
const response = await new Promise<http.IncomingMessage>((resolve, reject) => {
43+
request.once('error', reject);
44+
request.once('response', resolve);
45+
});
46+
if (response.statusCode !== 200) {
47+
throw new Error(`Bad response code from VS Code: ${response.statusCode}`);
48+
}
49+
return response;
50+
}
51+
52+
async get(url: string, options?: http.RequestOptions): Promise<http.IncomingMessage> {
53+
const request = http.get(`${this.options.baseUrl}/${url}`, {
54+
...options,
55+
headers: {
56+
'Temporal-Client-Name': 'temporal-typescript',
57+
'Temporal-Client-Version': pkg.version,
58+
...options?.headers,
59+
},
60+
});
61+
const response = await new Promise<http.IncomingMessage>((resolve, reject) => {
62+
request.once('error', reject);
63+
request.once('response', resolve);
64+
});
65+
if (response.statusCode !== 200) {
66+
throw new Error(`Bad response code from VS Code: ${response.statusCode}`);
67+
}
68+
return response;
69+
}
70+
71+
static async readAll(response: http.IncomingMessage): Promise<Buffer> {
72+
const chunks = Array<Buffer>();
73+
for await (const chunk of response) {
74+
chunks.push(chunk);
75+
}
76+
return Buffer.concat(chunks);
77+
}
78+
79+
static contentLength(response: http.IncomingMessage): number {
80+
const contentLength = response.headers['content-length'];
81+
if (!contentLength) {
82+
throw new Error('Empty response body when getting history');
83+
}
84+
return parseInt(contentLength);
85+
}
86+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/**
2+
* Debug replayer workflow inbound interceptors.
3+
* Notify the runner when workflow starts or a signal is received, required for setting breakpoints on workflow tasks.
4+
*
5+
* @module
6+
*/
7+
import { WorkflowInterceptorsFactory } from '@temporalio/workflow';
8+
import { notifyRunner } from './workflow-notifier';
9+
10+
export const interceptors: WorkflowInterceptorsFactory = () => ({
11+
inbound: [
12+
{
13+
execute(input, next) {
14+
notifyRunner();
15+
return next(input);
16+
},
17+
handleSignal(input, next) {
18+
notifyRunner();
19+
return next(input);
20+
},
21+
},
22+
],
23+
});
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import worker_threads from 'node:worker_threads';
2+
import { temporal } from '@temporalio/proto';
3+
import { ReplayWorkerOptions } from '../worker-options';
4+
import { Worker } from '../worker';
5+
import { Client } from './client';
6+
7+
let thread: worker_threads.Worker | undefined = undefined;
8+
9+
async function run(options: ReplayWorkerOptions): Promise<void> {
10+
const baseUrl = process.env.TEMPORAL_DEBUGGER_PLUGIN_URL;
11+
if (!baseUrl) {
12+
throw new Error('Missing TEMPORAL_DEBUGGER_PLUGIN_URL environment variable');
13+
}
14+
const client = new Client({ baseUrl });
15+
const response = await client.get('history');
16+
const history = temporal.api.history.v1.History.decode(
17+
await Client.readAll(response),
18+
Client.contentLength(response)
19+
);
20+
21+
// Only create one per process.
22+
// Not caring about globals here for to get simpler DX, this isn't meant for production use cases.
23+
thread = thread || new worker_threads.Worker(require.resolve('./worker-thread'));
24+
25+
const rejectedPromise = new Promise<void>((_, reject) => {
26+
// Set the global notifyRunner runner function that can be used from the workflow interceptors.
27+
// The function makes an HTTP request to the runner in a separate thread and blocks the current thread until a
28+
// response is received.
29+
(globalThis as any).notifyRunner = (wftStartEventId: number) => {
30+
const sab = new SharedArrayBuffer(4);
31+
const responseBuffer = new Int32Array(sab);
32+
thread?.postMessage({ eventId: wftStartEventId, responseBuffer });
33+
Atomics.wait(responseBuffer, 0, 0);
34+
if (responseBuffer[0] === 2) {
35+
// Error occurred (logged by worker thread)
36+
reject(new Error('Failed to call runner back'));
37+
}
38+
};
39+
});
40+
41+
await Promise.race([
42+
rejectedPromise,
43+
Worker.runReplayHistory(
44+
{
45+
...options,
46+
interceptors: {
47+
...options.interceptors,
48+
workflowModules: [
49+
// Inbound goes first so user can set breakpoints in own inbound interceptors.
50+
require.resolve('./inbound-interceptor'),
51+
...(options.interceptors?.workflowModules ?? []),
52+
// Outbound goes last - notifies the runner in the finally block, user-provided outbound interceptors are
53+
// resumed after the interceptor methods resolve.
54+
require.resolve('./outbound-interceptor'),
55+
],
56+
},
57+
},
58+
history
59+
),
60+
]);
61+
}
62+
63+
/**
64+
* Start a replayer for debugging purposes.
65+
*
66+
* Use this method to integrate the replayer with external debuggers like the Temporal VS Code debbuger extension.
67+
*/
68+
export function startDebugReplayer(options: ReplayWorkerOptions): void {
69+
run(options).then(
70+
() => {
71+
console.log('Replay completed successfully');
72+
process.exit(0);
73+
},
74+
(err) => {
75+
console.error('Replay failed', err);
76+
process.exit(1);
77+
}
78+
);
79+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Debug replayer workflow outbound interceptors.
3+
* Notify the runner when outbound operations resolve, required for setting breakpoints on workflow tasks.
4+
*
5+
* @module
6+
*/
7+
import { WorkflowInterceptorsFactory } from '@temporalio/workflow';
8+
import { notifyRunner } from './workflow-notifier';
9+
10+
export const interceptors: WorkflowInterceptorsFactory = () => ({
11+
outbound: [
12+
{
13+
async scheduleActivity(input, next) {
14+
try {
15+
return await next(input);
16+
} finally {
17+
notifyRunner();
18+
}
19+
},
20+
async scheduleLocalActivity(input, next) {
21+
try {
22+
return await next(input);
23+
} finally {
24+
notifyRunner();
25+
}
26+
},
27+
async startTimer(input, next) {
28+
try {
29+
return await next(input);
30+
} finally {
31+
notifyRunner();
32+
}
33+
},
34+
async signalWorkflow(input, next) {
35+
try {
36+
return await next(input);
37+
} finally {
38+
notifyRunner();
39+
}
40+
},
41+
async startChildWorkflowExecution(input, next) {
42+
const [startPromise, completePromise] = await next(input);
43+
startPromise.finally(notifyRunner);
44+
completePromise.finally(notifyRunner);
45+
return [startPromise, completePromise];
46+
},
47+
},
48+
],
49+
});
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Worker thread entrypoint used by ./index.ts to synchronously make an HTTP call to the "runner".
3+
*/
4+
import { isMainThread, parentPort as parentPortOrNull } from 'node:worker_threads';
5+
import { Client } from './client';
6+
7+
/**
8+
* Request from parent thread, the worker thread should signal a "runner" when it gets this request.
9+
*/
10+
export interface Request {
11+
type: 'wft-started';
12+
/**
13+
* Event ID of the started request.
14+
*/
15+
eventId: number;
16+
/**
17+
* Used to signal back that the request is complete.
18+
*/
19+
responseBuffer: Int32Array;
20+
}
21+
22+
if (isMainThread) {
23+
throw new Error(`Imported ${__filename} from main thread`);
24+
}
25+
26+
if (parentPortOrNull === null) {
27+
throw new TypeError(`${__filename} got a null parentPort`);
28+
}
29+
30+
// Create a new parentPort variable that is not nullable to please TS
31+
const parentPort = parentPortOrNull;
32+
33+
const baseUrl = process.env.TEMPORAL_DEBUGGER_PLUGIN_URL;
34+
if (!baseUrl) {
35+
throw new Error('Missing TEMPORAL_DEBUGGER_PLUGIN_URL environment variable');
36+
}
37+
const client = new Client({ baseUrl });
38+
39+
parentPort.on('message', async (request: Request) => {
40+
const { eventId, responseBuffer } = request;
41+
try {
42+
await client.post(
43+
'current-wft-started',
44+
{ headers: { 'Content-Type': 'application/json' }, timeout: 5000 },
45+
Buffer.from(JSON.stringify({ eventId }))
46+
);
47+
Atomics.store(responseBuffer, 0, 1);
48+
} catch (err) {
49+
console.error(err);
50+
Atomics.store(responseBuffer, 0, 2);
51+
} finally {
52+
Atomics.notify(responseBuffer, 0, 1);
53+
}
54+
});
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { workflowInfo } from '@temporalio/workflow';
2+
3+
class WorkflowNotifier {
4+
static _instance?: WorkflowNotifier;
5+
6+
/**
7+
* Access the singleton instance - one per workflow context
8+
*/
9+
static instance(): WorkflowNotifier {
10+
if (this._instance === undefined) {
11+
this._instance = new this();
12+
}
13+
return this._instance;
14+
}
15+
16+
private constructor() {
17+
// Dear eslint,
18+
// I left this empty to mark the constructor private, OK?
19+
//
20+
// Best regards,
21+
// - An anonymous developer
22+
}
23+
24+
lastNotifiedStartEvent = -1;
25+
26+
notifyRunner(): void {
27+
const eventId = workflowInfo().historyLength;
28+
if (this.lastNotifiedStartEvent >= eventId) return;
29+
this.lastNotifiedStartEvent = eventId;
30+
try {
31+
// Use global `notifyRunner` function, should be injected outside of workflow context.
32+
// Using globalThis.constructor.constructor, we break out of the workflow context to Node.js land.
33+
const notifyRunner = globalThis.constructor.constructor('return notifyRunner')();
34+
notifyRunner(eventId);
35+
} catch {
36+
// ignore
37+
}
38+
}
39+
}
40+
41+
/**
42+
* Notify a runner process when a workflow task is picked up
43+
*/
44+
export function notifyRunner(): void {
45+
WorkflowNotifier.instance().notifyRunner();
46+
}

packages/worker/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,4 @@ export {
4040
} from './worker-options';
4141
export { WorkflowInboundLogInterceptor, workflowLogAttributes } from './workflow-log-interceptor';
4242
export { BundleOptions, bundleWorkflowCode, WorkflowBundleWithSourceMap } from './workflow/bundler';
43+
export { startDebugReplayer } from './debug-replayer';

packages/worker/src/worker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,8 @@ export class Worker {
432432
workflowBundle: WorkflowBundleWithSourceMapAndFilename,
433433
compiledOptions: CompiledWorkerOptions
434434
): Promise<WorkflowCreator> {
435+
// This isn't required for vscode, only for Chrome Dev Tools which doesn't support debugging worker threads.
436+
// We also rely on this in debug-replayer where we inject a global variable to be read from workflow context.
435437
if (compiledOptions.debugMode) {
436438
return await VMWorkflowCreator.create(workflowBundle, compiledOptions.isolateExecutionTimeoutMs);
437439
} else {

0 commit comments

Comments
 (0)