Skip to content

Commit 14189f1

Browse files
authored
feat: Workflow logger improvements (#1117)
- [Lazily initialize the default sinks logger](540896e) - [Export `log` from workflow](5a27896) - a logger that funnels messages to the default worker logger sink. ### Example: ```ts import * as workflow from '@temporalio/workflow'; export async function myWorkflow(): Promise<void> { workflow.log.info('hello from my workflow', { key: 'value' }); } ```
1 parent 5609966 commit 14189f1

File tree

10 files changed

+84
-19
lines changed

10 files changed

+84
-19
lines changed

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
} from '@temporalio/common';
1515
import { msToTs } from '@temporalio/common/lib/time';
1616
import { coresdk } from '@temporalio/proto';
17+
import { LogTimestamp } from '@temporalio/worker';
1718
import { WorkflowCodeBundler } from '@temporalio/worker/lib/workflow/bundler';
1819
import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/vm';
1920
import { ReusableVMWorkflow, ReusableVMWorkflowCreator } from '@temporalio/worker/lib/workflow/reusable-vm';
@@ -1532,6 +1533,7 @@ test('logAndTimeout', async (t) => {
15321533
message: 'Script execution timed out after 200ms',
15331534
});
15341535
const calls = await workflow.getAndResetSinkCalls();
1536+
delete calls[0].args[1][LogTimestamp];
15351537
t.deepEqual(calls, [
15361538
{
15371539
ifaceName: 'defaultWorkerLogger',

packages/worker/src/logger.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export interface Logger {
2525

2626
export { LogLevel };
2727

28-
export const LogTimestamp = Symbol('log_timestamp');
28+
export const LogTimestamp = Symbol.for('log_timestamp');
2929

3030
const severities: LogLevel[] = ['TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR'];
3131

packages/worker/src/worker-options.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { LoggerSinks } from '@temporalio/workflow';
88
import { ActivityInboundLogInterceptor } from './activity-log-interceptor';
99
import { NativeConnection } from './connection';
1010
import { WorkerInterceptors } from './interceptors';
11+
import { Logger } from './logger';
1112
import { Runtime } from './runtime';
1213
import { InjectedSinks } from './sinks';
1314
import { GiB } from './utils';
@@ -431,31 +432,36 @@ export interface ReplayWorkerOptions
431432
*
432433
* @param logger a {@link Logger} - defaults to the {@link Runtime} singleton logger.
433434
*/
434-
export function defaultSinks(logger = Runtime.instance().logger): InjectedSinks<LoggerSinks> {
435+
export function defaultSinks(logger?: Logger): InjectedSinks<LoggerSinks> {
435436
return {
436437
defaultWorkerLogger: {
437438
trace: {
438439
fn(_, message, attrs) {
440+
logger ??= Runtime.instance().logger;
439441
logger.trace(message, attrs);
440442
},
441443
},
442444
debug: {
443445
fn(_, message, attrs) {
446+
logger ??= Runtime.instance().logger;
444447
logger.debug(message, attrs);
445448
},
446449
},
447450
info: {
448451
fn(_, message, attrs) {
452+
logger ??= Runtime.instance().logger;
449453
logger.info(message, attrs);
450454
},
451455
},
452456
warn: {
453457
fn(_, message, attrs) {
458+
logger ??= Runtime.instance().logger;
454459
logger.warn(message, attrs);
455460
},
456461
},
457462
error: {
458463
fn(_, message, attrs) {
464+
logger ??= Runtime.instance().logger;
459465
logger.error(message, attrs);
460466
},
461467
},

packages/worker/src/workflow-log-interceptor.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import {
22
isCancellation,
3-
LoggerSinks,
43
Next,
5-
proxySinks,
64
WorkflowExecuteInput,
75
WorkflowInboundCallsInterceptor,
86
workflowInfo,
97
WorkflowInfo,
108
WorkflowInterceptorsFactory,
9+
log,
1110
} from '@temporalio/workflow';
1211
import { untrackPromise } from '@temporalio/workflow/lib/stack-helpers';
1312

@@ -31,27 +30,25 @@ export class WorkflowInboundLogInterceptor implements WorkflowInboundCallsInterc
3130
}
3231

3332
execute(input: WorkflowExecuteInput, next: Next<WorkflowInboundCallsInterceptor, 'execute'>): Promise<unknown> {
34-
const { defaultWorkerLogger: logger } = proxySinks<LoggerSinks>();
35-
36-
logger.debug('Workflow started', this.logAttributes());
33+
log.debug('Workflow started', this.logAttributes());
3734
const p = next(input).then(
3835
(res) => {
39-
logger.debug('Workflow completed', this.logAttributes());
36+
log.debug('Workflow completed', this.logAttributes());
4037
return res;
4138
},
4239
(error) => {
4340
// Avoid using instanceof checks in case the modules they're defined in loaded more than once,
4441
// e.g. by jest or when multiple versions are installed.
4542
if (typeof error === 'object' && error != null) {
4643
if (isCancellation(error)) {
47-
logger.debug('Workflow completed as cancelled', this.logAttributes());
44+
log.debug('Workflow completed as cancelled', this.logAttributes());
4845
throw error;
4946
} else if (error.name === 'ContinueAsNew') {
50-
logger.debug('Workflow continued as new', this.logAttributes());
47+
log.debug('Workflow continued as new', this.logAttributes());
5148
throw error;
5249
}
5350
}
54-
logger.warn('Workflow failed', { error, ...this.logAttributes() });
51+
log.warn('Workflow failed', { error, ...this.logAttributes() });
5552
throw error;
5653
}
5754
);

packages/worker/src/workflow/reusable-vm.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { AsyncLocalStorage } from 'node:async_hooks';
33
import vm from 'node:vm';
44
import * as internals from '@temporalio/workflow/lib/worker-interface';
55
import { IllegalStateError } from '@temporalio/common';
6+
import { getTimeOfDay } from '@temporalio/core-bridge';
7+
import { timeOfDayToBigint } from '../logger';
68
import { Workflow, WorkflowCreateOptions, WorkflowCreator } from './interface';
79
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
810
import { BaseVMWorkflow, globalHandlers, injectConsole, setUnhandledRejectionHandler } from './vm-shared';
@@ -160,7 +162,11 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
160162
}
161163
) as any;
162164

163-
workflowModule.initRuntime({ ...options, sourceMap: this.workflowBundle.sourceMap });
165+
workflowModule.initRuntime({
166+
...options,
167+
sourceMap: this.workflowBundle.sourceMap,
168+
getTimeOfDay: () => timeOfDayToBigint(getTimeOfDay()),
169+
});
164170
const activator = bag.__TEMPORAL_ACTIVATOR__ as any;
165171

166172
const newVM = new ReusableVMWorkflow(options.info, context, activator, workflowModule, isolateExecutionTimeoutMs);

packages/worker/src/workflow/vm.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import assert from 'node:assert';
22
import { AsyncLocalStorage } from 'node:async_hooks';
33
import vm from 'node:vm';
44
import { IllegalStateError } from '@temporalio/common';
5+
import { getTimeOfDay } from '@temporalio/core-bridge';
6+
import { timeOfDayToBigint } from '../logger';
57
import { Workflow, WorkflowCreateOptions, WorkflowCreator } from './interface';
68
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
79
import {
@@ -56,7 +58,11 @@ export class VMWorkflowCreator implements WorkflowCreator {
5658
}
5759
) as any;
5860

59-
workflowModule.initRuntime({ ...options, sourceMap: this.workflowBundle.sourceMap });
61+
workflowModule.initRuntime({
62+
...options,
63+
sourceMap: this.workflowBundle.sourceMap,
64+
getTimeOfDay: () => timeOfDayToBigint(getTimeOfDay()),
65+
});
6066
const activator = context.__TEMPORAL_ACTIVATOR__ as any;
6167

6268
const newVM = new VMWorkflow(options.info, context, activator, workflowModule, isolateExecutionTimeoutMs);

packages/workflow/src/interfaces.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,9 @@ export interface WorkflowCreateOptions {
384384
showStackTraceSources: boolean;
385385
}
386386

387-
export interface WorkflowCreateOptionsWithSourceMap extends WorkflowCreateOptions {
387+
export interface WorkflowCreateOptionsInternal extends WorkflowCreateOptions {
388388
sourceMap: RawSourceMap;
389+
getTimeOfDay(): bigint;
389390
}
390391

391392
/**

packages/workflow/src/internals.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import {
2929
EnhancedStackTrace,
3030
FileLocation,
3131
WorkflowInfo,
32-
WorkflowCreateOptionsWithSourceMap,
32+
WorkflowCreateOptionsInternal,
3333
} from './interfaces';
3434
import { SinkCall } from './sinks';
3535
import { untrackPromise } from './stack-helpers';
@@ -271,16 +271,26 @@ export class Activator implements ActivationHandler {
271271
*/
272272
public readonly sentPatches = new Set<string>();
273273

274+
/**
275+
* Buffered sink calls per activation
276+
*/
274277
sinkCalls = Array<SinkCall>();
275278

279+
/**
280+
* A nanosecond resolution time function, externally injected
281+
*/
282+
public readonly getTimeOfDay: () => bigint;
283+
276284
constructor({
277285
info,
278286
now,
279287
showStackTraceSources,
280288
sourceMap,
289+
getTimeOfDay,
281290
randomnessSeed,
282291
patches,
283-
}: WorkflowCreateOptionsWithSourceMap) {
292+
}: WorkflowCreateOptionsInternal) {
293+
this.getTimeOfDay = getTimeOfDay;
284294
this.info = info;
285295
this.now = now;
286296
this.showStackTraceSources = showStackTraceSources;

packages/workflow/src/worker-interface.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { coresdk } from '@temporalio/proto';
1010
import { disableStorage } from './cancellation-scope';
1111
import { DeterminismViolationError } from './errors';
1212
import { WorkflowInterceptorsFactory } from './interceptors';
13-
import { WorkflowCreateOptionsWithSourceMap, WorkflowInfo } from './interfaces';
13+
import { WorkflowCreateOptionsInternal, WorkflowInfo } from './interfaces';
1414
import { Activator, getActivator } from './internals';
1515
import { SinkCall } from './sinks';
1616
import { setActivatorUntyped } from './global-attributes';
@@ -92,7 +92,7 @@ export function overrideGlobals(): void {
9292
*
9393
* Sets required internal state and instantiates the workflow and interceptors.
9494
*/
95-
export function initRuntime(options: WorkflowCreateOptionsWithSourceMap): void {
95+
export function initRuntime(options: WorkflowCreateOptionsInternal): void {
9696
const info: WorkflowInfo = fixPrototypes(options.info);
9797
info.unsafe.now = OriginalDate.now;
9898
const activator = new Activator({ ...options, info });

packages/workflow/src/workflow.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import {
3838
WorkflowInfo,
3939
} from './interfaces';
4040
import { LocalActivityDoBackoff, getActivator, maybeGetActivator } from './internals';
41-
import { Sinks } from './sinks';
41+
import { LoggerSinks, Sinks } from './sinks';
4242
import { untrackPromise } from './stack-helpers';
4343
import { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle';
4444

@@ -1253,3 +1253,40 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void
12531253

12541254
export const stackTraceQuery = defineQuery<string>('__stack_trace');
12551255
export const enhancedStackTraceQuery = defineQuery<EnhancedStackTrace>('__enhanced_stack_trace');
1256+
1257+
const loggerSinks = proxySinks<LoggerSinks>();
1258+
1259+
/**
1260+
* Symbol used by the SDK logger to extract a timestamp from log attributes.
1261+
* Also defined in `worker/logger.ts` - intentionally not shared.
1262+
*/
1263+
const LogTimestamp = Symbol.for('log_timestamp');
1264+
1265+
/**
1266+
* Default workflow logger.
1267+
* This logger is replay-aware and will omit log messages on workflow replay.
1268+
* The messages emitted by this logger are funnelled to the worker's `defaultSinks`, which are installed by default.
1269+
*
1270+
* Note that since sinks are used to power this logger, any log attributes must be transferable via the
1271+
* {@link https://nodejs.org/api/worker_threads.html#worker_threads_port_postmessage_value_transferlist | postMessage}
1272+
* API.
1273+
*
1274+
* `defaultSinks` accepts a user logger and defaults to the `Runtime`'s logger.
1275+
*
1276+
* See the documentation for `WorkerOptions`, `defaultSinks`, and `Runtime` for more information.
1277+
*/
1278+
export const log: LoggerSinks['defaultWorkerLogger'] = Object.fromEntries(
1279+
(['trace', 'debug', 'info', 'warn', 'error'] as Array<keyof LoggerSinks['defaultWorkerLogger']>).map((level) => {
1280+
return [
1281+
level,
1282+
(message: string, attrs: Record<string, unknown>) => {
1283+
return loggerSinks.defaultWorkerLogger[level](message, {
1284+
// Inject the call time in nanosecond resolution as expected by the worker logger.
1285+
[LogTimestamp]: getActivator().getTimeOfDay(),
1286+
// Only available from node 17.
1287+
...((globalThis as any).structuredClone ? (globalThis as any).structuredClone(attrs) : attrs),
1288+
});
1289+
},
1290+
];
1291+
})
1292+
) as any;

0 commit comments

Comments
 (0)