Commit 6a91390

feat: Make client gRPC retry more configurable (#879)
1 parent 2dad3d4 commit 6a91390

File tree

- packages/client/src/grpc-retry.ts
- packages/test/src/test-client-connection.ts

2 files changed: +194 −76 lines

packages/client/src/grpc-retry.ts

Lines changed: 111 additions & 57 deletions
```diff
@@ -1,37 +1,85 @@
-import {
-  InterceptingCall,
-  Interceptor,
-  ListenerBuilder,
-  Metadata,
-  RequesterBuilder,
-  StatusObject,
-} from '@grpc/grpc-js';
+import { InterceptingCall, Interceptor, ListenerBuilder, RequesterBuilder, StatusObject } from '@grpc/grpc-js';
 import * as grpc from '@grpc/grpc-js';
 
 export interface GrpcRetryOptions {
-  /** Maximum number of allowed retries. Defaults to 10. */
-  maxRetries: number;
-
   /**
-   * A function which accepts the current retry attempt (starts at 0) and returns the millisecond
+   * A function which accepts the current retry attempt (starts at 1) and returns the millisecond
    * delay that should be applied before the next retry.
    */
-  delayFunction: (attempt: number) => number;
+  delayFunction: (attempt: number, status: StatusObject) => number;
 
   /**
    * A function which accepts a failed status object and returns true if the call should be retried
    */
-  retryableDecider: (status: StatusObject) => boolean;
+  retryableDecider: (attempt: number, status: StatusObject) => boolean;
+}
+
+/**
+ * Options for the backoff formula: `factor ^ attempt * initialIntervalMs(status) * jitter(maxJitter)`
+ */
+export interface BackoffOptions {
+  /**
+   * Exponential backoff factor
+   *
+   * @default 2
+   */
+  factor: number;
+
+  /**
+   * Maximum number of attempts
+   *
+   * @default 10
+   */
+  maxAttempts: number;
+  /**
+   * Maximum amount of jitter to apply
+   *
+   * @default 0.1
+   */
+  maxJitter: number;
+  /**
+   * Function that returns the "initial" backoff interval based on the returned status.
+   *
+   * The default is 1 second for RESOURCE_EXHAUSTED errors and 20 millis for other retryable errors.
+   */
+  initialIntervalMs(status: StatusObject): number;
 }
 
-export function defaultGrpcRetryOptions(): GrpcRetryOptions {
+/**
+ * Add defaults as documented in {@link BackoffOptions}
+ */
+function withDefaultBackoffOptions({
+  maxAttempts,
+  factor,
+  maxJitter,
+  initialIntervalMs,
+}: Partial<BackoffOptions>): BackoffOptions {
   return {
-    maxRetries: 10,
-    delayFunction: backOffAmount,
-    retryableDecider: isRetryableError,
+    maxAttempts: maxAttempts ?? 10,
+    factor: factor ?? 2,
+    maxJitter: maxJitter ?? 0.1,
+    initialIntervalMs: initialIntervalMs ?? defaultInitialIntervalMs,
   };
 }
 
+/**
+ * Generates the default retry behavior based on given backoff options
+ */
+export function defaultGrpcRetryOptions(options: Partial<BackoffOptions> = {}): GrpcRetryOptions {
+  const { maxAttempts, factor, maxJitter, initialIntervalMs } = withDefaultBackoffOptions(options);
+  return {
+    delayFunction(attempt, status) {
+      return factor ** attempt * initialIntervalMs(status) * jitter(maxJitter);
+    },
+    retryableDecider(attempt, status) {
+      return attempt < maxAttempts && isRetryableError(status);
+    },
+  };
+}
+
+/**
+ * Set of retryable gRPC status codes
+ */
 const retryableCodes = new Set([
   grpc.status.UNKNOWN,
   grpc.status.RESOURCE_EXHAUSTED,
@@ -45,69 +93,75 @@ export function isRetryableError(status: StatusObject): boolean {
   return retryableCodes.has(status.code);
 }
 
-/** Return backoff amount in ms */
-export function backOffAmount(attempt: number): number {
-  return 2 ** attempt * 20;
+/**
+ * Calculates random amount of jitter between 0 and `max`
+ */
+function jitter(max: number) {
+  return 1 - max + Math.random() * max * 2;
+}
+
+/**
+ * Default implementation - backs off more on RESOURCE_EXHAUSTED errors
+ */
+function defaultInitialIntervalMs({ code }: StatusObject) {
+  // Backoff more on RESOURCE_EXHAUSTED
+  if (code === grpc.status.RESOURCE_EXHAUSTED) {
+    return 1000;
+  }
+  return 20;
 }
 
 /**
  * Returns a GRPC interceptor that will perform automatic retries for some types of failed calls
  *
  * @param retryOptions Options for the retry interceptor
  */
-export function makeGrpcRetryInterceptor(retryOptions: GrpcRetryOptions): Interceptor {
+export function makeGrpcRetryInterceptor({ retryableDecider, delayFunction }: GrpcRetryOptions): Interceptor {
   return (options, nextCall) => {
-    let savedMetadata: Metadata;
     let savedSendMessage: any;
     let savedReceiveMessage: any;
-    let savedMessageNext: any;
+    let savedMessageNext: (message: any) => void;
+
     const requester = new RequesterBuilder()
       .withStart(function (metadata, _listener, next) {
-        savedMetadata = metadata;
-        const newListener = new ListenerBuilder()
+        // First attempt
+        let attempt = 1;
+
+        const listener = new ListenerBuilder()
           .withOnReceiveMessage((message, next) => {
            savedReceiveMessage = message;
            savedMessageNext = next;
          })
          .withOnReceiveStatus((status, next) => {
-            let retries = 0;
-            const retry = (message: any, metadata: Metadata) => {
-              retries++;
-              const newCall = nextCall(options);
-              newCall.start(metadata, {
-                onReceiveMessage: (message) => {
+            const retry = () => {
+              attempt++;
+              const call = nextCall(options);
+              call.start(metadata, {
+                onReceiveMessage(message) {
                  savedReceiveMessage = message;
                },
-                onReceiveStatus: (status) => {
-                  if (retryOptions.retryableDecider(status)) {
-                    if (retries <= retryOptions.maxRetries) {
-                      setTimeout(() => retry(message, metadata), retryOptions.delayFunction(retries));
-                    } else {
-                      savedMessageNext(savedReceiveMessage);
-                      next(status);
-                    }
-                  } else {
-                    savedMessageNext(savedReceiveMessage);
-                    // TODO: For reasons that are completely unclear to me, if you pass a handcrafted
-                    // status object here, node will magically just exit at the end of this line.
-                    // No warning, no nothing. Here be dragons.
-                    next(status);
-                  }
-                },
+                onReceiveStatus,
              });
-              newCall.sendMessage(message);
-              newCall.halfClose();
+              call.sendMessage(savedSendMessage);
+              call.halfClose();
+            };
+
+            const onReceiveStatus = (status: StatusObject) => {
+              if (retryableDecider(attempt, status)) {
+                setTimeout(retry, delayFunction(attempt, status));
+              } else {
+                savedMessageNext(savedReceiveMessage);
+                // TODO: For reasons that are completely unclear to me, if you pass a handcrafted
+                // status object here, node will magically just exit at the end of this line.
+                // No warning, no nothing. Here be dragons.
+                next(status);
+              }
            };
 
-            if (retryOptions.retryableDecider(status)) {
-              setTimeout(() => retry(savedSendMessage, savedMetadata), backOffAmount(retries));
-            } else {
-              savedMessageNext(savedReceiveMessage);
-              next(status);
-            }
+            onReceiveStatus(status);
          })
          .build();
-        next(metadata, newListener);
+        next(metadata, listener);
      })
      .withSendMessage((message, next) => {
        savedSendMessage = message;
```
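
As a usage sketch (not part of this commit's diff): the new options surface can either tune the default backoff or replace the retry callbacks entirely. The `Connection.connect` / `interceptors` wiring mirrors the test file below; the custom decider and delay values here are hypothetical illustrations of the new `(attempt, status)` signatures.

```ts
import * as grpc from '@grpc/grpc-js';
import { Connection, defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from '@temporalio/client';

// Tune the default exponential backoff: factor ** attempt * initialIntervalMs(status) * jitter(maxJitter)
export const tunedRetry = makeGrpcRetryInterceptor(
  defaultGrpcRetryOptions({ maxAttempts: 5, factor: 1.5, maxJitter: 0.2 })
);

// Or supply fully custom behavior through the new two-argument callbacks
export const customRetry = makeGrpcRetryInterceptor({
  // Retry at most 3 times, and only on RESOURCE_EXHAUSTED
  retryableDecider: (attempt, status) => attempt < 3 && status.code === grpc.status.RESOURCE_EXHAUSTED,
  // Constant 500 ms delay between attempts, regardless of status
  delayFunction: (_attempt, _status) => 500,
});

export async function connectWithRetries(): Promise<Connection> {
  // Address is a placeholder; interceptors are passed exactly as in the test below
  return await Connection.connect({
    address: 'localhost:7233',
    interceptors: [tunedRetry],
  });
}
```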

packages/test/src/test-client-connection.ts

Lines changed: 83 additions & 19 deletions
```diff
@@ -3,26 +3,30 @@ import util from 'util';
 import path from 'path';
 import * as grpc from '@grpc/grpc-js';
 import * as protoLoader from '@grpc/proto-loader';
-import { Connection } from '@temporalio/client';
+import { Connection, defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from '@temporalio/client';
 import pkg from '@temporalio/client/lib/pkg';
 import { temporal, grpc as grpcProto } from '@temporalio/proto';
 
-test('withMetadata / withDeadline set the CallContext for RPC call', async (t) => {
-  const packageDefinition = protoLoader.loadSync(
-    path.resolve(
-      __dirname,
-      '../../core-bridge/sdk-core/protos/api_upstream/temporal/api/workflowservice/v1/service.proto'
-    ),
-    { includeDirs: [path.resolve(__dirname, '../../core-bridge/sdk-core/protos/api_upstream')] }
-  );
-  const protoDescriptor = grpc.loadPackageDefinition(packageDefinition) as any;
+const workflowServicePackageDefinition = protoLoader.loadSync(
+  path.resolve(
+    __dirname,
+    '../../core-bridge/sdk-core/protos/api_upstream/temporal/api/workflowservice/v1/service.proto'
+  ),
+  { includeDirs: [path.resolve(__dirname, '../../core-bridge/sdk-core/protos/api_upstream')] }
+);
+const workflowServiceProtoDescriptor = grpc.loadPackageDefinition(workflowServicePackageDefinition) as any;
 
+async function bindLocalhost(server: grpc.Server): Promise<number> {
+  return await util.promisify(server.bindAsync.bind(server))('127.0.0.1:0', grpc.ServerCredentials.createInsecure());
+}
+
+test('withMetadata / withDeadline set the CallContext for RPC call', async (t) => {
   const server = new grpc.Server();
   let gotTestHeaders = false;
   let gotDeadline = false;
   const deadline = Date.now() + 10000;
 
-  server.addService(protoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
+  server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
     registerNamespace(
       call: grpc.ServerUnaryCall<
         temporal.api.workflowservice.v1.IRegisterNamespaceRequest,
@@ -52,10 +56,7 @@ test('withMetadata / withDeadline set the CallContext for RPC call', async (t) =
       callback(null, {});
     },
   });
-  const port = await util.promisify(server.bindAsync.bind(server))(
-    '127.0.0.1:0',
-    grpc.ServerCredentials.createInsecure()
-  );
+  const port = await bindLocalhost(server);
   server.start();
   const conn = await Connection.connect({ address: `127.0.0.1:${port}`, metadata: { staticKey: 'set' } });
   await conn.withMetadata({ test: 'true' }, () =>
@@ -88,12 +89,75 @@ test('healthService works', async (t) => {
       );
     },
   });
-  const port = await util.promisify(server.bindAsync.bind(server))(
-    '127.0.0.1:0',
-    grpc.ServerCredentials.createInsecure()
-  );
+  const port = await bindLocalhost(server);
   server.start();
   const conn = await Connection.connect({ address: `127.0.0.1:${port}` });
   const response = await conn.healthService.check({});
   t.is(response.status, grpcProto.health.v1.HealthCheckResponse.ServingStatus.SERVING);
 });
+
+test('grpc retry passes request and headers on retry, propagates responses', async (t) => {
+  let attempt = 0;
+  let successAttempt = 3;
+
+  const meta = Array<string>();
+  const namespaces = Array<string>();
+
+  const server = new grpc.Server();
+
+  server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
+    describeWorkflowExecution(
+      call: grpc.ServerUnaryCall<
+        temporal.api.workflowservice.v1.IDescribeWorkflowExecutionRequest,
+        temporal.api.workflowservice.v1.IDescribeWorkflowExecutionResponse
+      >,
+      callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IRegisterNamespaceResponse>
+    ) {
+      const { namespace } = call.request;
+      if (typeof namespace === 'string') {
+        namespaces.push(namespace);
+      }
+      const [aValue] = call.metadata.get('a');
+      if (typeof aValue === 'string') {
+        meta.push(aValue);
+      }
+
+      attempt++;
+      if (attempt < successAttempt) {
+        callback({ code: grpc.status.UNKNOWN });
+        return;
+      }
+      const response: temporal.api.workflowservice.v1.IDescribeWorkflowExecutionResponse = {
+        workflowExecutionInfo: { execution: { workflowId: 'test' } },
+      };
+      callback(null, response);
+    },
+  });
+  const port = await bindLocalhost(server);
+  server.start();
+
+  // Default interceptor config with backoff factor of 1 to speed things up
+  const interceptor = makeGrpcRetryInterceptor(defaultGrpcRetryOptions({ factor: 1 }));
+  const conn = await Connection.connect({
+    address: `127.0.0.1:${port}`,
+    metadata: { a: 'bc' },
+    interceptors: [interceptor],
+  });
+  const response = await conn.workflowService.describeWorkflowExecution({ namespace: 'a' });
+  // Check that response is sent correctly
+  t.is(response.workflowExecutionInfo?.execution?.workflowId, 'test');
+  t.is(attempt, 3);
+  // Check that request is sent correctly in each attempt
+  t.deepEqual(namespaces, ['a', 'a', 'a']);
+  // Check that metadata is sent correctly in each attempt
+  t.deepEqual(meta, ['bc', 'bc', 'bc']);
+
+  // Reset and rerun expecting error in the response
+  attempt = 0;
+  successAttempt = 11; // never
+
+  await t.throwsAsync(() => conn.workflowService.describeWorkflowExecution({ namespace: 'a' }), {
+    message: '2 UNKNOWN: Unknown Error',
+  });
+  t.is(attempt, 10);
+});
```
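
The test above sets `factor: 1` so retries fire back-to-back; for reference, a jitter-free sketch of the delays the stock defaults (factor 2, 20 ms initial interval, 1000 ms for RESOURCE_EXHAUSTED) would produce:

```ts
// Illustrative arithmetic only - the real delayFunction also multiplies by jitter(0.1), i.e. a factor between 0.9 and 1.1.
const nominalDelayMs = (attempt: number, initialIntervalMs: number, factor = 2): number =>
  factor ** attempt * initialIntervalMs;

console.log([1, 2, 3].map((attempt) => nominalDelayMs(attempt, 20))); // [40, 80, 160] ms for most retryable codes
console.log([1, 2, 3].map((attempt) => nominalDelayMs(attempt, 1000))); // [2000, 4000, 8000] ms for RESOURCE_EXHAUSTED
```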
