Skip to content

Commit adf872e

Browse files
authored
feat(client): Implement basic list API (#942)
1 parent 18d4c15 commit adf872e

File tree

4 files changed

+158
-93
lines changed

4 files changed

+158
-93
lines changed

packages/client/src/helpers.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import {
2+
LoadedDataConverter,
3+
mapFromPayloads,
4+
searchAttributePayloadConverter,
5+
SearchAttributes,
6+
} from '@temporalio/common';
7+
import { optionalTsToDate, tsToDate } from '@temporalio/common/lib/time';
8+
import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
9+
import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types';
10+
import { temporal } from '@temporalio/proto';
11+
12+
function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName {
13+
return workflowStatusCodeToNameInternal(code) ?? 'UNKNOWN';
14+
}
15+
16+
/**
17+
* Intentionally leave out `default` branch to get compilation errors when new values are added
18+
*/
19+
function workflowStatusCodeToNameInternal(
20+
code: temporal.api.enums.v1.WorkflowExecutionStatus
21+
): WorkflowExecutionStatusName {
22+
switch (code) {
23+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED:
24+
return 'UNSPECIFIED';
25+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING:
26+
return 'RUNNING';
27+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED:
28+
return 'FAILED';
29+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
30+
return 'TIMED_OUT';
31+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED:
32+
return 'CANCELLED';
33+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED:
34+
return 'TERMINATED';
35+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED:
36+
return 'COMPLETED';
37+
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
38+
return 'CONTINUED_AS_NEW';
39+
}
40+
}
41+
42+
export async function executionInfoFromRaw(
43+
raw: RawWorkflowExecutionInfo,
44+
dataConverter: LoadedDataConverter
45+
): Promise<WorkflowExecutionInfo> {
46+
return {
47+
/* eslint-disable @typescript-eslint/no-non-null-assertion */
48+
type: raw.type!.name!,
49+
workflowId: raw.execution!.workflowId!,
50+
runId: raw.execution!.runId!,
51+
taskQueue: raw.taskQueue!,
52+
status: {
53+
code: raw.status!,
54+
name: workflowStatusCodeToName(raw.status!),
55+
},
56+
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
57+
historyLength: raw.historyLength!.toNumber(),
58+
startTime: tsToDate(raw.startTime!),
59+
executionTime: optionalTsToDate(raw.executionTime),
60+
closeTime: optionalTsToDate(raw.closeTime),
61+
memo: await decodeMapFromPayloads(dataConverter, raw.memo?.fields),
62+
searchAttributes: Object.fromEntries(
63+
Object.entries(
64+
mapFromPayloads(searchAttributePayloadConverter, raw.searchAttributes?.indexedFields ?? {}) as SearchAttributes
65+
).filter(([_, v]) => v && v.length > 0) // Filter out empty arrays returned by pre 1.18 servers
66+
),
67+
parentExecution: raw.parentExecution
68+
? {
69+
workflowId: raw.parentExecution.workflowId!,
70+
runId: raw.parentExecution.runId!,
71+
}
72+
: undefined,
73+
raw,
74+
};
75+
}

packages/client/src/types.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { SearchAttributes } from '@temporalio/common';
22
import * as proto from '@temporalio/proto';
33
import type * as grpc from '@grpc/grpc-js';
4+
import { Replace } from '@temporalio/common/lib/type-helpers';
45

56
export interface WorkflowExecution {
67
workflowId: string;
@@ -11,6 +12,7 @@ export type GetWorkflowExecutionHistoryRequest =
1112
proto.temporal.api.workflowservice.v1.IGetWorkflowExecutionHistoryRequest;
1213
export type DescribeWorkflowExecutionResponse =
1314
proto.temporal.api.workflowservice.v1.IDescribeWorkflowExecutionResponse;
15+
export type RawWorkflowExecutionInfo = proto.temporal.api.workflow.v1.IWorkflowExecutionInfo;
1416
export type TerminateWorkflowExecutionResponse =
1517
proto.temporal.api.workflowservice.v1.ITerminateWorkflowExecutionResponse;
1618
export type RequestCancelWorkflowExecutionResponse =
@@ -27,7 +29,7 @@ export type WorkflowExecutionStatusName =
2729
| 'TIMED_OUT'
2830
| 'UNKNOWN'; // UNKNOWN is reserved for future enum values
2931

30-
export interface WorkflowExecutionDescription {
32+
export interface WorkflowExecutionInfo {
3133
type: string;
3234
workflowId: string;
3335
runId: string;
@@ -40,9 +42,16 @@ export interface WorkflowExecutionDescription {
4042
memo?: Record<string, unknown>;
4143
searchAttributes: SearchAttributes;
4244
parentExecution?: Required<proto.temporal.api.common.v1.IWorkflowExecution>;
43-
raw: DescribeWorkflowExecutionResponse;
45+
raw: RawWorkflowExecutionInfo;
4446
}
4547

48+
export type WorkflowExecutionDescription = Replace<
49+
WorkflowExecutionInfo,
50+
{
51+
raw: DescribeWorkflowExecutionResponse;
52+
}
53+
>;
54+
4655
export type WorkflowService = proto.temporal.api.workflowservice.v1.WorkflowService;
4756
export const { WorkflowService } = proto.temporal.api.workflowservice.v1;
4857
export type OperatorService = proto.temporal.api.operatorservice.v1.OperatorService;

packages/client/src/workflow-client.ts

Lines changed: 49 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ import {
55
compileRetryPolicy,
66
DataConverter,
77
LoadedDataConverter,
8-
mapFromPayloads,
98
mapToPayloads,
109
QueryDefinition,
1110
RetryState,
1211
searchAttributePayloadConverter,
13-
SearchAttributes,
1412
SignalDefinition,
1513
TerminatedFailure,
1614
TimeoutFailure,
@@ -21,12 +19,10 @@ import {
2119
WorkflowNotFoundError,
2220
WorkflowResultType,
2321
} from '@temporalio/common';
24-
import { tsToDate, optionalTsToDate } from '@temporalio/common/lib/time';
2522
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
2623
import {
2724
decodeArrayFromPayloads,
2825
decodeFromPayloadsAtIndex,
29-
decodeMapFromPayloads,
3026
decodeOptionalFailureToOptionalError,
3127
encodeMapToPayloads,
3228
encodeToPayloads,
@@ -61,10 +57,11 @@ import {
6157
TerminateWorkflowExecutionResponse,
6258
WorkflowExecution,
6359
WorkflowExecutionDescription,
64-
WorkflowExecutionStatusName,
60+
WorkflowExecutionInfo,
6561
WorkflowService,
6662
} from './types';
6763
import { compileWorkflowOptions, WorkflowOptions, WorkflowSignalWithStartOptions } from './workflow-options';
64+
import { executionInfoFromRaw } from './helpers';
6865

6966
/**
7067
* A client side handle to a single Workflow instance.
@@ -288,6 +285,22 @@ interface WorkflowHandleOptions extends GetWorkflowHandleOptions {
288285
*/
289286
export type WorkflowStartOptions<T extends Workflow = Workflow> = WithWorkflowArgs<T, WorkflowOptions>;
290287

288+
/**
289+
* Options for {@link WorkflowClient.list}
290+
*/
291+
export interface ListOptions {
292+
/**
293+
* Maximum number of results to fetch per page.
294+
*
295+
* @default depends on server config, typically 1000
296+
*/
297+
pageSize?: number;
298+
/**
299+
* Query string for matching and ordering the results
300+
*/
301+
query?: string;
302+
}
303+
291304
/**
292305
* Client for starting Workflow executions and creating Workflow handles.
293306
*
@@ -848,38 +861,9 @@ export class WorkflowClient {
848861
const raw = await fn({
849862
workflowExecution: { workflowId, runId },
850863
});
851-
return {
852-
/* eslint-disable @typescript-eslint/no-non-null-assertion */
853-
type: raw.workflowExecutionInfo!.type!.name!,
854-
workflowId: raw.workflowExecutionInfo!.execution!.workflowId!,
855-
runId: raw.workflowExecutionInfo!.execution!.runId!,
856-
taskQueue: raw.workflowExecutionInfo!.taskQueue!,
857-
status: {
858-
code: raw.workflowExecutionInfo!.status!,
859-
name: workflowStatusCodeToName(raw.workflowExecutionInfo!.status!),
860-
},
861-
// Safe to convert to number, max history length is 50k, which is much less than Number.MAX_SAFE_INTEGER
862-
historyLength: raw.workflowExecutionInfo!.historyLength!.toNumber(),
863-
startTime: tsToDate(raw.workflowExecutionInfo!.startTime!),
864-
executionTime: optionalTsToDate(raw.workflowExecutionInfo!.executionTime),
865-
closeTime: optionalTsToDate(raw.workflowExecutionInfo!.closeTime),
866-
memo: await decodeMapFromPayloads(this.client.dataConverter, raw.workflowExecutionInfo!.memo?.fields),
867-
searchAttributes: Object.fromEntries(
868-
Object.entries(
869-
mapFromPayloads(
870-
searchAttributePayloadConverter,
871-
raw.workflowExecutionInfo!.searchAttributes?.indexedFields ?? {}
872-
) as SearchAttributes
873-
).filter(([_, v]) => v && v.length > 0) // Filter out empty arrays returned by pre 1.18 servers
874-
),
875-
parentExecution: raw.workflowExecutionInfo?.parentExecution
876-
? {
877-
workflowId: raw.workflowExecutionInfo.parentExecution.workflowId!,
878-
runId: raw.workflowExecutionInfo.parentExecution.runId!,
879-
}
880-
: undefined,
881-
raw,
882-
};
864+
const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter);
865+
(info as unknown as WorkflowExecutionDescription).raw = raw;
866+
return info;
883867
},
884868
async signal<Args extends any[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {
885869
const next = this.client._signalWorkflowHandler.bind(this.client);
@@ -939,6 +923,34 @@ export class WorkflowClient {
939923
followRuns: options?.followRuns ?? true,
940924
});
941925
}
926+
927+
/**
928+
* List workflows by given `query`.
929+
*
930+
* ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility.
931+
*
932+
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
933+
* https://docs.temporal.io/visibility
934+
*/
935+
public async *list(options?: ListOptions): AsyncIterable<WorkflowExecutionInfo> {
936+
let nextPageToken: Uint8Array = Buffer.alloc(0);
937+
for (;;) {
938+
const response = await this.workflowService.listWorkflowExecutions({
939+
namespace: this.options.namespace,
940+
query: options?.query,
941+
nextPageToken,
942+
pageSize: options?.pageSize,
943+
});
944+
// Not decoding memo payloads concurrently even though we could have to keep the lazy nature of this iterator.
945+
// Decoding is done for `memo` fields which tend to be small.
946+
// We might decide to change that based on user feedback.
947+
for (const raw of response.executions) {
948+
yield await executionInfoFromRaw(raw, this.dataConverter);
949+
}
950+
nextPageToken = response.nextPageToken;
951+
if (nextPageToken == null || nextPageToken.length == 0) break;
952+
}
953+
}
942954
}
943955

944956
export class QueryRejectedError extends Error {
@@ -954,33 +966,3 @@ export class QueryNotRegisteredError extends Error {
954966
super(message);
955967
}
956968
}
957-
958-
function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName {
959-
return workflowStatusCodeToNameInternal(code) ?? 'UNKNOWN';
960-
}
961-
962-
/**
963-
* Intentionally leave out `default` branch to get compilation errors when new values are added
964-
*/
965-
function workflowStatusCodeToNameInternal(
966-
code: temporal.api.enums.v1.WorkflowExecutionStatus
967-
): WorkflowExecutionStatusName {
968-
switch (code) {
969-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED:
970-
return 'UNSPECIFIED';
971-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING:
972-
return 'RUNNING';
973-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED:
974-
return 'FAILED';
975-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
976-
return 'TIMED_OUT';
977-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED:
978-
return 'CANCELLED';
979-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED:
980-
return 'TERMINATED';
981-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED:
982-
return 'COMPLETED';
983-
case temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
984-
return 'CONTINUED_AS_NEW';
985-
}
986-
}

packages/test/src/integration-tests.ts

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,33 +1325,32 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
13251325
t.pass();
13261326
});
13271327

1328-
test('Download and replay multiple executions', async (t) => {
1328+
/**
1329+
* NOTE: this test uses the `list` API which requires advanced visibility as of server 1.18.
1330+
* Run with docker-compose
1331+
*/
1332+
test('Download and replay multiple executions with client list method', async (t) => {
13291333
const { metaClient: client } = t.context;
13301334
const taskQueue = 'test';
1331-
const e1 = await client.workflow.start(workflows.argsAndReturn, {
1332-
taskQueue,
1333-
workflowId: uuid4(),
1334-
args: ['Hello', undefined, u8('world!')],
1335-
});
1336-
const e2 = await client.workflow.start(workflows.cancelFakeProgress, {
1337-
taskQueue,
1338-
workflowId: uuid4(),
1339-
});
1340-
const e3 = await client.workflow.start(workflows.childWorkflowInvoke, {
1341-
taskQueue,
1342-
workflowId: uuid4(),
1343-
});
1344-
const e4 = await client.workflow.start(workflows.activityFailures, {
1345-
taskQueue,
1346-
workflowId: uuid4(),
1347-
});
1348-
const handles = [e1, e2, e3, e4];
1335+
const fns = [
1336+
workflows.http,
1337+
workflows.cancelFakeProgress,
1338+
workflows.childWorkflowInvoke,
1339+
workflows.activityFailures,
1340+
];
1341+
const handles = await Promise.all(
1342+
fns.map((fn) =>
1343+
client.workflow.start(fn, {
1344+
taskQueue,
1345+
workflowId: uuid4(),
1346+
})
1347+
)
1348+
);
1349+
// Wait for the workflows to complete first
13491350
await Promise.all(handles.map((h) => h.result()));
1350-
const executions = (async function* () {
1351-
for (const { workflowId, firstExecutionRunId } of handles) {
1352-
yield { workflowId, runId: firstExecutionRunId };
1353-
}
1354-
})();
1351+
// Test the list API too while we're at it
1352+
const workflowIds = handles.map(({ workflowId }) => `'${workflowId}'`);
1353+
const executions = client.workflow.list({ query: `WorkflowId IN (${workflowIds.join(', ')})` });
13551354

13561355
await Worker.runReplayHistories(
13571356
{

0 commit comments

Comments
 (0)