Skip to content

Commit 38b81d3

Browse files
authored
feat(activity): Add Activity Context logger (#1138)
1 parent fe1c18f commit 38b81d3

File tree

12 files changed

+172
-45
lines changed

12 files changed

+172
-45
lines changed

packages/activity/src/index.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171

7272
import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import
7373
import { AsyncLocalStorage } from 'node:async_hooks';
74-
import { Duration, msToNumber } from '@temporalio/common/lib/time';
74+
import { Logger, Duration } from '@temporalio/common';
75+
import { msToNumber } from '@temporalio/common/lib/time';
7576

7677
export {
7778
ActivityFunction,
@@ -243,6 +244,19 @@ export class Context {
243244
* The heartbeat implementation, injected via the constructor.
244245
*/
245246
protected readonly heartbeatFn: (details?: any) => void;
247+
/**
248+
* The logger for this Activity.
249+
*
250+
* This defaults to the `Runtime`'s Logger (see {@link Runtime.logger}). If the {@link ActivityInboundLogInterceptor}
251+
* is installed (by default, it is; see {@link WorkerOptions.interceptors}), then various attributes from the current
252+
* Activity context will automatically be included as metadata on every log entries, and some key events of the
253+
* Activity's life cycle will automatically be logged (at 'DEBUG' level for most messages; 'WARN' for failures).
254+
*
255+
* To use a different Logger, either overwrite this property from an Activity Interceptor, or explicitly register the
256+
* `ActivityInboundLogInterceptor` with your custom Logger. You may also subclass `ActivityInboundLogInterceptor` to
257+
* customize attributes that are emitted as metadata.
258+
*/
259+
public logger: Logger;
246260

247261
/**
248262
* **Not** meant to instantiated by Activity code, used by the worker.
@@ -253,12 +267,14 @@ export class Context {
253267
info: Info,
254268
cancelled: Promise<never>,
255269
cancellationSignal: AbortSignal,
256-
heartbeat: (details?: any) => void
270+
heartbeat: (details?: any) => void,
271+
logger: Logger
257272
) {
258273
this.info = info;
259274
this.cancelled = cancelled;
260275
this.cancellationSignal = cancellationSignal;
261276
this.heartbeatFn = heartbeat;
277+
this.logger = logger;
262278
}
263279

264280
/**

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export * from './errors';
1818
export * from './failure';
1919
export { Headers, Next } from './interceptors';
2020
export * from './interfaces';
21+
export * from './logger';
2122
export * from './retry-policy';
2223
export { type Timestamp, Duration, StringValue } from './time';
2324
export * from './workflow-handle';

packages/common/src/logger.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
export type LogLevel = 'TRACE' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';
2+
3+
export type LogMetadata = Record<string | symbol, any>;
4+
5+
/**
6+
* Implement this interface in order to customize worker logging
7+
*/
8+
export interface Logger {
9+
log(level: LogLevel, message: string, meta?: LogMetadata): any;
10+
trace(message: string, meta?: LogMetadata): any;
11+
debug(message: string, meta?: LogMetadata): any;
12+
info(message: string, meta?: LogMetadata): any;
13+
warn(message: string, meta?: LogMetadata): any;
14+
error(message: string, meta?: LogMetadata): any;
15+
}

packages/core-bridge/src/conversions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,10 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
323323

324324
let max_worker_activities_per_second =
325325
js_optional_getter!(cx, self, "maxActivitiesPerSecond", JsNumber)
326-
.map(|num| num.value(cx) as f64);
326+
.map(|num| num.value(cx));
327327
let max_task_queue_activities_per_second =
328328
js_optional_getter!(cx, self, "maxTaskQueueActivitiesPerSecond", JsNumber)
329-
.map(|num| num.value(cx) as f64);
329+
.map(|num| num.value(cx));
330330

331331
let graceful_shutdown_period =
332332
js_optional_getter!(cx, self, "shutdownGraceTimeMs", JsNumber)

packages/core-bridge/ts/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import { SpanContext } from '@opentelemetry/api';
2-
import { Duration } from '@temporalio/common';
2+
import { LogLevel, Duration } from '@temporalio/common';
33
import type { TLSConfig } from '@temporalio/common/lib/internal-non-workflow';
44

55
export { TLSConfig };
66

7+
/** @deprecated Import from @temporalio/common instead */
8+
export { LogLevel };
9+
710
type Shadow<Base, New> = Base extends object
811
? New extends object
912
? {
@@ -334,9 +337,6 @@ export interface WorkerOptions {
334337
maxActivitiesPerSecond?: number;
335338
}
336339

337-
/** Log level - must match rust log level names */
338-
export type LogLevel = 'TRACE' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';
339-
340340
export interface LogEntry {
341341
/** Log message */
342342
message: string;

packages/test/src/mock-native-worker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { lastValueFrom } from 'rxjs';
44
import { defaultPayloadConverter, fromPayloadsAtIndex } from '@temporalio/common';
55
import { msToTs } from '@temporalio/common/lib/time';
66
import { coresdk } from '@temporalio/proto';
7-
import { DefaultLogger, ShutdownError } from '@temporalio/worker';
7+
import { DefaultLogger, Runtime, ShutdownError } from '@temporalio/worker';
88
import { byteArrayToBuffer } from '@temporalio/worker/lib/utils';
99
import { NativeReplayHandle, NativeWorkerLike, Worker as RealWorker } from '@temporalio/worker/lib/worker';
1010
import {
@@ -164,6 +164,9 @@ export class Worker extends RealWorker {
164164
}
165165

166166
public constructor(workflowCreator: WorkflowCreator, opts: CompiledWorkerOptions) {
167+
// Worker.create() accesses Runtime.instance(), which has some side effects that would not happen (or that would
168+
// happen too late) when creating a MockWorker. Force the singleton to be created now, if it doesn't already exist.
169+
Runtime.instance();
167170
const nativeWorker = new MockNativeWorker();
168171
super(nativeWorker, workflowCreator, opts);
169172
}

packages/test/src/test-activity-log-interceptor.ts

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,69 @@
11
import test from 'ava';
2-
import { ActivityInboundLogInterceptor, activityLogAttributes, DefaultLogger, LogEntry } from '@temporalio/worker';
2+
import {
3+
ActivityInboundLogInterceptor,
4+
activityLogAttributes,
5+
DefaultLogger,
6+
LogEntry,
7+
Runtime,
8+
} from '@temporalio/worker';
39
import { MockActivityEnvironment, defaultActivityInfo } from '@temporalio/testing';
410
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
511
import * as activity from '@temporalio/activity';
612
import { withZeroesHTTPServer } from './zeroes-http-server';
713
import { cancellableFetch } from './activities/cancellable-fetch';
814

15+
interface MyTestActivityContext extends activity.Context {
16+
logs: Array<LogEntry>;
17+
}
18+
19+
test.before(() => {
20+
const mockLogger = new DefaultLogger('DEBUG', (entry) => {
21+
(activity.Context.current() as MyTestActivityContext).logs ??= [];
22+
(activity.Context.current() as MyTestActivityContext).logs.push(entry);
23+
});
24+
Runtime.install({
25+
logger: mockLogger,
26+
});
27+
});
28+
29+
test("Activity Context's logger defaults to Runtime's Logger", async (t) => {
30+
const env = new MockActivityEnvironment();
31+
const logs: LogEntry[] = await env.run(async () => {
32+
const ctx = activity.Context.current();
33+
ctx.logger.debug('log message from activity');
34+
return (ctx as MyTestActivityContext).logs;
35+
});
36+
const activityLogEntry = logs.find((entry) => entry.message === 'log message from activity');
37+
t.not(activityLogEntry, undefined);
38+
t.is(activityLogEntry?.level, 'DEBUG');
39+
});
40+
41+
test("Activity Log Interceptor dont override Context's logger by default", async (t) => {
42+
const env = new MockActivityEnvironment();
43+
const logs: LogEntry[] = await env.run(async () => {
44+
const ctx = activity.Context.current();
45+
const interceptor = new ActivityInboundLogInterceptor(ctx);
46+
const execute = composeInterceptors([interceptor], 'execute', async () => {
47+
ctx.logger.debug('log message from activity');
48+
});
49+
try {
50+
await execute({ args: [], headers: {} });
51+
} catch {
52+
// ignore
53+
}
54+
return (ctx as MyTestActivityContext).logs;
55+
});
56+
const activityLogEntry = logs.find((entry) => entry.message === 'log message from activity');
57+
t.not(activityLogEntry, undefined);
58+
t.is(activityLogEntry?.level, 'DEBUG');
59+
});
60+
961
async function runActivity(
1062
fn: activity.ActivityFunction,
1163
heartbeatCallback = (_env: MockActivityEnvironment) => {
1264
// not an empty body eslint
1365
}
14-
): Promise<[LogEntry, LogEntry]> {
66+
): Promise<LogEntry[]> {
1567
const logs = Array<LogEntry>();
1668
const env = new MockActivityEnvironment();
1769
env.on('heartbeat', () => heartbeatCallback(env));
@@ -30,26 +82,38 @@ async function runActivity(
3082
// ignore
3183
}
3284
});
33-
if (logs.length !== 2) {
34-
throw new Error('Expected exactly 2 log entries');
35-
}
36-
return logs as [LogEntry, LogEntry];
85+
return logs;
3786
}
3887

88+
test('ActivityInboundLogInterceptor overrides Context logger if specified', async (t) => {
89+
const logs = await runActivity(async () => {
90+
activity.Context.current().logger.debug('log message from activity');
91+
});
92+
t.is(logs.length, 3);
93+
const [_, midLog] = logs;
94+
t.is(midLog.level, 'DEBUG');
95+
t.is(midLog.message, 'log message from activity');
96+
t.deepEqual(midLog.meta, activityLogAttributes(defaultActivityInfo));
97+
});
98+
3999
test('ActivityInboundLogInterceptor logs when activity starts', async (t) => {
40-
const [startLog] = await runActivity(async () => {
100+
const logs = await runActivity(async () => {
41101
// not an empty body eslint
42102
});
103+
t.is(logs.length, 2);
104+
const [startLog] = logs;
43105
t.is(startLog.level, 'DEBUG');
44106
t.is(startLog.message, 'Activity started');
45107
t.deepEqual(startLog.meta, activityLogAttributes(defaultActivityInfo));
46108
});
47109

48110
test('ActivityInboundLogInterceptor logs warning when activity fails', async (t) => {
49111
const err = new Error('Failed for test');
50-
const [_, endLog] = await runActivity(async () => {
112+
const logs = await runActivity(async () => {
51113
throw err;
52114
});
115+
t.is(logs.length, 2);
116+
const [_, endLog] = logs;
53117
t.is(endLog.level, 'WARN');
54118
t.is(endLog.message, 'Activity failed');
55119
const { durationMs, error, ...rest } = endLog.meta ?? {};
@@ -59,9 +123,11 @@ test('ActivityInboundLogInterceptor logs warning when activity fails', async (t)
59123
});
60124

61125
test('ActivityInboundLogInterceptor logs when activity completes async', async (t) => {
62-
const [_, endLog] = await runActivity(async () => {
126+
const logs = await runActivity(async () => {
63127
throw new activity.CompleteAsyncError();
64128
});
129+
t.is(logs.length, 2);
130+
const [_, endLog] = logs;
65131
t.is(endLog.level, 'DEBUG');
66132
t.is(endLog.message, 'Activity will complete asynchronously');
67133
const { durationMs, ...rest } = endLog.meta ?? {};
@@ -70,7 +136,7 @@ test('ActivityInboundLogInterceptor logs when activity completes async', async (
70136
});
71137

72138
test('ActivityInboundLogInterceptor logs when activity is cancelled with promise', async (t) => {
73-
const [_, endLog] = await runActivity(
139+
const logs = await runActivity(
74140
async () => {
75141
activity.Context.current().heartbeat();
76142
await activity.Context.current().cancelled;
@@ -79,6 +145,8 @@ test('ActivityInboundLogInterceptor logs when activity is cancelled with promise
79145
env.cancel();
80146
}
81147
);
148+
t.is(logs.length, 2);
149+
const [_, endLog] = logs;
82150
t.is(endLog.level, 'DEBUG');
83151
t.is(endLog.message, 'Activity completed as cancelled');
84152
const { durationMs, ...rest } = endLog.meta ?? {};
@@ -87,7 +155,7 @@ test('ActivityInboundLogInterceptor logs when activity is cancelled with promise
87155
});
88156

89157
test('ActivityInboundLogInterceptor logs when activity is cancelled with signal', async (t) => {
90-
const [_, endLog] = await runActivity(
158+
const logs = await runActivity(
91159
async () => {
92160
await withZeroesHTTPServer(async (port) => {
93161
await cancellableFetch(`http:127.0.0.1:${port}`);
@@ -97,6 +165,8 @@ test('ActivityInboundLogInterceptor logs when activity is cancelled with signal'
97165
env.cancel();
98166
}
99167
);
168+
t.is(logs.length, 2);
169+
const [_, endLog] = logs;
100170
t.is(endLog.level, 'DEBUG');
101171
t.is(endLog.message, 'Activity completed as cancelled');
102172
const { durationMs, ...rest } = endLog.meta ?? {};

packages/testing/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,8 @@ export class MockActivityEnvironment extends events.EventEmitter {
409409
{ ...defaultActivityInfo, ...info },
410410
promise,
411411
abortController.signal,
412-
heartbeatCallback
412+
heartbeatCallback,
413+
Runtime.instance().logger
413414
);
414415
promise.catch(() => {
415416
/* avoid unhandled rejection */

packages/worker/src/activity-log-interceptor.ts

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,33 @@ export function activityLogAttributes(info: Info): Record<string, unknown> {
2525

2626
/** Logs Activity execution starts and their completions */
2727
export class ActivityInboundLogInterceptor implements ActivityInboundCallsInterceptor {
28-
constructor(protected readonly ctx: Context, protected readonly logger: Logger) {}
28+
/**
29+
* @deprecated Use `Context.current().logger` instead
30+
*/
31+
protected readonly logger: Logger;
32+
33+
constructor(protected readonly ctx: Context, logger?: Logger | undefined) {
34+
// If a parent logger was explicitly provided on this interceptor, then use it.
35+
// Otherwise, use the logger that is already set on the activity context.
36+
// By default, that will be Runtime.logger, but another interceptor might have overriden it,
37+
// in which case we would want to use that one as our parent logger.
38+
const parentLogger = logger ?? ctx.logger;
39+
this.logger = parentLogger; // eslint-disable-this-line deprecation/deprecation
40+
41+
this.ctx.logger = Object.fromEntries(
42+
(['trace', 'debug', 'info', 'warn', 'error'] as const).map((level) => {
43+
return [
44+
level,
45+
(message: string, attrs: Record<string, unknown>) => {
46+
return parentLogger[level](message, {
47+
...this.logAttributes(),
48+
...attrs,
49+
});
50+
},
51+
];
52+
})
53+
) as any;
54+
}
2955

3056
protected logAttributes(): Record<string, unknown> {
3157
return activityLogAttributes(this.ctx.info);
@@ -34,7 +60,7 @@ export class ActivityInboundLogInterceptor implements ActivityInboundCallsInterc
3460
async execute(input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>): Promise<unknown> {
3561
let error: any = UNINITIALIZED; // In case someone decides to throw undefined...
3662
const startTime = process.hrtime.bigint();
37-
this.logger.debug('Activity started', this.logAttributes());
63+
this.ctx.logger.debug('Activity started');
3864
try {
3965
return await next(input);
4066
} catch (err: any) {
@@ -45,18 +71,18 @@ export class ActivityInboundLogInterceptor implements ActivityInboundCallsInterc
4571
const durationMs = Number(durationNanos / 1_000_000n);
4672

4773
if (error === UNINITIALIZED) {
48-
this.logger.debug('Activity completed', { durationMs, ...this.logAttributes() });
74+
this.ctx.logger.debug('Activity completed', { durationMs });
4975
} else if (
5076
typeof error === 'object' &&
5177
error != null &&
5278
(CancelledFailure.is(error) || error.name === 'AbortError') &&
5379
this.ctx.cancellationSignal.aborted
5480
) {
55-
this.logger.debug('Activity completed as cancelled', { durationMs, ...this.logAttributes() });
81+
this.ctx.logger.debug('Activity completed as cancelled', { durationMs });
5682
} else if (CompleteAsyncError.is(error)) {
57-
this.logger.debug('Activity will complete asynchronously', { durationMs, ...this.logAttributes() });
83+
this.ctx.logger.debug('Activity will complete asynchronously', { durationMs });
5884
} else {
59-
this.logger.warn('Activity failed', { error, durationMs, ...this.logAttributes() });
85+
this.ctx.logger.warn('Activity failed', { error, durationMs });
6086
}
6187
}
6288
}

packages/worker/src/activity.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
ActivityInboundCallsInterceptor,
1717
ActivityInboundCallsInterceptorFactory,
1818
} from './interceptors';
19+
import { Runtime } from './runtime';
1920

2021
export type CancelReason =
2122
| keyof typeof coresdk.activity_task.ActivityCancelReason
@@ -47,7 +48,13 @@ export class Activity {
4748
reject(new CancelledFailure(reason));
4849
};
4950
});
50-
this.context = new Context(info, promise, this.abortController.signal, this.heartbeatCallback);
51+
this.context = new Context(
52+
info,
53+
promise,
54+
this.abortController.signal,
55+
this.heartbeatCallback,
56+
Runtime.instance().logger
57+
);
5158
// Prevent unhandled rejection
5259
promise.catch(() => undefined);
5360
this.interceptors = {

0 commit comments

Comments
 (0)