Skip to content

Commit 5c5e8c5

Browse files
authored
feat: Revise SDK log attributes and levels (#750)
1 parent 792787f commit 5c5e8c5

16 files changed

+498
-41
lines changed

packages/activity/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ export interface Info {
148148
* Use this in order to resume your Activity from checkpoint.
149149
*/
150150
heartbeatDetails: any;
151+
152+
/**
153+
* Task queue the Activity is scheduled in, set to the Workflow's task queue in case of local Activity.
154+
*/
155+
taskQueue: string;
151156
}
152157

153158
/**

packages/common/src/failure.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ export class TimeoutFailure extends TemporalFailure {
181181
* This exception is expected to be thrown only by the framework code.
182182
*/
183183
export class ActivityFailure extends TemporalFailure {
184+
public readonly name: string = 'ActivityFailure';
185+
184186
public constructor(
185187
public readonly activityType: string,
186188
public readonly activityId: string | undefined,
@@ -199,6 +201,8 @@ export class ActivityFailure extends TemporalFailure {
199201
* This exception is expected to be thrown only by the framework code.
200202
*/
201203
export class ChildWorkflowFailure extends TemporalFailure {
204+
public readonly name: string = 'ChildWorkflowFailure';
205+
202206
public constructor(
203207
public readonly namespace: string | undefined,
204208
public readonly execution: WorkflowExecution,

packages/test/src/run-a-worker.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
import arg from 'arg';
2-
import { Worker, Runtime, DefaultLogger } from '@temporalio/worker';
2+
import { Worker, Runtime, DefaultLogger, LogLevel } from '@temporalio/worker';
33
import * as activities from './activities';
44

55
async function main() {
66
const argv = arg({
7-
'--debug': Boolean,
7+
'--log-level': String,
88
});
9-
if (argv['--debug']) {
9+
if (argv['--log-level']) {
10+
const logLevel = argv['--log-level'].toUpperCase();
1011
Runtime.install({
11-
logger: new DefaultLogger('DEBUG'),
12+
logger: new DefaultLogger(logLevel as LogLevel),
1213
telemetryOptions: {
1314
tracingFilter: 'temporal_sdk_core=DEBUG',
14-
logging: { forward: { level: 'DEBUG' } },
15+
logging: { forward: { level: logLevel as LogLevel } },
1516
},
1617
});
1718
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import test from 'ava';
2+
import { ActivityInboundLogInterceptor, activityLogAttributes, DefaultLogger, LogEntry } from '@temporalio/worker';
3+
import { MockActivityEnvironment, defaultActivityInfo } from '@temporalio/testing';
4+
import { composeInterceptors } from '@temporalio/internal-workflow-common';
5+
import * as activity from '@temporalio/activity';
6+
import { withZeroesHTTPServer } from './zeroes-http-server';
7+
import { cancellableFetch } from './activities/cancellable-fetch';
8+
9+
async function runActivity(
10+
fn: activity.ActivityFunction,
11+
heartbeatCallback = (_env: MockActivityEnvironment) => {
12+
// not an empty body eslint
13+
}
14+
): Promise<[LogEntry, LogEntry]> {
15+
const logs = Array<LogEntry>();
16+
const env = new MockActivityEnvironment();
17+
env.on('heartbeat', () => heartbeatCallback(env));
18+
await env.run(async () => {
19+
const ctx = activity.Context.current();
20+
const interceptor = new ActivityInboundLogInterceptor(
21+
ctx,
22+
new DefaultLogger('DEBUG', (entry) => {
23+
logs.push(entry);
24+
})
25+
);
26+
const execute = composeInterceptors([interceptor], 'execute', fn);
27+
try {
28+
await execute({ args: [], headers: {} });
29+
} catch {
30+
// ignore
31+
}
32+
});
33+
if (logs.length !== 2) {
34+
throw new Error('Expected exactly 2 log entries');
35+
}
36+
return logs as [LogEntry, LogEntry];
37+
}
38+
39+
test('ActivityInboundLogInterceptor logs when activity starts', async (t) => {
40+
const [startLog] = await runActivity(async () => {
41+
// not an empty body eslint
42+
});
43+
t.is(startLog.level, 'DEBUG');
44+
t.is(startLog.message, 'Activity started');
45+
t.deepEqual(startLog.meta, activityLogAttributes(defaultActivityInfo));
46+
});
47+
48+
test('ActivityInboundLogInterceptor logs warning when activity fails', async (t) => {
49+
const err = new Error('Failed for test');
50+
const [_, endLog] = await runActivity(async () => {
51+
throw err;
52+
});
53+
t.is(endLog.level, 'WARN');
54+
t.is(endLog.message, 'Activity failed');
55+
const { durationMs, error, ...rest } = endLog.meta ?? {};
56+
t.true(Number.isInteger(durationMs));
57+
t.is(err, error);
58+
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
59+
});
60+
61+
test('ActivityInboundLogInterceptor logs when activity completes async', async (t) => {
62+
const [_, endLog] = await runActivity(async () => {
63+
throw new activity.CompleteAsyncError();
64+
});
65+
t.is(endLog.level, 'DEBUG');
66+
t.is(endLog.message, 'Activity will complete asynchronously');
67+
const { durationMs, ...rest } = endLog.meta ?? {};
68+
t.true(Number.isInteger(durationMs));
69+
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
70+
});
71+
72+
test('ActivityInboundLogInterceptor logs when activity is cancelled with promise', async (t) => {
73+
const [_, endLog] = await runActivity(
74+
async () => {
75+
activity.Context.current().heartbeat();
76+
await activity.Context.current().cancelled;
77+
},
78+
(env) => {
79+
env.cancel();
80+
}
81+
);
82+
t.is(endLog.level, 'DEBUG');
83+
t.is(endLog.message, 'Activity completed as cancelled');
84+
const { durationMs, ...rest } = endLog.meta ?? {};
85+
t.true(Number.isInteger(durationMs));
86+
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
87+
});
88+
89+
test('ActivityInboundLogInterceptor logs when activity is cancelled with signal', async (t) => {
90+
const [_, endLog] = await runActivity(
91+
async () => {
92+
await withZeroesHTTPServer(async (port) => {
93+
await cancellableFetch(`http:127.0.0.1:${port}`);
94+
});
95+
},
96+
(env) => {
97+
env.cancel();
98+
}
99+
);
100+
t.is(endLog.level, 'DEBUG');
101+
t.is(endLog.message, 'Activity completed as cancelled');
102+
const { durationMs, ...rest } = endLog.meta ?? {};
103+
t.true(Number.isInteger(durationMs));
104+
t.deepEqual(rest, activityLogAttributes(defaultActivityInfo));
105+
});

packages/test/src/test-custom-payload-converter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ test('Worker with proto data converter runs an activity and reports completion',
121121
taskToken,
122122
start: {
123123
activityType: 'protoActivity',
124+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
124125
input: toPayloads(payloadConverter, messageInstance),
125126
},
126127
});

packages/test/src/test-worker-activities.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ test('Worker runs an activity and reports completion', async (t) => {
5959
taskToken,
6060
start: {
6161
activityType: 'httpGet',
62+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
6263
input: toPayloads(defaultPayloadConverter, url),
6364
},
6465
});
@@ -77,6 +78,7 @@ test('Worker runs an activity and reports failure', async (t) => {
7778
taskToken,
7879
start: {
7980
activityType: 'throwAnError',
81+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
8082
input: toPayloads(defaultPayloadConverter, false, message),
8183
},
8284
});
@@ -105,6 +107,7 @@ test('Worker cancels activity and reports cancellation', async (t) => {
105107
taskToken,
106108
start: {
107109
activityType: 'waitForCancellation',
110+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
108111
input: toPayloads(defaultPayloadConverter),
109112
},
110113
},
@@ -131,6 +134,7 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => {
131134
taskToken,
132135
start: {
133136
activityType: 'cancellableFetch',
137+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
134138
input: toPayloads(defaultPayloadConverter, `http://127.0.0.1:${port}`, false),
135139
},
136140
},
@@ -158,6 +162,7 @@ test('Activity cancel with reason "NOT_FOUND" is valid', async (t) => {
158162
taskToken,
159163
start: {
160164
activityType: 'cancellableFetch',
165+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
161166
input: toPayloads(defaultPayloadConverter, `http://127.0.0.1:${port}`, false),
162167
},
163168
},
@@ -183,6 +188,7 @@ test('Activity Context heartbeat is sent to core', async (t) => {
183188
taskToken,
184189
start: {
185190
activityType: 'progressiveSleep',
191+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
186192
input: toPayloads(defaultPayloadConverter),
187193
},
188194
});
@@ -206,6 +212,7 @@ test('Worker fails activity with proper message when it is not registered', asyn
206212
taskToken,
207213
start: {
208214
activityType: 'notFound',
215+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
209216
input: toPayloads(defaultPayloadConverter),
210217
},
211218
});
@@ -240,6 +247,7 @@ test('Worker cancels activities after shutdown', async (t) => {
240247
taskToken,
241248
start: {
242249
activityType: 'cancellationSnitch',
250+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
243251
input: toPayloads(defaultPayloadConverter),
244252
},
245253
}),
@@ -271,6 +279,7 @@ test('Non ApplicationFailure TemporalFailures thrown from Activity are wrapped w
271279
taskToken,
272280
start: {
273281
activityType: 'throwTemporalFailure',
282+
workflowExecution: { workflowId: 'wfid', runId: 'runId' },
274283
input: toPayloads(defaultPayloadConverter),
275284
},
276285
});

packages/test/src/test-worker-heartbeats.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import { isolateFreeWorker, Worker } from './mock-native-worker';
88
async function runActivity(worker: Worker, callback?: (completion: coresdk.ActivityTaskCompletion) => void) {
99
const taskToken = Buffer.from(uuid4());
1010
await worker.runUntil(async () => {
11-
const completion = await worker.native.runActivityTask({ taskToken, start: { activityType: 'rapidHeartbeater' } });
11+
const completion = await worker.native.runActivityTask({
12+
taskToken,
13+
start: { activityType: 'rapidHeartbeater', workflowExecution: { workflowId: 'wfid', runId: 'runid' } },
14+
});
1215
callback?.(completion);
1316
});
1417
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import anyTest, { TestInterface, ExecutionContext } from 'ava';
2+
import { v4 as uuid4 } from 'uuid';
3+
import { Worker, DefaultLogger, LogEntry, defaultSinks } from '@temporalio/worker';
4+
import { TestWorkflowEnvironment } from '@temporalio/testing';
5+
import * as workflows from './workflows';
6+
7+
interface Context {
8+
testEnv: TestWorkflowEnvironment;
9+
}
10+
const test = anyTest as TestInterface<Context>;
11+
12+
test.before(async (t) => {
13+
t.context = {
14+
testEnv: await TestWorkflowEnvironment.create({
15+
testServer: {
16+
stdio: 'inherit',
17+
},
18+
}),
19+
};
20+
});
21+
22+
test.after.always(async (t) => {
23+
await t.context.testEnv?.teardown();
24+
});
25+
26+
async function withWorker(t: ExecutionContext<Context>, p: Promise<any>): Promise<[LogEntry, LogEntry]> {
27+
const { nativeConnection } = t.context.testEnv;
28+
const logs = Array<LogEntry>();
29+
const logger = new DefaultLogger('DEBUG', (entry) => logs.push(entry));
30+
const worker = await Worker.create({
31+
connection: nativeConnection,
32+
taskQueue: 'test',
33+
workflowsPath: require.resolve('./workflows'),
34+
sinks: defaultSinks(logger),
35+
});
36+
await worker.runUntil(p);
37+
t.true(logs.length >= 2);
38+
return logs as [LogEntry, LogEntry];
39+
}
40+
41+
test.serial('WorkflowInboundLogInterceptor logs when workflow completes', async (t) => {
42+
const { workflowClient } = t.context.testEnv;
43+
const workflowId = uuid4();
44+
const [startLog, endLog] = await withWorker(
45+
t,
46+
workflowClient.execute(workflows.successString, { workflowId, taskQueue: 'test' })
47+
);
48+
t.is(startLog.level, 'DEBUG');
49+
t.is(startLog.message, 'Workflow started');
50+
t.is(startLog.meta?.workflowId, workflowId);
51+
t.true(typeof startLog.meta?.runId === 'string');
52+
t.is(startLog.meta?.taskQueue, 'test');
53+
t.is(startLog.meta?.namespace, 'default');
54+
t.is(startLog.meta?.workflowType, 'successString');
55+
t.is(endLog.level, 'DEBUG');
56+
t.is(endLog.message, 'Workflow completed');
57+
});
58+
59+
test.serial('WorkflowInboundLogInterceptor logs when workflow continues as new', async (t) => {
60+
const { workflowClient } = t.context.testEnv;
61+
const [_, endLog] = await withWorker(
62+
t,
63+
t.throwsAsync(
64+
workflowClient.execute(workflows.continueAsNewSameWorkflow, {
65+
args: ['execute', 'execute'],
66+
workflowId: uuid4(),
67+
taskQueue: 'test',
68+
followRuns: false,
69+
})
70+
)
71+
);
72+
t.is(endLog.level, 'DEBUG');
73+
t.is(endLog.message, 'Workflow continued as new');
74+
});
75+
76+
test.serial('WorkflowInboundLogInterceptor logs warning when workflow fails', async (t) => {
77+
const { workflowClient } = t.context.testEnv;
78+
const [_, endLog] = await withWorker(
79+
t,
80+
t.throwsAsync(
81+
workflowClient.execute(workflows.throwAsync, {
82+
workflowId: uuid4(),
83+
taskQueue: 'test',
84+
followRuns: false,
85+
})
86+
)
87+
);
88+
t.is(endLog.level, 'WARN');
89+
t.is(endLog.message, 'Workflow failed');
90+
});

packages/testing/src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ export class TestWorkflowEnvironment {
292292
*/
293293
export const defaultActivityInfo: activity.Info = {
294294
attempt: 1,
295+
taskQueue: 'test',
295296
isLocal: false,
296297
taskToken: Buffer.from('test'),
297298
activityId: 'test',
@@ -333,6 +334,9 @@ export class MockActivityEnvironment extends events.EventEmitter {
333334
abortController.signal,
334335
heartbeatCallback
335336
);
337+
promise.catch(() => {
338+
/* avoid unhandled rejection */
339+
});
336340
}
337341

338342
/**

0 commit comments

Comments
 (0)