Skip to content

Commit d545898

Browse files
bergundylorensr
andauthored
feat: Add a meta Client class (#870)
Co-authored-by: Loren ☺️ <251288+lorensr@users.noreply.github.com>
1 parent e8855dc commit d545898

File tree

12 files changed

+337
-100
lines changed

12 files changed

+337
-100
lines changed

packages/activity/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export {
6363
* Throw this error from an Activity in order to make the Worker forget about this Activity.
6464
*
6565
* The Activity can then be completed asynchronously (from anywhere—usually outside the Worker) using
66-
* {@link AsyncCompletionClient}.
66+
* {@link Client.activity}.
6767
*
6868
* @example
6969
*

packages/client/src/async-completion-client.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
encodeErrorToFailure,
55
encodeToPayloads,
66
filterNullAndUndefined,
7+
isLoadedDataConverter,
78
loadDataConverter,
89
} from '@temporalio/internal-non-workflow-common';
910
import { Replace } from '@temporalio/internal-workflow-common';
@@ -41,9 +42,9 @@ export class ActivityCancelledError extends Error {
4142
*/
4243
export interface AsyncCompletionClientOptions {
4344
/**
44-
* {@link DataConverter} to use for serializing and deserializing payloads
45+
* {@link DataConverter} or {@link LoadedDataConverter} to use for serializing and deserializing payloads
4546
*/
46-
dataConverter?: DataConverter;
47+
dataConverter?: DataConverter | LoadedDataConverter;
4748

4849
/**
4950
* Identity to report to the server
@@ -69,6 +70,10 @@ export type AsyncCompletionClientOptionsWithDefaults = Replace<
6970
}
7071
>;
7172

73+
export type LoadedAsyncCompletionClientOptions = AsyncCompletionClientOptionsWithDefaults & {
74+
loadedDataConverter: LoadedDataConverter;
75+
};
76+
7277
export function defaultAsyncCompletionClientOptions(): AsyncCompletionClientOptionsWithDefaults {
7378
return {
7479
dataConverter: {},
@@ -92,22 +97,38 @@ export interface FullActivityId {
9297

9398
/**
9499
* A client for asynchronous completion and heartbeating of Activities.
100+
*
101+
* Typically this client should not be instantiated directly, instead create the high level {@link Client} and use
102+
* {@link Client.activity} to complete async activities.
95103
*/
96104
export class AsyncCompletionClient {
97-
public readonly options: AsyncCompletionClientOptionsWithDefaults;
98-
protected readonly dataConverter: LoadedDataConverter;
105+
public readonly options: LoadedAsyncCompletionClientOptions;
99106
public readonly connection: ConnectionLike;
100107

101108
constructor(options?: AsyncCompletionClientOptions) {
102109
this.connection = options?.connection ?? Connection.lazy();
103-
this.dataConverter = loadDataConverter(options?.dataConverter);
104-
this.options = { ...defaultAsyncCompletionClientOptions(), ...filterNullAndUndefined(options ?? {}) };
110+
const dataConverter = options?.dataConverter;
111+
const loadedDataConverter = isLoadedDataConverter(dataConverter) ? dataConverter : loadDataConverter(dataConverter);
112+
this.options = {
113+
...defaultAsyncCompletionClientOptions(),
114+
...filterNullAndUndefined(options ?? {}),
115+
loadedDataConverter,
116+
};
105117
}
106118

119+
/**
120+
* Raw gRPC access to the Temporal service.
121+
*
122+
* **NOTE**: The namespace provided in {@link options} is **not** automatically set on requests made to the service.
123+
*/
107124
get workflowService(): WorkflowService {
108125
return this.connection.workflowService;
109126
}
110127

128+
protected get dataConverter(): LoadedDataConverter {
129+
return this.options.loadedDataConverter;
130+
}
131+
111132
/**
112133
* Transforms grpc errors into well defined TS errors.
113134
*/

packages/client/src/client.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import { DataConverter, LoadedDataConverter } from '@temporalio/common';
2+
import { filterNullAndUndefined, loadDataConverter } from '@temporalio/internal-non-workflow-common';
3+
import { Replace } from '@temporalio/internal-workflow-common';
4+
import { temporal } from '@temporalio/proto';
5+
import os from 'os';
6+
import { AsyncCompletionClient } from './async-completion-client';
7+
import { Connection } from './connection';
8+
import { ClientInterceptors } from './interceptors';
9+
import { ConnectionLike, Metadata, WorkflowService } from './types';
10+
import { WorkflowClient } from './workflow-client';
11+
12+
export interface ClientOptions {
13+
/**
14+
* {@link DataConverter} to use for serializing and deserializing payloads
15+
*/
16+
dataConverter?: DataConverter;
17+
18+
/**
19+
* Used to override and extend default Connection functionality
20+
*
21+
* Useful for injecting auth headers and tracing Workflow executions
22+
*/
23+
interceptors?: ClientInterceptors;
24+
25+
/**
26+
* Identity to report to the server
27+
*
28+
* @default `${process.pid}@${os.hostname()}`
29+
*/
30+
identity?: string;
31+
32+
/**
33+
* Connection to use to communicate with the server.
34+
*
35+
* By default `WorkflowClient` connects to localhost.
36+
*
37+
* Connections are expensive to construct and should be reused.
38+
*/
39+
connection?: ConnectionLike;
40+
41+
/**
42+
* Server namespace
43+
*
44+
* @default default
45+
*/
46+
namespace?: string;
47+
48+
workflow?: {
49+
/**
50+
* Should a query be rejected by closed and failed workflows
51+
*
52+
* @default QUERY_REJECT_CONDITION_UNSPECIFIED which means that closed and failed workflows are still queryable
53+
*/
54+
queryRejectCondition?: temporal.api.enums.v1.QueryRejectCondition;
55+
};
56+
}
57+
58+
export type ClientOptionsWithDefaults = Replace<
59+
Required<ClientOptions>,
60+
{
61+
connection?: ConnectionLike;
62+
}
63+
>;
64+
65+
export type LoadedClientOptions = ClientOptionsWithDefaults & {
66+
loadedDataConverter: LoadedDataConverter;
67+
};
68+
69+
export function defaultClientOptions(): ClientOptionsWithDefaults {
70+
return {
71+
dataConverter: {},
72+
identity: `${process.pid}@${os.hostname()}`,
73+
interceptors: {},
74+
namespace: 'default',
75+
workflow: {
76+
queryRejectCondition: temporal.api.enums.v1.QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED,
77+
},
78+
};
79+
}
80+
81+
/**
82+
* High level SDK client.
83+
*
84+
*
85+
*/
86+
export class Client {
87+
/**
88+
* Underlying gRPC connection to the Temporal service
89+
*/
90+
public readonly connection: ConnectionLike;
91+
public readonly options: LoadedClientOptions;
92+
/**
93+
* Workflow sub-client - use to start and interact with Workflows
94+
*/
95+
public readonly workflow: WorkflowClient;
96+
/**
97+
* (Async) Activity completion sub-client - use to manually manage Activities
98+
*/
99+
public readonly activity: AsyncCompletionClient;
100+
101+
constructor(options?: ClientOptions) {
102+
this.connection = options?.connection ?? Connection.lazy();
103+
this.options = {
104+
...defaultClientOptions(),
105+
...filterNullAndUndefined(options ?? {}),
106+
loadedDataConverter: loadDataConverter(options?.dataConverter),
107+
};
108+
109+
const { workflow, loadedDataConverter, interceptors, ...base } = this.options;
110+
111+
this.workflow = new WorkflowClient({
112+
...base,
113+
...workflow,
114+
connection: this.connection,
115+
dataConverter: loadedDataConverter,
116+
interceptors: interceptors.workflow,
117+
});
118+
119+
this.activity = new AsyncCompletionClient({
120+
...base,
121+
connection: this.connection,
122+
dataConverter: loadedDataConverter,
123+
});
124+
}
125+
126+
/**
127+
* Raw gRPC access to the Temporal service.
128+
*
129+
* **NOTE**: The namespace provided in {@link options} is **not** automatically set on requests made to the service.
130+
*/
131+
get workflowService(): WorkflowService {
132+
return this.connection.workflowService;
133+
}
134+
135+
/**
136+
* Set the deadline for any service requests executed in `fn`'s scope.
137+
*/
138+
async withDeadline<R>(deadline: number | Date, fn: () => Promise<R>): Promise<R> {
139+
return await this.connection.withDeadline(deadline, fn);
140+
}
141+
142+
/**
143+
* Set metadata for any service requests executed in `fn`'s scope.
144+
*
145+
* @returns returned value of `fn`
146+
*
147+
* @see {@link Connection.withMetadata}
148+
*/
149+
async withMetadata<R>(metadata: Metadata, fn: () => Promise<R>): Promise<R> {
150+
return await this.connection.withMetadata(metadata, fn);
151+
}
152+
}

packages/client/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export * from '@temporalio/internal-workflow-common/lib/errors';
2828
export * from '@temporalio/internal-workflow-common/lib/interfaces';
2929
export * from '@temporalio/internal-workflow-common/lib/workflow-handle';
3030
export * from './async-completion-client';
31+
export * from './client';
3132
export { Connection, ConnectionOptions, ConnectionOptionsWithDefaults, LOCAL_TARGET } from './connection';
3233
export * from './errors';
3334
export * from './grpc-retry';

packages/client/src/interceptors.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,12 @@ export interface WorkflowClientCallsInterceptorFactory {
130130
export interface WorkflowClientInterceptors {
131131
calls?: WorkflowClientCallsInterceptorFactory[];
132132
}
133+
134+
/**
135+
* Interceptors for any high-level SDK client.
136+
*
137+
* NOTE: Currently only for {@link WorkflowClient}. More will be added later as needed.
138+
*/
139+
export interface ClientInterceptors {
140+
workflow?: WorkflowClientInterceptors;
141+
}

0 commit comments

Comments
 (0)