Skip to content

Commit 37d3253

Browse files
authored
fix(workflow): Fix mutliple sinks oddities (#1181)
1 parent 2875b22 commit 37d3253

File tree

13 files changed

+386
-110
lines changed

13 files changed

+386
-110
lines changed

packages/test/src/test-sinks.ts

Lines changed: 221 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22
import test from 'ava';
33
import { v4 as uuid4 } from 'uuid';
44
import { WorkflowClient } from '@temporalio/client';
5-
import { DefaultLogger, InjectedSinks, Runtime } from '@temporalio/worker';
6-
import { WorkflowInfo } from '@temporalio/workflow';
5+
import {
6+
DefaultLogger,
7+
InjectedSinks,
8+
Runtime,
9+
LoggerSinks as DefaultLoggerSinks,
10+
InjectedSinkFunction,
11+
} from '@temporalio/worker';
12+
import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow';
713
import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces';
814
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
915
import { defaultOptions } from './mock-native-worker';
@@ -15,6 +21,21 @@ class DependencyError extends Error {
1521
}
1622
}
1723

24+
function asDefaultLoggerSink(
25+
fn: (info: WorkflowInfo, message: string, attrs?: Record<string, unknown>) => Promise<void>,
26+
opts?: Omit<InjectedSinkFunction<any>, 'fn'>
27+
): InjectedSinks<DefaultLoggerSinks> {
28+
return {
29+
defaultWorkerLogger: {
30+
trace: { fn, ...opts },
31+
debug: { fn, ...opts },
32+
info: { fn, ...opts },
33+
warn: { fn, ...opts },
34+
error: { fn, ...opts },
35+
},
36+
};
37+
}
38+
1839
if (RUN_INTEGRATION_TESTS) {
1940
const recordedLogs: any[] = [];
2041
test.before((_) => {
@@ -35,6 +56,7 @@ if (RUN_INTEGRATION_TESTS) {
3556

3657
const dummyDate = new Date(2000, 1, 0, 0, 0, 0);
3758
function fixWorkflowInfoDates(input: WorkflowInfo): WorkflowInfo {
59+
delete (input.unsafe as any).now;
3860
return {
3961
...input,
4062
startTime: dummyDate,
@@ -114,7 +136,9 @@ if (RUN_INTEGRATION_TESTS) {
114136
startTime: dummyDate,
115137
runStartTime: dummyDate,
116138
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
117-
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
139+
unsafe: {
140+
isReplaying: false,
141+
} as UnsafeWorkflowInfo,
118142
};
119143

120144
t.deepEqual(recordedCalls, [
@@ -146,51 +170,60 @@ if (RUN_INTEGRATION_TESTS) {
146170
});
147171

148172
test('Sink functions are not called during replay if callDuringReplay is unset', async (t) => {
149-
const recordedMessages = Array<string>();
150173
const taskQueue = `${__filename}-${t.title}`;
151-
const sinks: InjectedSinks<workflows.LoggerSinks> = {
152-
logger: {
174+
175+
const recordedMessages = Array<{ message: string; historyLength: number; isReplaying: boolean }>();
176+
const sinks: InjectedSinks<workflows.CustomLoggerSinks> = {
177+
customLogger: {
153178
info: {
154-
async fn(_info, message) {
155-
recordedMessages.push(message);
179+
async fn(info, message) {
180+
recordedMessages.push({
181+
message,
182+
historyLength: info.historyLength,
183+
isReplaying: info.unsafe.isReplaying,
184+
});
156185
},
157186
},
158187
},
159188
};
160189

190+
const client = new WorkflowClient();
161191
const worker = await Worker.create({
162192
...defaultOptions,
163193
taskQueue,
164194
sinks,
165195
maxCachedWorkflows: 0,
166196
maxConcurrentWorkflowTaskExecutions: 2,
167197
});
168-
const client = new WorkflowClient();
169-
await Promise.all([
170-
(async () => {
171-
try {
172-
await client.execute(workflows.logSinkTester, { taskQueue, workflowId: uuid4() });
173-
} finally {
174-
worker.shutdown();
175-
}
176-
})(),
177-
worker.run(),
178-
]);
198+
await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId: uuid4() }));
179199

180200
t.deepEqual(recordedMessages, [
181-
'Workflow execution started, replaying: false, hl: 3',
182-
'Workflow execution completed, replaying: false, hl: 8',
201+
{
202+
message: 'Workflow execution started, replaying: false, hl: 3',
203+
historyLength: 3,
204+
isReplaying: false,
205+
},
206+
{
207+
message: 'Workflow execution completed, replaying: false, hl: 8',
208+
historyLength: 8,
209+
isReplaying: false,
210+
},
183211
]);
184212
});
185213

186214
test('Sink functions are called during replay if callDuringReplay is set', async (t) => {
187-
const recordedMessages = Array<string>();
188215
const taskQueue = `${__filename}-${t.title}`;
189-
const sinks: InjectedSinks<workflows.LoggerSinks> = {
190-
logger: {
216+
217+
const recordedMessages = Array<{ message: string; historyLength: number; isReplaying: boolean }>();
218+
const sinks: InjectedSinks<workflows.CustomLoggerSinks> = {
219+
customLogger: {
191220
info: {
192-
async fn(_info, message) {
193-
recordedMessages.push(message);
221+
fn: async (info, message) => {
222+
recordedMessages.push({
223+
message,
224+
historyLength: info.historyLength,
225+
isReplaying: info.unsafe.isReplaying,
226+
});
194227
},
195228
callDuringReplay: true,
196229
},
@@ -209,9 +242,168 @@ if (RUN_INTEGRATION_TESTS) {
209242

210243
// Note that task may be replayed more than once and record the first messages multiple times.
211244
t.deepEqual(recordedMessages.slice(0, 2), [
212-
'Workflow execution started, replaying: false, hl: 3',
213-
'Workflow execution started, replaying: true, hl: 3',
245+
{
246+
message: 'Workflow execution started, replaying: false, hl: 3',
247+
historyLength: 3,
248+
isReplaying: false,
249+
},
250+
{
251+
message: 'Workflow execution started, replaying: true, hl: 3',
252+
historyLength: 3,
253+
isReplaying: true,
254+
},
255+
]);
256+
t.deepEqual(recordedMessages[recordedMessages.length - 1], {
257+
message: 'Workflow execution completed, replaying: false, hl: 8',
258+
historyLength: 8,
259+
isReplaying: false,
260+
});
261+
});
262+
263+
test('Sink functions are not called in runReplayHistories if callDuringReplay is unset', async (t) => {
264+
const recordedMessages = Array<{ message: string; historyLength: number; isReplaying: boolean }>();
265+
const sinks: InjectedSinks<workflows.CustomLoggerSinks> = {
266+
customLogger: {
267+
info: {
268+
fn: async (info, message) => {
269+
recordedMessages.push({
270+
message,
271+
historyLength: info.historyLength,
272+
isReplaying: info.unsafe.isReplaying,
273+
});
274+
},
275+
},
276+
},
277+
};
278+
279+
const client = new WorkflowClient();
280+
const taskQueue = `${__filename}-${t.title}`;
281+
const worker = await Worker.create({
282+
...defaultOptions,
283+
taskQueue,
284+
sinks,
285+
});
286+
const workflowId = uuid4();
287+
await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId }));
288+
const history = await client.getHandle(workflowId).fetchHistory();
289+
290+
// Last 3 events are WorkflowExecutionStarted, WorkflowTaskCompleted and WorkflowExecutionCompleted
291+
history.events = history!.events!.slice(0, -3);
292+
293+
recordedMessages.length = 0;
294+
await Worker.runReplayHistory(
295+
{
296+
...defaultOptions,
297+
sinks,
298+
},
299+
history,
300+
workflowId
301+
);
302+
303+
t.deepEqual(recordedMessages, []);
304+
});
305+
306+
test('Sink functions are called in runReplayHistories if callDuringReplay is set', async (t) => {
307+
const taskQueue = `${__filename}-${t.title}`;
308+
309+
const recordedMessages = Array<{ message: string; historyLength: number; isReplaying: boolean }>();
310+
const sinks: InjectedSinks<workflows.CustomLoggerSinks> = {
311+
customLogger: {
312+
info: {
313+
fn: async (info, message) => {
314+
recordedMessages.push({
315+
message,
316+
historyLength: info.historyLength,
317+
isReplaying: info.unsafe.isReplaying,
318+
});
319+
},
320+
callDuringReplay: true,
321+
},
322+
},
323+
};
324+
325+
const worker = await Worker.create({
326+
...defaultOptions,
327+
taskQueue,
328+
sinks,
329+
});
330+
const client = new WorkflowClient();
331+
const workflowId = uuid4();
332+
await worker.runUntil(async () => {
333+
await client.execute(workflows.logSinkTester, { taskQueue, workflowId });
334+
});
335+
const history = await client.getHandle(workflowId).fetchHistory();
336+
337+
// Last 3 events are WorkflowExecutionStarted, WorkflowTaskCompleted and WorkflowExecutionCompleted
338+
history.events = history!.events!.slice(0, -3);
339+
340+
recordedMessages.length = 0;
341+
await Worker.runReplayHistory(
342+
{
343+
...defaultOptions,
344+
sinks,
345+
},
346+
history,
347+
workflowId
348+
);
349+
350+
t.deepEqual(recordedMessages.slice(0, 2), [
351+
{
352+
message: 'Workflow execution started, replaying: true, hl: 3',
353+
isReplaying: true,
354+
historyLength: 3,
355+
},
356+
{
357+
message: 'Workflow execution completed, replaying: false, hl: 7',
358+
isReplaying: false,
359+
historyLength: 7,
360+
},
361+
]);
362+
});
363+
364+
test('Sink functions contains upserted search attributes', async (t) => {
365+
const taskQueue = `${__filename}-${t.title}`;
366+
367+
const recordedMessages = Array<{ message: string; searchAttributes: SearchAttributes }>();
368+
const sinks = asDefaultLoggerSink(async (info, message, _attrs) => {
369+
recordedMessages.push({
370+
message,
371+
searchAttributes: info.searchAttributes,
372+
});
373+
});
374+
375+
const client = new WorkflowClient();
376+
const date = new Date();
377+
378+
const worker = await Worker.create({
379+
...defaultOptions,
380+
taskQueue,
381+
sinks,
382+
});
383+
await worker.runUntil(
384+
client.execute(workflows.upsertAndReadSearchAttributes, {
385+
taskQueue,
386+
workflowId: uuid4(),
387+
args: [date.getTime()],
388+
})
389+
);
390+
391+
t.deepEqual(recordedMessages, [
392+
{
393+
message: 'Workflow started',
394+
searchAttributes: {},
395+
},
396+
{
397+
message: 'Workflow completed',
398+
searchAttributes: {
399+
CustomBoolField: [true],
400+
CustomIntField: [], // clear
401+
CustomKeywordField: ['durable code'],
402+
CustomTextField: ['is useful'],
403+
CustomDatetimeField: [date],
404+
CustomDoubleField: [3.14],
405+
},
406+
},
214407
]);
215-
t.is(recordedMessages[recordedMessages.length - 1], 'Workflow execution completed, replaying: false, hl: 8');
216408
});
217409
}

packages/test/src/test-workflows.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ test.before(async (t) => {
6363
const workflowsPath = path.join(__dirname, 'workflows');
6464
const bundler = new WorkflowCodeBundler({ workflowsPath });
6565
const workflowBundle = parseWorkflowCode((await bundler.createBundle()).code);
66+
// FIXME: isolateExecutionTimeoutMs used to be 200 ms, but that's causing
67+
// lot of flakes on CI. Revert this after investigation / resolution.
6668
t.context.workflowCreator = REUSE_V8_CONTEXT
67-
? await TestReusableVMWorkflowCreator.create(workflowBundle, 200, new Set())
68-
: await TestVMWorkflowCreator.create(workflowBundle, 200, new Set());
69+
? await TestReusableVMWorkflowCreator.create(workflowBundle, 400, new Set())
70+
: await TestVMWorkflowCreator.create(workflowBundle, 400, new Set());
6971
});
7072

7173
test.after.always(async (t) => {
@@ -1546,10 +1548,14 @@ test('logAndTimeout', async (t) => {
15461548
const { workflowType, workflow } = t.context;
15471549
await t.throwsAsync(activate(t, makeStartWorkflow(workflowType)), {
15481550
code: 'ERR_SCRIPT_EXECUTION_TIMEOUT',
1549-
message: 'Script execution timed out after 200ms',
1551+
message: 'Script execution timed out after 400ms',
15501552
});
15511553
const calls = await workflow.getAndResetSinkCalls();
1552-
delete calls[0].args[1][LogTimestamp];
1554+
// Ignore LogTimestamp and workflowInfo for the purpose of this comparison
1555+
calls.forEach((call) => {
1556+
delete call.args[1]?.[LogTimestamp];
1557+
delete (call as any).workflowInfo;
1558+
});
15531559
t.deepEqual(calls, [
15541560
{
15551561
ifaceName: 'defaultWorkerLogger',
@@ -1565,7 +1571,20 @@ test('logAndTimeout', async (t) => {
15651571
},
15661572
],
15671573
},
1568-
{ ifaceName: 'logger', fnName: 'info', args: ['logging before getting stuck'] },
1574+
{
1575+
ifaceName: 'defaultWorkerLogger',
1576+
fnName: 'info',
1577+
args: [
1578+
'logging before getting stuck',
1579+
{
1580+
namespace: 'default',
1581+
runId: 'beforeEach hook for logAndTimeout',
1582+
taskQueue: 'test',
1583+
workflowId: 'test-workflowId',
1584+
workflowType: 'logAndTimeout',
1585+
},
1586+
],
1587+
},
15691588
]);
15701589
});
15711590

packages/test/src/workflows/definitions.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,5 @@
11
/* eslint-disable no-duplicate-imports */
22
import { defineSignal } from '@temporalio/workflow';
3-
// @@@SNIPSTART typescript-logger-sink-interface
4-
import type { Sinks } from '@temporalio/workflow';
5-
6-
export interface LoggerSinks extends Sinks {
7-
logger: {
8-
info(message: string): void;
9-
};
10-
}
11-
// @@@SNIPEND
123

134
export const activityStartedSignal = defineSignal('activityStarted');
145
export const failSignal = defineSignal('fail');

packages/test/src/workflows/log-before-timing-out.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
import { proxySinks } from '@temporalio/workflow';
2-
import { LoggerSinks } from './definitions';
3-
4-
const { logger } = proxySinks<LoggerSinks>();
1+
import { log } from '@temporalio/workflow';
52

63
export async function logAndTimeout(): Promise<void> {
7-
logger.info('logging before getting stuck');
4+
log.info('logging before getting stuck');
85
for (;;) {
96
/* Workflow should never complete */
107
}

0 commit comments

Comments
 (0)