Skip to content

Commit b39f7f3

Browse files
authored
chore: Refactor mass history replayer API (#974)
1 parent 74e250c commit b39f7f3

16 files changed

+462
-177
lines changed

.eslintrc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"prettier"
1313
],
1414
"rules": {
15+
"eqeqeq": ["error", "always", { "null": "ignore" }],
1516
"no-duplicate-imports": "error",
1617
"object-shorthand": ["error", "always"],
1718
"deprecation/deprecation": "warn",

package-lock.json

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/client/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"@grpc/grpc-js": "^1.6.7",
1717
"@temporalio/common": "file:../common",
1818
"@temporalio/proto": "file:../proto",
19+
"abort-controller": "^3.0.0",
1920
"long": "^5.2.0",
2021
"uuid": "^8.3.2"
2122
},
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { EventEmitter, on, once } from 'node:events';
2+
import { AbortController } from 'abort-controller';
3+
4+
/**
 * Options controlling how {@link mapAsyncIterable} maps items.
 */
export interface MapAsyncOptions {
  /**
   * How many items to map concurrently. If set to less than 2 (or not set), then items are not mapped concurrently.
   *
   * When items are mapped concurrently, mapped items are returned by the resulting iterator in the order they complete
   * mapping, not the order in which the corresponding source items were obtained from the source iterator.
   *
   * @default 1 (ie. items are not mapped concurrently)
   */
  concurrency?: number;

  /**
   * Maximum number of mapped items to keep in buffer, ready for consumption.
   *
   * Ignored unless `concurrency > 1`. No limit applies if set to `undefined`.
   *
   * @default unlimited
   */
  bufferLimit?: number | undefined;
}

/**
 * Obtain the async iterator of an async iterable.
 */
function toAsyncIterator<A>(iterable: AsyncIterable<A>): AsyncIterator<A> {
  return iterable[Symbol.asyncIterator]();
}

/**
 * Return an async iterable that transforms items from a source iterable by mapping each item
 * through a mapping function.
 *
 * If `concurrency > 1`, then up to `concurrency` items may be mapped concurrently. In that case,
 * items are returned by the resulting iterator in the order they complete processing, not the order
 * in which the corresponding source items were obtained from the source iterator.
 *
 * In concurrent mode, once the consumer stops iterating (early `break`/`return()`, or after an error
 * was propagated), the background mappers stop pulling new items and the source iterator is closed
 * on a best-effort basis.
 *
 * @param source the source async iterable
 * @param mapFn a mapping function to apply on every item of the source iterable
 * @param options concurrency and buffering options; see {@link MapAsyncOptions}
 * @returns an async iterable of mapped items; it rejects with the first error thrown by `source` or `mapFn`
 */
export async function* mapAsyncIterable<A, B>(
  source: AsyncIterable<A>,
  mapFn: (val: A) => Promise<B>,
  options?: MapAsyncOptions
): AsyncIterable<B> {
  const { concurrency, bufferLimit } = options ?? {};

  // Sequential mode: one item at a time, source order preserved.
  if (!concurrency || concurrency < 2) {
    for await (const x of source) {
      yield mapFn(x);
    }
    return;
  }

  const sourceIterator = toAsyncIterator(source);

  // 'result' carries mapped items, 'released' signals that a buffer slot was freed and
  // 'error' carries the first failure of any mapper.
  const emitter = new EventEmitter();
  const controller = new AbortController();
  const emitterEventsIterable: AsyncIterable<[B]> = on(emitter, 'result', { signal: controller.signal });
  // `once()` on the 'error' event resolves (rather than rejects) with the emitted arguments, so a
  // resolved array is the marker that some mapper has failed.
  const emitterError: Promise<unknown[]> = once(emitter, 'error');

  // Set once the consumer is done with this generator; tells mappers to stop pulling items.
  let stopped = false;
  // Set once the source reported completion; no need to close it afterwards.
  let sourceExhausted = false;

  const bufferLimitSemaphore =
    typeof bufferLimit === 'number'
      ? (() => {
          const releaseEvents: AsyncIterator<void> = toAsyncIterator(
            on(emitter, 'released', { signal: controller.signal })
          );
          // Each mapper holds one permit while it works on an item, hence `+ concurrency`.
          let value = bufferLimit + concurrency;

          return {
            /** Resolve to `true` once a permit was taken, or `false` if mapping must stop. */
            acquire: async (): Promise<boolean> => {
              while (value <= 0) {
                if (stopped) return false;
                const woken = await Promise.race([releaseEvents.next(), emitterError]);
                // An array means 'error' was emitted; `done` means the 'released' iterator was
                // closed (error or abort). In both cases no permit will ever be released again,
                // so looping would spin forever on already-settled promises.
                if (Array.isArray(woken) || woken.done) return false;
              }
              value--;
              return true;
            },
            release: () => {
              value++;
              emitter.emit('released');
            },
          };
        })()
      : undefined;

  const mapper = async (): Promise<void> => {
    for (;;) {
      if ((await bufferLimitSemaphore?.acquire()) === false) return;
      if (stopped) return;

      const val = await Promise.race([sourceIterator.next(), emitterError]);

      if (stopped) return;
      // Another mapper failed; stop quietly, the error is reported through the emitter.
      if (Array.isArray(val)) return;
      if (val.done) {
        sourceExhausted = true;
        return;
      }

      emitter.emit('result', await mapFn(val.value));
    }
  };

  // `Array.from` truncates a fractional `concurrency` instead of throwing like `Array(n)` would.
  const mappers = Array.from({ length: concurrency }, () => mapper());

  Promise.all(mappers).then(
    // All mappers completed: abort the 'result' iterator so the loop below ends after draining.
    () => controller.abort(),
    (err) => {
      // After the consumer is gone nobody can observe the error, and emitting 'error' without a
      // listener would throw inside this handler.
      if (!stopped) emitter.emit('error', err);
    }
  );

  try {
    for await (const [res] of emitterEventsIterable) {
      bufferLimitSemaphore?.release();
      yield res;
    }
  } catch (err: unknown) {
    if ((err as Error)?.name === 'AbortError') {
      return;
    }
    throw err;
  } finally {
    // Reached on normal completion, on error and on early termination by the consumer.
    stopped = true;
    controller.abort();
    if (!sourceExhausted) {
      // Best-effort, non-blocking close of the source: `return()` may be queued behind `next()`
      // calls still pending in mappers, and its outcome cannot be reported to anyone.
      void Promise.resolve()
        .then(() => sourceIterator.return?.())
        .catch(() => undefined);
    }
  }
}

packages/client/src/workflow-client.ts

Lines changed: 90 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
CancelledFailure,
55
compileRetryPolicy,
66
mapToPayloads,
7+
HistoryAndWorkflowId,
78
QueryDefinition,
89
RetryState,
910
searchAttributePayloadConverter,
@@ -18,6 +19,7 @@ import {
1819
WorkflowResultType,
1920
} from '@temporalio/common';
2021
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
22+
import { History } from '@temporalio/common/lib/proto-utils';
2123
import {
2224
decodeArrayFromPayloads,
2325
decodeFromPayloadsAtIndex,
@@ -65,6 +67,7 @@ import {
6567
LoadedWithDefaults,
6668
WithDefaults,
6769
} from './base-client';
70+
import { mapAsyncIterable } from './iterators-utils';
6871

6972
/**
7073
* A client side handle to a single Workflow instance.
@@ -131,6 +134,11 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
131134
*/
132135
describe(): Promise<WorkflowExecutionDescription>;
133136

137+
/**
138+
* Return a workflow execution's history
139+
*/
140+
fetchHistory(): Promise<History>;
141+
134142
/**
135143
* Readonly accessor to the underlying WorkflowClient
136144
*/
@@ -245,6 +253,19 @@ interface WorkflowHandleOptions extends GetWorkflowHandleOptions {
245253
runIdForResult?: string;
246254
}
247255

256+
/**
257+
* An async iterable of {@link WorkflowExecutionInfo}, as returned by {@link WorkflowClient.list}.
258+
*/
259+
interface AsyncWorkflowListIterable extends AsyncIterable<WorkflowExecutionInfo> {
260+
/**
261+
* Return an async iterable of histories (each paired with its workflow ID) corresponding to this
262+
* iterable's workflow executions. Accepts optional {@link IntoHistoriesOptions}.
263+
*
264+
* Useful for batch replaying.
265+
*/
266+
intoHistories: (intoHistoriesOptions?: IntoHistoriesOptions) => AsyncIterable<HistoryAndWorkflowId>;
267+
}
268+
248269
/**
249270
* Options for {@link WorkflowClient.list}
250271
*/
@@ -261,6 +282,31 @@ export interface ListOptions {
261282
query?: string;
262283
}
263284

285+
/**
286+
* Options for the `intoHistories()` method of the iterable returned by {@link WorkflowClient.list}
287+
*/
288+
export interface IntoHistoriesOptions {
289+
/**
290+
* Maximum number of workflow histories to download concurrently.
291+
*
292+
* @default 5
293+
*/
294+
concurrency?: number;
295+
296+
/**
297+
* Maximum number of workflow histories to buffer ahead, ready for consumption.
298+
*
299+
* It is recommended to set `bufferLimit` to a reasonably low number if it is expected that the
300+
* iterable may be stopped before reaching completion (for example, when implementing a fail fast
301+
* batch replay test).
302+
*
303+
* Ignored unless `concurrency > 1`. No limit applies if set to `undefined`.
304+
*
305+
* @default unlimited
306+
*/
307+
bufferLimit?: number;
308+
}
309+
264310
/**
265311
* Client for starting Workflow executions and creating Workflow handles.
266312
*
@@ -799,6 +845,22 @@ export class WorkflowClient extends BaseClient {
799845
(info as unknown as WorkflowExecutionDescription).raw = raw;
800846
return info;
801847
},
848+
/**
 * Download this workflow execution's history, following `nextPageToken` until the service
 * returns no further page, and return all collected events as a single History message.
 */
async fetchHistory() {
  const events = Array<temporal.api.history.v1.IHistoryEvent>();
  let nextPageToken: Uint8Array | undefined = undefined;
  do {
    // Explicit annotation: the response type cannot be inferred because the request itself
    // depends on `nextPageToken`, which is assigned from the previous response.
    const response: temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse =
      await this.client.workflowService.getWorkflowExecutionHistory({
        nextPageToken,
        namespace: this.client.options.namespace,
        execution: { workflowId, runId },
      });
    events.push(...(response.history?.events ?? []));
    nextPageToken = response.nextPageToken;
    // A missing or empty token marks the last page.
  } while (nextPageToken != null && nextPageToken.length > 0);
  return temporal.api.history.v1.History.create({ events });
},
802864
async signal<Args extends any[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {
803865
const next = this.client._signalWorkflowHandler.bind(this.client);
804866
const fn = interceptors.length ? composeInterceptors(interceptors, 'signal', next) : next;
@@ -858,15 +920,7 @@ export class WorkflowClient extends BaseClient {
858920
});
859921
}
860922

861-
/**
862-
* List workflows by given `query`.
863-
*
864-
* ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility.
865-
*
866-
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
867-
* https://docs.temporal.io/visibility
868-
*/
869-
public async *list(options?: ListOptions): AsyncIterable<WorkflowExecutionInfo> {
923+
protected async *_list(options?: ListOptions): AsyncIterable<WorkflowExecutionInfo> {
870924
let nextPageToken: Uint8Array = Buffer.alloc(0);
871925
for (;;) {
872926
const response = await this.workflowService.listWorkflowExecutions({
@@ -882,10 +936,36 @@ export class WorkflowClient extends BaseClient {
882936
yield await executionInfoFromRaw(raw, this.dataConverter);
883937
}
884938
nextPageToken = response.nextPageToken;
885-
if (nextPageToken == null || nextPageToken.length == 0) break;
939+
if (nextPageToken == null || nextPageToken.length === 0) break;
886940
}
887941
}
888942

943+
/**
 * List workflows by given `query`.
 *
 * ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility.
 *
 * More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
 * https://docs.temporal.io/visibility
 *
 * The returned object can be iterated directly to obtain workflow execution infos, or converted
 * with `intoHistories()` into an iterable of workflow histories. Each of these starts a new
 * listing with the same `options`.
 *
 * @param options listing options, forwarded to the underlying listing call
 * @returns an async iterable of workflow executions that also exposes `intoHistories()`
 */
public list(options?: ListOptions): AsyncWorkflowListIterable {
  return {
    [Symbol.asyncIterator]: () => this._list(options)[Symbol.asyncIterator](),
    intoHistories: (intoHistoriesOptions?: IntoHistoriesOptions) => {
      return mapAsyncIterable(
        this._list(options),
        async ({ workflowId, runId }) => ({
          workflowId,
          // Best effort: a history that fails to download is reported as `undefined` instead of
          // failing the whole iteration.
          history: await this.getHandle(workflowId, runId)
            .fetchHistory()
            .catch((_) => undefined),
        }),
        {
          concurrency: intoHistoriesOptions?.concurrency ?? 5,
          // Forward the documented buffer limit; it was previously dropped, so the option had no effect.
          bufferLimit: intoHistoriesOptions?.bufferLimit,
        }
      );
    },
  };
}
968+
889969
protected getOrMakeInterceptors(workflowId: string, runId?: string): WorkflowClientInterceptor[] {
890970
if (typeof this.options.interceptors === 'object' && 'calls' in this.options.interceptors) {
891971
// eslint-disable-next-line deprecation/deprecation

packages/common/src/interfaces.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,11 @@ export type ActivityInterface = Record<string, ActivityFunction>;
6262
* Mapping of Activity name to function
6363
*/
6464
export type UntypedActivities = Record<string, ActivityFunction>;
65+
66+
/**
67+
* A workflow's history and ID. Useful for replay. NOTE(review): because the `history` union includes `unknown`, its effective type is `unknown`, so consumers must narrow it before use.
68+
*/
69+
export interface HistoryAndWorkflowId {
70+
workflowId: string;
71+
history: temporal.api.history.v1.History | unknown | undefined;
72+
}

0 commit comments

Comments
 (0)