Skip to content

Commit 1708529

Browse files
authored
Add logSource and taskQueue metadata on log messages (#1391)
1 parent 3b6c925 commit 1708529

24 files changed

+296
-139
lines changed

packages/activity/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,8 @@ export class Context {
254254
* The logger for this Activity.
255255
*
256256
* This defaults to the `Runtime`'s Logger (see {@link Runtime.logger}). Attributes from the current Activity context
257-
* will automatically be included as metadata on every log entries, and some key events of the Activity's lifecycle
258-
* will automatically be logged (at 'DEBUG' level for most messages; 'WARN' for failures).
257+
* will automatically be included as metadata on every log entries. An extra `logSource` metadata attribute will also
258+
* be added, with value `activity`; this can be used for fine-grained filtering of log entries further downstream.
259259
*
260260
* To customize log attributes, register a {@link ActivityOutboundCallsInterceptor} that intercepts the
261261
* `getLogAttributes()` method.

packages/common/src/logger.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,44 @@ export interface Logger {
1313
warn(message: string, meta?: LogMetadata): any;
1414
error(message: string, meta?: LogMetadata): any;
1515
}
16+
17+
/**
18+
* Possible values of the `logSource` meta attributes on log messages. This
19+
* attribute indicates which subsystem emitted the log message; this may for
20+
* example be used to implement fine-grained filtering of log messages.
21+
*
22+
* Note that there is no guarantee that this list will remain stable in the
23+
* future; values may be added or removed, and messages that are currently
24+
* emitted with some `logSource` value may use a different value in the future.
25+
*/
26+
export enum LogSource {
27+
/**
28+
* Log Source value for messages emited from Workflow code, using the {@link Workflow
29+
* context logger|workflow.log}. The SDK itself never publishes messages with this source.
30+
*/
31+
workflow = 'workflow',
32+
33+
/**
34+
* Log Source value for messages emited from an activity, using the {@link activity
35+
* context logger|Context.log}. The SDK itself never publishes messages with this source.
36+
*/
37+
activity = 'activity',
38+
39+
/**
40+
* Log Source value for messages emited from a Temporal Worker instance.
41+
*
42+
* This notably includes:
43+
* - Issues with worker or runtime options or the JS execution environment;
44+
* - Worker's, Activity's, and Workflow's lifecycle events;
45+
* - Workflow Activation and Activity Task processing events;
46+
* - Workflow Bundling messages;
47+
* - Sink processing issues;
48+
* - Issues in interracting with the Core SDK library.
49+
*/
50+
worker = 'worker',
51+
52+
/**
53+
* Log Source value for all messages emited by the Core SDK library.
54+
*/
55+
core = 'core',
56+
}

packages/core-bridge/scripts/build.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,12 @@ function compile(requestedTarget) {
8282
const cmd = which.sync('cargo-cp-artifact');
8383

8484
console.log('Running', cmd, argv);
85-
const { status } = spawnSync(cmd, argv, {
85+
const { status, error } = spawnSync(cmd, argv, {
8686
stdio: 'inherit',
87+
shell: process.platform === 'win32',
8788
});
88-
if (status !== 0) {
89-
throw new Error(`Failed to build${target ? ' for ' + target : ''}`);
89+
if (status !== 0 || error) {
90+
throw new Error(`Failed to build${target ? ' for ' + target : ''}: status code ${status}`, error);
9091
}
9192
}
9293

packages/create-project/src/helpers/install.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ interface InstallArgs {
2020
* @returns A Promise that resolves once the installation is finished.
2121
*/
2222
export function install({ root, useYarn }: InstallArgs): Promise<void> {
23-
const npm = /^win/.test(process.platform) ? 'npm.cmd' : 'npm';
23+
const isWindows = process.platform === 'win32';
24+
const npm = isWindows ? 'npm.cmd' : 'npm';
2425
const command: string = useYarn ? 'yarn' : npm;
2526

2627
return spawn(command, ['install'], {
2728
cwd: root,
2829
stdio: 'inherit',
2930
env: { ...process.env, ADBLOCK: '1', DISABLE_OPENCOLLECTIVE: '1' },
31+
shell: isWindows,
3032
});
3133
}
3234

packages/test/src/helpers-integration.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
} from '@temporalio/worker';
2222
import * as workflow from '@temporalio/workflow';
2323
import { ConnectionInjectorInterceptor } from './activities/interceptors';
24-
import { Worker, test as anyTest, bundlerOptions } from './helpers';
24+
import { Worker, test as anyTest, bundlerOptions, registerDefaultCustomSearchAttributes } from './helpers';
2525

2626
export interface Context {
2727
env: TestWorkflowEnvironment;
@@ -59,6 +59,7 @@ export function makeTestFunction(opts: {
5959
],
6060
},
6161
});
62+
await registerDefaultCustomSearchAttributes(env.connection);
6263
const workflowBundle = await bundleWorkflowCode({
6364
...bundlerOptions,
6465
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(opts.workflowInterceptorModules ?? [])],

packages/test/src/mock-native-worker.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
/* eslint-disable @typescript-eslint/no-non-null-assertion */
22
import { lastValueFrom } from 'rxjs';
3-
import { defaultPayloadConverter, fromPayloadsAtIndex } from '@temporalio/common';
3+
import { LogSource, defaultPayloadConverter, fromPayloadsAtIndex } from '@temporalio/common';
44
import { msToTs } from '@temporalio/common/lib/time';
55
import { coresdk } from '@temporalio/proto';
66
import { DefaultLogger, Runtime, ShutdownError } from '@temporalio/worker';
77
import { byteArrayToBuffer } from '@temporalio/worker/lib/utils';
88
import { NativeReplayHandle, NativeWorkerLike, Worker as RealWorker } from '@temporalio/worker/lib/worker';
9-
import {
10-
addDefaultWorkerOptions,
11-
CompiledWorkerOptions,
12-
compileWorkerOptions,
13-
WorkerOptions,
14-
} from '@temporalio/worker/lib/worker-options';
9+
import { withMetadata } from '@temporalio/worker/lib/logger';
10+
import { CompiledWorkerOptions, compileWorkerOptions, WorkerOptions } from '@temporalio/worker/lib/worker-options';
1511
import type { WorkflowCreator } from '@temporalio/worker/lib/workflow/interface';
1612
import * as activities from './activities';
1713

@@ -163,11 +159,12 @@ export class Worker extends RealWorker {
163159
}
164160

165161
public constructor(workflowCreator: WorkflowCreator, opts: CompiledWorkerOptions) {
166-
// Worker.create() accesses Runtime.instance(), which has some side effects that would not happen (or that would
167-
// happen too late) when creating a MockWorker. Force the singleton to be created now, if it doesn't already exist.
168-
Runtime.instance();
162+
const logger = withMetadata(Runtime.instance().logger, {
163+
logSource: LogSource.worker,
164+
taskQueue: opts.taskQueue ?? 'default',
165+
});
169166
const nativeWorker = new MockNativeWorker();
170-
super(nativeWorker, workflowCreator, opts);
167+
super(nativeWorker, workflowCreator, opts, logger);
171168
}
172169

173170
public runWorkflows(...args: Parameters<Worker['workflow$']>): Promise<void> {
@@ -183,6 +180,10 @@ export const defaultOptions: WorkerOptions = {
183180
};
184181

185182
export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Worker {
183+
const logger = withMetadata(Runtime.instance().logger, {
184+
logSource: LogSource.worker,
185+
taskQueue: options.taskQueue ?? 'default',
186+
});
186187
return new Worker(
187188
{
188189
async createWorkflow() {
@@ -192,6 +193,6 @@ export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Work
192193
/* Nothing to destroy */
193194
},
194195
},
195-
compileWorkerOptions(addDefaultWorkerOptions(options))
196+
compileWorkerOptions(options, logger)
196197
);
197198
}

packages/test/src/test-activity-log-interceptor.ts

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,52 @@ import { MockActivityEnvironment, defaultActivityInfo } from '@temporalio/testin
55
import { isCancellation } from '@temporalio/workflow';
66
import { isAbortError } from '@temporalio/common/lib/type-helpers';
77
import * as activity from '@temporalio/activity';
8+
import { LogSource } from '@temporalio/common';
89
import { withZeroesHTTPServer } from './zeroes-http-server';
910
import { cancellableFetch } from './activities';
1011

1112
interface MyTestActivityContext extends activity.Context {
1213
logs: Array<LogEntry>;
1314
}
1415

15-
test.before(() => {
16-
const mockLogger = new DefaultLogger('DEBUG', (entry) => {
16+
const mockLogger = new DefaultLogger('DEBUG', (entry) => {
17+
try {
1718
(activity.Context.current() as MyTestActivityContext).logs ??= [];
1819
(activity.Context.current() as MyTestActivityContext).logs.push(entry);
19-
});
20-
Runtime.install({
21-
logger: mockLogger,
22-
});
20+
} catch (e) {
21+
// Ignore messages produced from non activity context
22+
if ((e as Error).message !== 'Activity context not initialized') throw e;
23+
}
24+
});
25+
Runtime.install({
26+
logger: mockLogger,
2327
});
2428

25-
test("Activity Context logger defaults to Runtime's Logger", async (t) => {
26-
const env = new MockActivityEnvironment({});
29+
test('Activity Context logger funnel through the parent Logger', async (t) => {
30+
const env = new MockActivityEnvironment({}, { logger: mockLogger });
2731
await env.run(async () => {
2832
activity.log.debug('log message from activity');
2933
});
3034
const logs = (env.context as MyTestActivityContext).logs;
3135
const entry = logs.find((x) => x.level === 'DEBUG' && x.message === 'log message from activity');
3236
t.not(entry, undefined);
37+
t.deepEqual(entry?.meta, { ...activityLogAttributes(defaultActivityInfo), logSource: LogSource.activity });
3338
});
3439

3540
test('Activity Worker logs when activity starts', async (t) => {
36-
const env = new MockActivityEnvironment({});
41+
const env = new MockActivityEnvironment({}, { logger: mockLogger });
3742
await env.run(async () => {
3843
activity.log.debug('log message from activity');
3944
});
4045
const logs = (env.context as MyTestActivityContext).logs;
4146
const entry = logs.find((x) => x.level === 'DEBUG' && x.message === 'Activity started');
4247
t.not(entry, undefined);
43-
t.deepEqual(entry?.meta, activityLogAttributes(defaultActivityInfo));
48+
t.deepEqual(entry?.meta, { ...activityLogAttributes(defaultActivityInfo), logSource: LogSource.worker });
4449
});
4550

4651
test('Activity Worker logs warning when activity fails', async (t) => {
4752
const err = new Error('Failed for test');
48-
const env = new MockActivityEnvironment({});
53+
const env = new MockActivityEnvironment({}, { logger: mockLogger });
4954
try {
5055
await env.run(async () => {
5156
throw err;
@@ -60,11 +65,11 @@ test('Activity Worker logs warning when activity fails', async (t) => {
6065
const { durationMs, error, ...rest } = entry?.meta ?? {};
6166
t.true(Number.isInteger(durationMs));
6267
t.is(err, error);
63-
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
68+
t.deepEqual(rest, { ...activityLogAttributes(defaultActivityInfo), logSource: LogSource.worker });
6469
});
6570

6671
test('Activity Worker logs when activity completes async', async (t) => {
67-
const env = new MockActivityEnvironment({});
72+
const env = new MockActivityEnvironment({}, { logger: mockLogger });
6873
try {
6974
await env.run(async () => {
7075
throw new activity.CompleteAsyncError();
@@ -77,11 +82,11 @@ test('Activity Worker logs when activity completes async', async (t) => {
7782
t.not(entry, undefined);
7883
const { durationMs, ...rest } = entry?.meta ?? {};
7984
t.true(Number.isInteger(durationMs));
80-
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
85+
t.deepEqual(rest, { ...activityLogAttributes(defaultActivityInfo), logSource: LogSource.worker });
8186
});
8287

8388
test('Activity Worker logs when activity is cancelled with promise', async (t) => {
84-
const env = new MockActivityEnvironment({});
89+
const env = new MockActivityEnvironment({}, { logger: mockLogger });
8590
env.on('heartbeat', () => env.cancel());
8691
try {
8792
await env.run(async () => {
@@ -96,11 +101,11 @@ test('Activity Worker logs when activity is cancelled with promise', async (t) =
96101
t.not(entry, undefined);
97102
const { durationMs, ...rest } = entry?.meta ?? {};
98103
t.true(Number.isInteger(durationMs));
99-
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
104+
t.deepEqual(rest, { ...activityLogAttributes(defaultActivityInfo), logSource: LogSource.worker });
100105
});
101106

102107
test('Activity Worker logs when activity is cancelled with signal', async (t) => {
103-
const env = new MockActivityEnvironment({});
108+
const env = new MockActivityEnvironment({}, { logger: mockLogger });
104109
env.on('heartbeat', () => env.cancel());
105110
try {
106111
await env.run(async () => {
@@ -116,7 +121,7 @@ test('Activity Worker logs when activity is cancelled with signal', async (t) =>
116121
t.not(entry, undefined);
117122
const { durationMs, ...rest } = entry?.meta ?? {};
118123
t.true(Number.isInteger(durationMs));
119-
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
124+
t.deepEqual(rest, { ...activityLogAttributes(defaultActivityInfo), logSource: LogSource.worker });
120125
});
121126

122127
test('(Legacy) ActivityInboundLogInterceptor does not override Context.log by default', async (t) => {
@@ -125,6 +130,7 @@ test('(Legacy) ActivityInboundLogInterceptor does not override Context.log by de
125130
{
126131
// eslint-disable-next-line deprecation/deprecation
127132
interceptors: [(ctx) => ({ inbound: new ActivityInboundLogInterceptor(ctx) })],
133+
logger: mockLogger,
128134
}
129135
);
130136
await env.run(async () => {
@@ -146,15 +152,14 @@ test('(Legacy) ActivityInboundLogInterceptor overrides Context.log if a logger i
146152
{
147153
// eslint-disable-next-line deprecation/deprecation
148154
interceptors: [(ctx) => ({ inbound: new ActivityInboundLogInterceptor(ctx, logger) })],
155+
logger: mockLogger,
149156
}
150157
);
151158
await env.run(async () => {
152159
activity.log.debug('log message from activity');
153160
});
154-
const entry1 = logs.find((x) => x.level === 'DEBUG' && x.message === 'Activity started');
155-
t.not(entry1, undefined);
156-
const entry2 = logs.find((x) => x.level === 'DEBUG' && x.message === 'log message from activity');
157-
t.not(entry2, undefined);
161+
const entry = logs.find((x) => x.level === 'DEBUG' && x.message === 'log message from activity');
162+
t.not(entry, undefined);
158163
});
159164

160165
test('(Legacy) ActivityInboundLogInterceptor overrides Context.log if class is extended', async (t) => {
@@ -172,6 +177,7 @@ test('(Legacy) ActivityInboundLogInterceptor overrides Context.log if class is e
172177
{},
173178
{
174179
interceptors: [(ctx) => ({ inbound: new CustomActivityInboundLogInterceptor(ctx) })],
180+
logger: mockLogger,
175181
}
176182
);
177183
await env.run(async () => {

packages/test/src/test-integration-workflows.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,12 @@ test('HistorySize is visible in WorkflowExecutionInfo', async (t) => {
153153

154154
export async function suggestedCAN(): Promise<boolean> {
155155
const maxEvents = 40_000;
156-
const batchSize = 100;
156+
const batchSize = 1000;
157157
if (workflow.workflowInfo().continueAsNewSuggested) {
158158
return false;
159159
}
160160
while (workflow.workflowInfo().historyLength < maxEvents) {
161-
await Promise.all(new Array(batchSize).fill(undefined).map((_) => workflow.sleep(1)));
161+
await Promise.all(Array.from({ length: batchSize }, (_) => workflow.sleep(1)));
162162
if (workflow.workflowInfo().continueAsNewSuggested) {
163163
return true;
164164
}

packages/test/src/test-mockactivityenv.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
import test from 'ava';
22
import { MockActivityEnvironment } from '@temporalio/testing';
3-
import { CancelledFailure, Context } from '@temporalio/activity';
3+
import * as activity from '@temporalio/activity';
4+
import { Runtime } from '@temporalio/worker';
5+
6+
test("MockActivityEnvironment doesn't implicitly instanciate Runtime", async (t) => {
7+
t.is(Runtime._instance, undefined);
8+
const env = new MockActivityEnvironment();
9+
await env.run(async (): Promise<void> => {
10+
activity.log.info('log message from activity');
11+
});
12+
t.is(Runtime._instance, undefined);
13+
});
414

515
test('MockActivityEnvironment can run a single activity', async (t) => {
616
const env = new MockActivityEnvironment();
@@ -19,12 +29,12 @@ test('MockActivityEnvironment emits heartbeat events and can be cancelled', asyn
1929
});
2030
await t.throwsAsync(
2131
env.run(async (x: number): Promise<number> => {
22-
Context.current().heartbeat(6);
23-
await Context.current().sleep(100);
32+
activity.heartbeat(6);
33+
await activity.sleep(100);
2434
return x + 1;
2535
}, 3),
2636
{
27-
instanceOf: CancelledFailure,
37+
instanceOf: activity.CancelledFailure,
2838
message: 'test',
2939
}
3040
);
@@ -33,7 +43,7 @@ test('MockActivityEnvironment emits heartbeat events and can be cancelled', asyn
3343
test('MockActivityEnvironment injects provided info', async (t) => {
3444
const env = new MockActivityEnvironment({ attempt: 3 });
3545
const res = await env.run(async (x: number): Promise<number> => {
36-
return x + Context.current().info.attempt;
46+
return x + activity.activityInfo().attempt;
3747
}, 1);
3848
t.is(res, 4);
3949
});

packages/test/src/test-runtime.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ if (RUN_INTEGRATION_TESTS) {
112112
t.is(typeof failingWftEntry.meta?.['failure'], 'string');
113113
t.is(typeof failingWftEntry.meta?.['runId'], 'string');
114114
t.is(typeof failingWftEntry.meta?.['workflowId'], 'string');
115-
t.is(typeof failingWftEntry.meta?.['subsystem'], 'string');
115+
t.is(typeof failingWftEntry.meta?.['logSource'], 'string');
116116
} finally {
117117
await Runtime.instance().shutdown();
118118
}

0 commit comments

Comments
 (0)