Skip to content

Commit 87e6a15

Browse files
authored
feat(worker): Worker.runReplayHistories returns an AsyncIterableIterator (#1067)
1 parent 59c60b6 commit 87e6a15

File tree

6 files changed

+142
-192
lines changed

6 files changed

+142
-192
lines changed

packages/test/src/test-replay.ts

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,21 @@ import * as fs from 'fs';
44
import path from 'path';
55
import anyTest, { TestFn } from 'ava';
66
import { temporal } from '@temporalio/proto';
7-
import { DefaultLogger, ReplayError, Runtime } from '@temporalio/worker';
7+
import { bundleWorkflowCode, ReplayError, WorkflowBundle } from '@temporalio/worker';
88
import { DeterminismViolationError } from '@temporalio/workflow';
99
import { Worker } from './helpers';
1010
import History = temporal.api.history.v1.History;
1111

12+
async function gen2array<T>(gen: AsyncIterable<T>): Promise<T[]> {
13+
const out: T[] = [];
14+
for await (const x of gen) {
15+
out.push(x);
16+
}
17+
return out;
18+
}
19+
1220
export interface Context {
13-
runtime: Runtime;
21+
bundle: WorkflowBundle;
1422
}
1523

1624
async function getHistories(fname: string): Promise<History> {
@@ -38,18 +46,18 @@ const test = anyTest as TestFn<Context>;
3846
test.before(async (t) => {
3947
// We don't want AVA to whine about unhandled rejections thrown by workflows
4048
process.removeAllListeners('unhandledRejection');
41-
const logger = new DefaultLogger('DEBUG');
42-
const runtime = Runtime.install({ logger });
49+
const bundle = await bundleWorkflowCode({ workflowsPath: require.resolve('./workflows') });
50+
4351
t.context = {
44-
runtime,
52+
bundle,
4553
};
4654
});
4755

4856
test('cancel-fake-progress-replay', async (t) => {
4957
const hist = await getHistories('cancel_fake_progress_history.bin');
5058
await Worker.runReplayHistory(
5159
{
52-
workflowsPath: require.resolve('./workflows'),
60+
workflowBundle: t.context.bundle,
5361
},
5462
hist
5563
);
@@ -60,7 +68,7 @@ test('cancel-fake-progress-replay from JSON', async (t) => {
6068
const hist = await getHistories('cancel_fake_progress_history.json');
6169
await Worker.runReplayHistory(
6270
{
63-
workflowsPath: require.resolve('./workflows'),
71+
workflowBundle: t.context.bundle,
6472
},
6573
hist
6674
);
@@ -75,8 +83,7 @@ test('cancel-fake-progress-replay-nondeterministic', async (t) => {
7583
await t.throwsAsync(
7684
Worker.runReplayHistory(
7785
{
78-
workflowsPath: require.resolve('./workflows'),
79-
failFast: false, // Verify this flag is ignored for single replay
86+
workflowBundle: t.context.bundle,
8087
},
8188
hist
8289
),
@@ -91,67 +98,69 @@ test('workflow-task-failure-fails-replay', async (t) => {
9198
// Manually alter the workflow type to point to our workflow which will fail workflow tasks
9299
hist.events[0].workflowExecutionStartedEventAttributes!.workflowType!.name = 'failsWorkflowTask';
93100

94-
const err: ReplayError | undefined = await t.throwsAsync(
101+
await t.throwsAsync(
95102
Worker.runReplayHistory(
96103
{
97-
workflowsPath: require.resolve('./workflows'),
104+
workflowBundle: t.context.bundle,
98105
replayName: t.title,
99106
},
100107
hist
101108
),
102109
{ instanceOf: ReplayError }
103110
);
104-
t.false(err?.isNonDeterminism);
105111
});
106112

107113
test('multiple-histories-replay', async (t) => {
108114
const hist1 = await getHistories('cancel_fake_progress_history.bin');
109115
const hist2 = await getHistories('cancel_fake_progress_history.json');
110116
const histories = historator([hist1, hist2]);
111117

112-
const res = await Worker.runReplayHistories(
113-
{
114-
workflowsPath: require.resolve('./workflows'),
115-
replayName: t.title,
116-
},
117-
histories
118+
const res = await gen2array(
119+
Worker.runReplayHistories(
120+
{
121+
workflowBundle: t.context.bundle,
122+
replayName: t.title,
123+
},
124+
histories
125+
)
126+
);
127+
t.deepEqual(
128+
res.map(({ error }) => error),
129+
[undefined, undefined]
118130
);
119-
t.is(res.errors.length, 0);
120131
});
121132

122-
test('multiple-histories-replay-fails-fast', async (t) => {
133+
test('multiple-histories-replay-returns-errors', async (t) => {
123134
const hist1 = await getHistories('cancel_fake_progress_history.bin');
124135
const hist2 = await getHistories('cancel_fake_progress_history.json');
125136
// change workflow type to break determinism
126137
hist1.events[0].workflowExecutionStartedEventAttributes!.workflowType!.name = 'http';
127138
hist2.events[0].workflowExecutionStartedEventAttributes!.workflowType!.name = 'http';
128139
const histories = historator([hist1, hist2]);
129140

130-
await t.throwsAsync(
141+
const results = await gen2array(
131142
Worker.runReplayHistories(
132143
{
133-
workflowsPath: require.resolve('./workflows'),
144+
workflowBundle: t.context.bundle,
134145
replayName: t.title,
135146
},
136147
histories
137148
)
138149
);
150+
151+
t.is(results.filter(({ error }) => error instanceof DeterminismViolationError).length, 2);
139152
});
140153

141-
test('multiple-histories-replay-fails-slow', async (t) => {
142-
const hist1 = await getHistories('cancel_fake_progress_history.bin');
143-
const hist2 = await getHistories('cancel_fake_progress_history.json');
144-
// change workflow type to break determinism
145-
hist1.events[0].workflowExecutionStartedEventAttributes!.workflowType!.name = 'http';
146-
const histories = historator([hist1, hist2]);
154+
test('empty-histories-replay-returns-empty-result', async (t) => {
155+
const histories = historator([]);
147156

148-
const res = await Worker.runReplayHistories(
149-
{
150-
workflowsPath: require.resolve('./workflows'),
151-
replayName: t.title,
152-
failFast: false,
153-
},
154-
histories
157+
const res = await gen2array(
158+
Worker.runReplayHistories(
159+
{
160+
workflowBundle: t.context.bundle,
161+
},
162+
histories
163+
)
155164
);
156-
t.is(res.errors.length, 1);
165+
t.is(res.length, 0);
157166
});

packages/worker/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export {
4949
WorkflowBundlePath,
5050
WorkflowBundlePathWithSourceMap, // eslint-disable-line deprecation/deprecation
5151
} from './worker-options';
52-
export { ReplayError, ReplayHistoriesIterable, ReplayResults } from './replay';
52+
export { ReplayError, ReplayHistoriesIterable, ReplayResult } from './replay';
5353
export { WorkflowInboundLogInterceptor, workflowLogAttributes } from './workflow-log-interceptor';
5454
export {
5555
BundleOptions,

packages/worker/src/replay.ts

Lines changed: 12 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { HistoryAndWorkflowId } from '@temporalio/client';
22
import { coresdk } from '@temporalio/proto';
3+
import { DeterminismViolationError } from '@temporalio/workflow';
34

45
export type EvictionReason = coresdk.workflow_activation.RemoveFromCache.EvictionReason;
56
export const EvictionReason = coresdk.workflow_activation.RemoveFromCache.EvictionReason;
@@ -10,40 +11,15 @@ export type RemoveFromCache = coresdk.workflow_activation.IRemoveFromCache;
1011
*/
1112
export class ReplayError extends Error {
1213
public readonly name = 'ReplayError';
13-
14-
constructor(
15-
/**
16-
* Workflow ID of the Workflow that failed to replay
17-
*/
18-
public readonly workflowId: string,
19-
/**
20-
* Run ID of the Workflow that failed to replay
21-
*/
22-
public runId: string,
23-
/**
24-
* Whether or not this error is caused by non-determinism
25-
*/
26-
public readonly isNonDeterminism: boolean,
27-
/**
28-
* Why replay failed
29-
*/
30-
message: string
31-
) {
32-
super(message);
33-
}
34-
}
35-
36-
export interface ReplayResults {
37-
readonly hasErrors: boolean;
38-
/** Maps run id to information about the replay failure */
39-
readonly errors: ReplayError[];
4014
}
4115

4216
/**
43-
* @internal
17+
* Result of a single workflow replay
4418
*/
45-
export interface ReplayRunOptions {
46-
failFast?: boolean;
19+
export interface ReplayResult {
20+
readonly workflowId: string;
21+
readonly runId: string;
22+
readonly error?: ReplayError | DeterminismViolationError;
4723
}
4824

4925
/**
@@ -60,28 +36,22 @@ export type ReplayHistoriesIterable = AsyncIterable<HistoryAndWorkflowId> | Iter
6036
*
6137
* @internal
6238
*/
63-
export function handleReplayEviction(evictJob: RemoveFromCache, workflowId: string, runId: string): ReplayError | null {
39+
export function evictionReasonToReplayError(
40+
evictJob: RemoveFromCache
41+
): ReplayError | DeterminismViolationError | undefined {
6442
switch (evictJob.reason) {
6543
case EvictionReason.NONDETERMINISM:
66-
return new ReplayError(
67-
workflowId,
68-
runId,
69-
true,
44+
return new DeterminismViolationError(
7045
'Replay failed with a nondeterminism error. This means that the workflow code as written ' +
7146
`is not compatible with the history that was fed in. Details: ${evictJob.message}`
7247
);
7348
case EvictionReason.LANG_FAIL:
74-
return new ReplayError(
75-
workflowId,
76-
runId,
77-
false,
78-
`Replay failed due workflow task failure. Details: ${evictJob.message}`
79-
);
49+
return new ReplayError(`Replay failed due workflow task failure. Details: ${evictJob.message}`);
8050
// Both of these reasons are not considered errors.
8151
// LANG_REQUESTED is used internally by Core to support duplicate runIds during replay.
8252
case EvictionReason.LANG_REQUESTED:
8353
case EvictionReason.CACHE_FULL:
84-
return null;
54+
return undefined;
8555
case undefined:
8656
case null:
8757
case EvictionReason.UNSPECIFIED:
@@ -91,9 +61,6 @@ export function handleReplayEviction(evictJob: RemoveFromCache, workflowId: stri
9161
case EvictionReason.PAGINATION_OR_HISTORY_FETCH:
9262
case EvictionReason.FATAL:
9363
return new ReplayError(
94-
workflowId,
95-
runId,
96-
false,
9764
`Replay failed due to internal SDK issue. Code: ${
9865
evictJob.reason ? EvictionReason[evictJob.reason] : 'absent'
9966
}, Details: ${evictJob.message}`

packages/worker/src/worker-options.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -402,20 +402,12 @@ export interface ReplayWorkerOptions
402402
| 'maxCachedWorkflows'
403403
> {
404404
/**
405-
* A name for this replay worker. It will be combined with a short random ID to form a unique
405+
* A optional name for this replay worker. It will be combined with an incremental ID to form a unique
406406
* task queue for the replay worker.
407407
*
408-
* @default workflow name from given history
408+
* @default "fake_replay_queue"
409409
*/
410410
replayName?: string;
411-
/**
412-
* If set to false, and replaying with multiple histories, do not throw an exception upon
413-
* encountering a problem with replaying a workflow. Instead, finish replaying all workflows
414-
* and return a results object containing the failures (if any).
415-
*
416-
* @default true
417-
*/
418-
failFast?: boolean;
419411
}
420412

421413
/**

0 commit comments

Comments
 (0)