Skip to content

Commit b649606

Browse files
authored
feat(activity): Export some of activity.Context members at module level (#1252)
1 parent c4bb5cc commit b649606

File tree

8 files changed

+250
-122
lines changed

8 files changed

+250
-122
lines changed

packages/activity/src/index.ts

Lines changed: 141 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171

7272
import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import
7373
import { AsyncLocalStorage } from 'node:async_hooks';
74-
import { Logger, Duration } from '@temporalio/common';
74+
import { Logger, Duration, LogLevel, LogMetadata } from '@temporalio/common';
7575
import { msToNumber } from '@temporalio/common/lib/time';
7676
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
7777

@@ -115,78 +115,78 @@ export const asyncLocalStorage: AsyncLocalStorage<Context> = (globalThis as any)
115115
* Holds information about the current Activity Execution. Retrieved inside an Activity with `Context.current().info`.
116116
*/
117117
export interface Info {
118-
taskToken: Uint8Array;
118+
readonly taskToken: Uint8Array;
119119
/**
120120
* Base64 encoded `taskToken`
121121
*/
122-
base64TaskToken: string;
123-
activityId: string;
122+
readonly base64TaskToken: string;
123+
readonly activityId: string;
124124
/**
125125
* Exposed Activity function name
126126
*/
127-
activityType: string;
127+
readonly activityType: string;
128128
/**
129129
* The namespace this Activity is running in
130130
*/
131-
activityNamespace: string;
131+
readonly activityNamespace: string;
132132
/**
133133
* Attempt number for this activity
134134
*/
135-
attempt: number;
135+
readonly attempt: number;
136136
/**
137137
* Whether this activity is scheduled in local or remote mode
138138
*/
139-
isLocal: boolean;
139+
readonly isLocal: boolean;
140140
/**
141141
* Information about the Workflow that scheduled the Activity
142142
*/
143-
workflowExecution: {
144-
workflowId: string;
145-
runId: string;
143+
readonly workflowExecution: {
144+
readonly workflowId: string;
145+
readonly runId: string;
146146
};
147147
/**
148148
* The namespace of the Workflow that scheduled this Activity
149149
*/
150-
workflowNamespace: string;
150+
readonly workflowNamespace: string;
151151
/**
152152
* The module name of the Workflow that scheduled this Activity
153153
*/
154-
workflowType: string;
154+
readonly workflowType: string;
155155
/**
156156
* Timestamp for when this Activity was scheduled in milliseconds
157157
*/
158-
scheduledTimestampMs: number;
158+
readonly scheduledTimestampMs: number;
159159
/**
160160
* Timeout for this Activity from schedule to close in milliseconds.
161161
*/
162-
scheduleToCloseTimeoutMs: number;
162+
readonly scheduleToCloseTimeoutMs: number;
163163
/**
164164
* Timeout for this Activity from start to close in milliseconds
165165
*/
166-
startToCloseTimeoutMs: number;
166+
readonly startToCloseTimeoutMs: number;
167167
/**
168168
* Timestamp for when the current attempt of this Activity was scheduled in milliseconds
169169
*/
170-
currentAttemptScheduledTimestampMs: number;
170+
readonly currentAttemptScheduledTimestampMs: number;
171171
/**
172172
* Heartbeat timeout in milliseconds.
173173
* If this timeout is defined, the Activity must heartbeat before the timeout is reached.
174174
* The Activity must **not** heartbeat in case this timeout is not defined.
175175
*/
176-
heartbeatTimeoutMs?: number;
176+
readonly heartbeatTimeoutMs?: number;
177177
/**
178178
* The {@link Context.heartbeat | details} from the last recorded heartbeat from the last attempt of this Activity.
179179
*
180180
* Use this to resume your Activity from a checkpoint.
181181
*/
182-
heartbeatDetails: any;
182+
readonly heartbeatDetails: any;
183183

184184
/**
185185
* Task Queue the Activity is scheduled in.
186186
*
187187
* For Local Activities, this is set to the Workflow's Task Queue.
188188
*/
189-
taskQueue: string;
189+
readonly taskQueue: string;
190190
}
191191

192192
/**
@@ -200,18 +200,34 @@ export interface Info {
200200
* Call `Context.current()` from Activity code in order to get the current Activity's Context.
201201
*/
202202
export class Context {
203+
/**
204+
* Gets the context of the current Activity.
205+
*
206+
* Uses {@link https://nodejs.org/docs/latest-v14.x/api/async_hooks.html#async_hooks_class_asynclocalstorage | AsyncLocalStorage} under the hood to make it accessible in nested callbacks and promises.
207+
*/
208+
public static current(): Context {
209+
const store = asyncLocalStorage.getStore();
210+
if (store === undefined) {
211+
throw new Error('Activity context not initialized');
212+
}
213+
return store;
214+
}
215+
203216
/**
204217
* Holds information about the current executing Activity.
205218
*/
206-
public info: Info;
219+
public readonly info: Info;
220+
207221
/**
208-
* Await this promise in an Activity to get notified of cancellation.
222+
* A Promise that fails with a {@link CancelledFailure} when cancellation of this activity is requested. The promise
223+
* is guaranteed to never successfully resolve. Await this promise in an Activity to get notified of cancellation.
209224
*
210-
* This promise will never resolve—it will only be rejected with a {@link CancelledFailure}.
225+
* Note that to get notified of cancellation, an activity must _also_ {@link Context.heartbeat}.
211226
*
212227
* @see [Cancellation](/api/namespaces/activity#cancellation)
213228
*/
214229
public readonly cancelled: Promise<never>;
230+
215231
/**
216232
* An {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to react to
217233
* Activity cancellation.
@@ -222,13 +238,17 @@ export class Context {
222238
* {@link https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options child_process}
223239
* to abort a child process, as well as other built-in node modules and modules found on npm.
224240
*
241+
* Note that to get notified of cancellation, an activity must _also_ {@link Context.heartbeat}.
242+
*
225243
* @see [Cancellation](/api/namespaces/activity#cancellation)
226244
*/
227245
public readonly cancellationSignal: AbortSignal;
246+
228247
/**
229248
* The heartbeat implementation, injected via the constructor.
230249
*/
231250
protected readonly heartbeatFn: (details?: any) => void;
251+
232252
/**
233253
* The logger for this Activity.
234254
*
@@ -283,33 +303,116 @@ export class Context {
283303
* :warning: Cancellation is not propagated from this function, use {@link cancelled} or {@link cancellationSignal} to
284304
* subscribe to cancellation notifications.
285305
*/
286-
public heartbeat = (details?: unknown): void => {
306+
public readonly heartbeat = (details?: unknown): void => {
287307
this.heartbeatFn(details);
288308
};
289309

290-
/**
291-
* Gets the context of the current Activity.
292-
*
293-
* Uses {@link https://nodejs.org/docs/latest-v14.x/api/async_hooks.html#async_hooks_class_asynclocalstorage | AsyncLocalStorage} under the hood to make it accessible in nested callbacks and promises.
294-
*/
295-
public static current(): Context {
296-
const store = asyncLocalStorage.getStore();
297-
if (store === undefined) {
298-
throw new Error('Activity context not initialized');
299-
}
300-
return store;
301-
}
302-
303310
/**
304311
* Helper function for sleeping in an Activity.
305312
* @param ms Sleep duration: number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}
306313
* @returns A Promise that either resolves when `ms` is reached or rejects when the Activity is cancelled
307314
*/
308-
public sleep = (ms: Duration): Promise<void> => {
315+
public readonly sleep = (ms: Duration): Promise<void> => {
309316
let handle: NodeJS.Timeout;
310317
const timer = new Promise<void>((resolve) => {
311318
handle = setTimeout(resolve, msToNumber(ms));
312319
});
313320
return Promise.race([this.cancelled.finally(() => clearTimeout(handle)), timer]);
314321
};
315322
}
323+
324+
/**
325+
* The current Activity's context.
326+
*/
327+
export function activityInfo(): Info {
328+
// For consistency with workflow.workflowInfo(), we want activityInfo() to be a function, rather than a const object.
329+
return Context.current().info;
330+
}
331+
332+
/**
333+
* The logger for this Activity.
334+
*
335+
* This is a shortcut for `Context.current().log` (see {@link Context.log}).
336+
*/
337+
export const log: Logger = {
338+
// Context.current().log may legitimately change during the lifetime of an Activity, so we can't
339+
// just initialize that field to the value of Context.current().log and move on. Hence this indirection.
340+
log(level: LogLevel, message: string, meta?: LogMetadata): any {
341+
return Context.current().log.log(level, message, meta);
342+
},
343+
trace(message: string, meta?: LogMetadata): any {
344+
return Context.current().log.trace(message, meta);
345+
},
346+
debug(message: string, meta?: LogMetadata): any {
347+
return Context.current().log.debug(message, meta);
348+
},
349+
info(message: string, meta?: LogMetadata): any {
350+
return Context.current().log.info(message, meta);
351+
},
352+
warn(message: string, meta?: LogMetadata): any {
353+
return Context.current().log.warn(message, meta);
354+
},
355+
error(message: string, meta?: LogMetadata): any {
356+
return Context.current().log.error(message, meta);
357+
},
358+
};
359+
360+
/**
361+
* Helper function for sleeping in an Activity.
362+
*
363+
* This is a shortcut for `Context.current().sleep(ms)` (see {@link Context.sleep}).
364+
*
365+
* @param ms Sleep duration: number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}
366+
* @returns A Promise that either resolves when `ms` is reached or rejects when the Activity is cancelled
367+
*/
368+
export function sleep(ms: Duration): Promise<void> {
369+
return Context.current().sleep(ms);
370+
}
371+
372+
/**
373+
* Send a {@link https://docs.temporal.io/concepts/what-is-an-activity-heartbeat | heartbeat} from an Activity.
374+
*
375+
* If an Activity times out, then during the next retry, the last value of `details` is available at
376+
* {@link Info.heartbeatDetails}. This acts as a periodic checkpoint mechanism for the progress of an Activity.
377+
*
378+
* If an Activity times out on the final retry (relevant in cases in which {@link RetryPolicy.maximumAttempts} is
379+
* set), the Activity function call in the Workflow code will throw an {@link ActivityFailure} with the `cause`
380+
* attribute set to a {@link TimeoutFailure}, which has the last value of `details` available at
381+
* {@link TimeoutFailure.lastHeartbeatDetails}.
382+
*
383+
* This is a shortcut for `Context.current().heatbeat(ms)` (see {@link Context.heartbeat}).
384+
*/
385+
export function heartbeat(details?: unknown): void {
386+
Context.current().heartbeat(details);
387+
}
388+
389+
/**
390+
* Return a Promise that fails with a {@link CancelledFailure} when cancellation of this activity is requested. The
391+
* promise is guaranteed to never successfully resolve. Await this promise in an Activity to get notified of
392+
* cancellation.
393+
*
394+
* Note that to get notified of cancellation, an activity must _also_ do {@link Context.heartbeat}.
395+
*
396+
* This is a shortcut for `Context.current().cancelled` (see {@link Context.cancelled}).
397+
*/
398+
export function cancelled(): Promise<never> {
399+
return Context.current().cancelled;
400+
}
401+
402+
/**
403+
* Return an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to
404+
* react to Activity cancellation.
405+
*
406+
* This can be passed in to libraries such as
407+
* {@link https://www.npmjs.com/package/node-fetch#request-cancellation-with-abortsignal | fetch} to abort an
408+
* in-progress request and
409+
* {@link https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options child_process}
410+
* to abort a child process, as well as other built-in node modules and modules found on npm.
411+
*
412+
* Note that to get notified of cancellation, an activity must _also_ do {@link Context.heartbeat}.
413+
*
414+
* This is a shortcut for `Context.current().cancellationSignal` (see {@link Context.cancellationSignal}).
415+
*/
416+
export function cancellationSignal(): AbortSignal {
417+
return Context.current().cancellationSignal;
418+
}

packages/common/src/type-helpers.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,26 @@ export function SymbolBasedInstanceOfError<E extends Error>(markerName: string):
140140
});
141141
};
142142
}
143+
144+
// Thanks MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
145+
export function deepFreeze<T>(object: T): T {
146+
// Retrieve the property names defined on object
147+
const propNames = Object.getOwnPropertyNames(object);
148+
149+
// Freeze properties before freezing self
150+
for (const name of propNames) {
151+
const value = (object as any)[name];
152+
153+
if (value && typeof value === 'object') {
154+
try {
155+
deepFreeze(value);
156+
} catch (err) {
157+
// This is okay, there are some typed arrays that cannot be frozen (encodingKeys)
158+
}
159+
} else if (typeof value === 'function') {
160+
Object.freeze(value);
161+
}
162+
}
163+
164+
return Object.freeze(object);
165+
}

packages/worker/src/workflow/reusable-vm.ts

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,13 @@ import { AsyncLocalStorage } from 'node:async_hooks';
44
import vm from 'node:vm';
55
import * as internals from '@temporalio/workflow/lib/worker-interface';
66
import { IllegalStateError } from '@temporalio/common';
7+
import { deepFreeze } from '@temporalio/common/lib/type-helpers';
78
import { getTimeOfDay } from '@temporalio/core-bridge';
89
import { timeOfDayToBigint } from '../logger';
910
import { Workflow, WorkflowCreateOptions, WorkflowCreator } from './interface';
1011
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
1112
import { BaseVMWorkflow, globalHandlers, injectConsole, setUnhandledRejectionHandler } from './vm-shared';
1213

13-
// Thanks MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
14-
function deepFreeze(object: any) {
15-
// Retrieve the property names defined on object
16-
const propNames = Object.getOwnPropertyNames(object);
17-
18-
// Freeze properties before freezing self
19-
for (const name of propNames) {
20-
const value = object[name];
21-
22-
if (value && typeof value === 'object') {
23-
try {
24-
deepFreeze(value);
25-
} catch (err) {
26-
// This is okay, there are some typed arrays that cannot be frozen (encodingKeys)
27-
}
28-
} else if (typeof value === 'function') {
29-
Object.freeze(value);
30-
}
31-
}
32-
33-
return Object.freeze(object);
34-
}
35-
3614
/**
3715
* A WorkflowCreator that creates VMWorkflows in the current isolate
3816
*/

packages/worker/src/workflow/threaded-vm.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,9 @@ export class VMWorkflowThreadProxy implements Workflow {
224224
if (output?.type !== 'sink-calls') {
225225
throw new TypeError(`Got invalid response output from Workflow Worker thread ${output}`);
226226
}
227+
227228
output.calls.forEach((call) => {
228-
call.workflowInfo.unsafe.now = Date.now;
229+
(call.workflowInfo.unsafe.now as any) = Date.now;
229230
});
230231
return output.calls;
231232
}

0 commit comments

Comments
 (0)