Skip to content

Commit ede666e

Browse files
authored
chore: Deprecate defaultSinks (#1283)
1 parent 444d7f9 commit ede666e

20 files changed

+411
-215
lines changed

package-lock.json

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/test/src/test-sinks.ts

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
/* eslint @typescript-eslint/no-non-null-assertion: 0 */
22
import test from 'ava';
33
import { v4 as uuid4 } from 'uuid';
4-
import { WorkflowClient } from '@temporalio/client';
4+
import { Connection, WorkflowClient } from '@temporalio/client';
55
import {
66
DefaultLogger,
77
InjectedSinks,
88
Runtime,
9-
LoggerSinks as DefaultLoggerSinks,
109
InjectedSinkFunction,
1110
WorkerOptions,
11+
LogEntry,
1212
} from '@temporalio/worker';
13+
import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs';
1314
import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow';
1415
import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces';
15-
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
16+
import { RUN_INTEGRATION_TESTS, Worker, registerDefaultCustomSearchAttributes } from './helpers';
1617
import { defaultOptions } from './mock-native-worker';
1718
import * as workflows from './workflows';
1819

@@ -22,12 +23,12 @@ class DependencyError extends Error {
2223
}
2324
}
2425

25-
function asDefaultLoggerSink(
26+
function asSdkLoggerSink(
2627
fn: (info: WorkflowInfo, message: string, attrs?: Record<string, unknown>) => Promise<void>,
2728
opts?: Omit<InjectedSinkFunction<any>, 'fn'>
2829
): InjectedSinks<DefaultLoggerSinks> {
2930
return {
30-
defaultWorkerLogger: {
31+
__temporal_logger: {
3132
trace: { fn, ...opts },
3233
debug: { fn, ...opts },
3334
info: { fn, ...opts },
@@ -38,17 +39,20 @@ function asDefaultLoggerSink(
3839
}
3940

4041
if (RUN_INTEGRATION_TESTS) {
41-
const recordedLogs: any[] = [];
42-
test.before((_) => {
42+
const recordedLogs: { [workflowId: string]: LogEntry[] } = {};
43+
44+
test.before(async (_) => {
45+
await registerDefaultCustomSearchAttributes(await Connection.connect({}));
4346
Runtime.install({
44-
logger: new DefaultLogger('DEBUG', ({ level, message, meta }) => {
45-
if (message === 'External sink function threw an error') recordedLogs.push({ level, message, meta });
47+
logger: new DefaultLogger('DEBUG', (entry: LogEntry) => {
48+
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId;
49+
recordedLogs[workflowId] ??= [];
50+
recordedLogs[workflowId].push(entry);
4651
}),
4752
});
4853
});
4954

50-
// Must be serial because it uses the global Runtime to check for error messages
51-
test.serial('Worker injects sinks', async (t) => {
55+
test('Worker injects sinks', async (t) => {
5256
interface RecordedCall {
5357
info: WorkflowInfo;
5458
counter: number;
@@ -153,12 +157,13 @@ if (RUN_INTEGRATION_TESTS) {
153157
]);
154158

155159
t.deepEqual(
156-
recordedLogs.map((x) => ({
160+
recordedLogs[info.workflowId].map((x: LogEntry) => ({
157161
...x,
158162
meta: {
159163
...x.meta,
160-
workflowInfo: fixWorkflowInfoDates(x.meta.workflowInfo),
164+
workflowInfo: fixWorkflowInfoDates(x.meta?.workflowInfo),
161165
},
166+
timestampNanos: undefined,
162167
})),
163168
thrownErrors.map((error) => ({
164169
level: 'ERROR',
@@ -169,6 +174,7 @@ if (RUN_INTEGRATION_TESTS) {
169174
fnName: error.fnName,
170175
workflowInfo: info,
171176
},
177+
timestampNanos: undefined,
172178
}))
173179
);
174180
});
@@ -369,7 +375,7 @@ if (RUN_INTEGRATION_TESTS) {
369375
const taskQueue = `${__filename}-${t.title}`;
370376

371377
const recordedMessages = Array<{ message: string; searchAttributes: SearchAttributes }>();
372-
const sinks = asDefaultLoggerSink(async (info, message, _attrs) => {
378+
const sinks = asSdkLoggerSink(async (info, message, _attrs) => {
373379
recordedMessages.push({
374380
message,
375381
searchAttributes: info.searchAttributes,
Lines changed: 122 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
import anyTest, { TestFn, ExecutionContext } from 'ava';
22
import { v4 as uuid4 } from 'uuid';
3-
import { DefaultLogger, LogEntry, defaultSinks } from '@temporalio/worker';
3+
import {
4+
DefaultLogger,
5+
InjectedSinks,
6+
LogEntry,
7+
LogLevel,
8+
LoggerSinks,
9+
Runtime,
10+
defaultSinks,
11+
} from '@temporalio/worker';
412
import { TestWorkflowEnvironment } from '@temporalio/testing';
13+
import { WorkflowInfo } from '@temporalio/workflow';
514
import * as workflows from './workflows';
615
import { Worker } from './helpers';
716

@@ -11,9 +20,19 @@ interface Context {
1120
}
1221
const test = anyTest as TestFn<Context>;
1322

23+
const recordedLogs: { [workflowId: string]: LogEntry[] } = {};
24+
1425
test.before(async (t) => {
26+
Runtime.install({
27+
logger: new DefaultLogger('DEBUG', (entry) => {
28+
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId;
29+
recordedLogs[workflowId] ??= [];
30+
recordedLogs[workflowId].push(entry);
31+
}),
32+
});
33+
1534
t.context = {
16-
testEnv: await TestWorkflowEnvironment.createTimeSkipping(),
35+
testEnv: await TestWorkflowEnvironment.createLocal(),
1736
taskQueue: '', // Will be set in beforeEach
1837
};
1938
});
@@ -26,27 +45,30 @@ test.after.always(async (t) => {
2645
await t.context.testEnv?.teardown();
2746
});
2847

29-
async function withWorker(t: ExecutionContext<Context>, p: Promise<any>): Promise<[LogEntry, LogEntry]> {
48+
async function withWorker(
49+
t: ExecutionContext<Context>,
50+
p: Promise<any>,
51+
workflowId: string
52+
): Promise<[LogEntry, LogEntry]> {
3053
const { nativeConnection } = t.context.testEnv;
31-
const logs = Array<LogEntry>();
32-
const logger = new DefaultLogger('DEBUG', (entry) => logs.push(entry));
3354
const worker = await Worker.create({
3455
connection: nativeConnection,
3556
taskQueue: t.context.taskQueue,
3657
workflowsPath: require.resolve('./workflows'),
37-
sinks: defaultSinks(logger),
3858
});
3959
await worker.runUntil(p);
60+
const logs = recordedLogs[workflowId];
4061
t.true(logs.length >= 2);
4162
return logs as [LogEntry, LogEntry];
4263
}
4364

44-
test.serial('WorkflowInboundLogInterceptor logs when workflow completes', async (t) => {
65+
test('WorkflowInboundLogInterceptor logs when workflow completes', async (t) => {
4566
const { client } = t.context.testEnv;
4667
const workflowId = uuid4();
4768
const [startLog, endLog] = await withWorker(
4869
t,
49-
client.workflow.execute(workflows.successString, { workflowId, taskQueue: t.context.taskQueue })
70+
client.workflow.execute(workflows.successString, { workflowId, taskQueue: t.context.taskQueue }),
71+
workflowId
5072
);
5173
t.is(startLog.level, 'DEBUG');
5274
t.is(startLog.message, 'Workflow started');
@@ -59,35 +81,121 @@ test.serial('WorkflowInboundLogInterceptor logs when workflow completes', async
5981
t.is(endLog.message, 'Workflow completed');
6082
});
6183

62-
test.serial('WorkflowInboundLogInterceptor logs when workflow continues as new', async (t) => {
84+
test('WorkflowInboundLogInterceptor logs when workflow continues as new', async (t) => {
6385
const { client } = t.context.testEnv;
86+
const workflowId = uuid4();
6487
const [_, endLog] = await withWorker(
6588
t,
6689
t.throwsAsync(
6790
client.workflow.execute(workflows.continueAsNewSameWorkflow, {
6891
args: ['execute', 'execute'],
69-
workflowId: uuid4(),
92+
workflowId,
7093
taskQueue: t.context.taskQueue,
7194
followRuns: false,
7295
})
73-
)
96+
),
97+
workflowId
7498
);
7599
t.is(endLog.level, 'DEBUG');
76100
t.is(endLog.message, 'Workflow continued as new');
77101
});
78102

79-
test.serial('WorkflowInboundLogInterceptor logs warning when workflow fails', async (t) => {
103+
test('WorkflowInboundLogInterceptor logs warning when workflow fails', async (t) => {
80104
const { client } = t.context.testEnv;
105+
const workflowId = uuid4();
81106
const [_, endLog] = await withWorker(
82107
t,
83108
t.throwsAsync(
84109
client.workflow.execute(workflows.throwAsync, {
85-
workflowId: uuid4(),
110+
workflowId,
86111
taskQueue: t.context.taskQueue,
87112
followRuns: false,
88113
})
89-
)
114+
),
115+
workflowId
90116
);
91117
t.is(endLog.level, 'WARN');
92118
t.is(endLog.message, 'Workflow failed');
93119
});
120+
121+
test('(Legacy) defaultSinks(logger) can be used to customize where logs are sent', async (t) => {
122+
const { client } = t.context.testEnv;
123+
const workflowId = uuid4();
124+
const { nativeConnection } = t.context.testEnv;
125+
const logs = Array<LogEntry>();
126+
const logger = new DefaultLogger('DEBUG', (entry) => logs.push(entry));
127+
const worker = await Worker.create({
128+
connection: nativeConnection,
129+
taskQueue: t.context.taskQueue,
130+
workflowsPath: require.resolve('./workflows'),
131+
// eslint-disable-next-line deprecation/deprecation
132+
sinks: defaultSinks(logger),
133+
});
134+
await worker.runUntil(
135+
client.workflow.execute(workflows.successString, { workflowId, taskQueue: t.context.taskQueue })
136+
);
137+
t.false(workflowId in recordedLogs);
138+
t.true(logs.length >= 2);
139+
const [startLog, endLog] = logs;
140+
t.is(startLog.level, 'DEBUG');
141+
t.is(startLog.message, 'Workflow started');
142+
t.is(startLog.meta?.workflowId, workflowId);
143+
t.true(typeof startLog.meta?.runId === 'string');
144+
t.is(startLog.meta?.taskQueue, t.context.taskQueue);
145+
t.is(startLog.meta?.namespace, 'default');
146+
t.is(startLog.meta?.workflowType, 'successString');
147+
t.is(endLog.level, 'DEBUG');
148+
t.is(endLog.message, 'Workflow completed');
149+
});
150+
151+
test('(Legacy) Can register defaultWorkerLogger sink to customize where logs are sent', async (t) => {
152+
const { client } = t.context.testEnv;
153+
const workflowId = uuid4();
154+
const { nativeConnection } = t.context.testEnv;
155+
const logs = Array<LogEntry>();
156+
const fn = (level: LogLevel, _info: WorkflowInfo, message: string, attrs?: Record<string, unknown>) => {
157+
logs.push({ level, message, meta: attrs, timestampNanos: 0n });
158+
};
159+
const worker = await Worker.create({
160+
connection: nativeConnection,
161+
taskQueue: t.context.taskQueue,
162+
workflowsPath: require.resolve('./workflows'),
163+
// eslint-disable-next-line deprecation/deprecation
164+
sinks: <InjectedSinks<LoggerSinks>>{
165+
defaultWorkerLogger: {
166+
trace: { fn: fn.bind(undefined, 'TRACE') },
167+
debug: { fn: fn.bind(undefined, 'DEBUG') },
168+
info: { fn: fn.bind(undefined, 'INFO') },
169+
warn: { fn: fn.bind(undefined, 'WARN') },
170+
error: { fn: fn.bind(undefined, 'ERROR') },
171+
},
172+
},
173+
});
174+
await worker.runUntil(
175+
client.workflow.execute(workflows.successString, { workflowId, taskQueue: t.context.taskQueue })
176+
);
177+
t.false(workflowId in recordedLogs);
178+
t.true(logs.length >= 2);
179+
const [startLog, endLog] = logs;
180+
t.is(startLog.level, 'DEBUG');
181+
t.is(startLog.message, 'Workflow started');
182+
t.is(startLog.meta?.workflowId, workflowId);
183+
t.true(typeof startLog.meta?.runId === 'string');
184+
t.is(startLog.meta?.taskQueue, t.context.taskQueue);
185+
t.is(startLog.meta?.namespace, 'default');
186+
t.is(startLog.meta?.workflowType, 'successString');
187+
t.is(endLog.level, 'DEBUG');
188+
t.is(endLog.message, 'Workflow completed');
189+
});
190+
191+
test('(Legacy) Can explicitly call defaultWorkerLogger sink to emit logs', async (t) => {
192+
const { client } = t.context.testEnv;
193+
const workflowId = uuid4();
194+
const [_, midLog] = await withWorker(
195+
t,
196+
client.workflow.execute(workflows.useDepreatedLoggerSinkWorkflow, { workflowId, taskQueue: t.context.taskQueue }),
197+
workflowId
198+
);
199+
t.is(midLog.level, 'INFO');
200+
t.is(midLog.message, 'Log message from workflow');
201+
});

packages/test/src/test-workflows.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,7 +1568,7 @@ test('logAndTimeout', async (t) => {
15681568
});
15691569
t.deepEqual(calls, [
15701570
{
1571-
ifaceName: 'defaultWorkerLogger',
1571+
ifaceName: '__temporal_logger',
15721572
fnName: 'debug',
15731573
args: [
15741574
'Workflow started',
@@ -1582,7 +1582,7 @@ test('logAndTimeout', async (t) => {
15821582
],
15831583
},
15841584
{
1585-
ifaceName: 'defaultWorkerLogger',
1585+
ifaceName: '__temporal_logger',
15861586
fnName: 'info',
15871587
args: [
15881588
'logging before getting stuck',

packages/test/src/workflows/internals-interceptor-example.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { proxySinks, WorkflowInterceptors, Sinks, sleep } from '@temporalio/workflow';
22

3-
export interface LoggerSinks extends Sinks {
3+
export interface MyLoggerSinks extends Sinks {
44
logger: {
55
log(event: string): void;
66
};
77
}
88

9-
const { logger } = proxySinks<LoggerSinks>();
9+
const { logger } = proxySinks<MyLoggerSinks>();
1010

1111
export async function internalsInterceptorExample(): Promise<void> {
1212
await sleep(10);

packages/test/src/workflows/log-sink-tester.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,10 @@ export async function logSinkTester(): Promise<void> {
3434
}`
3535
);
3636
}
37+
38+
// eslint-disable-next-line deprecation/deprecation
39+
const { defaultWorkerLogger } = wf.proxySinks<wf.LoggerSinks>();
40+
41+
export async function useDepreatedLoggerSinkWorkflow(): Promise<void> {
42+
defaultWorkerLogger.info(`Log message from workflow`, { workflowId: wf.workflowInfo().workflowId });
43+
}

0 commit comments

Comments
 (0)