Skip to content

Commit 6673660

Browse files
[PECO-729] Improve retry behavior (#230)
* [PECO-729] Respect `Retry-After` header, falling back to the backoff algorithm
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* [PECO-729] Extend list of HTTP status codes that could be retried
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Pass `Request` object in addition to `Response` to `HttpRetryPolicy`
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* [PECO-729] Retry only idempotent requests (HTTP GET + restricted set of Thrift operations)
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Update HttpRetryPolicy logic; add/update tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Reduce max retry attempts to 5
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Use `Retry-After` as a base for backoff, not instead of it
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
---------
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 08bbdfc commit 6673660

File tree

13 files changed

+566
-108
lines changed

13 files changed

+566
-108
lines changed

lib/DBSQLClient.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
7777
useArrowNativeTypes: true,
7878
socketTimeout: 15 * 60 * 1000, // 15 minutes
7979

80-
retryMaxAttempts: 30,
81-
retriesTimeout: 900 * 1000,
82-
retryDelayMin: 1 * 1000,
83-
retryDelayMax: 60 * 1000,
80+
retryMaxAttempts: 5,
81+
retriesTimeout: 15 * 60 * 1000, // 15 minutes
82+
retryDelayMin: 1 * 1000, // 1 second
83+
retryDelayMax: 60 * 1000, // 60 seconds (1 minute)
8484

8585
useCloudFetch: false,
8686
cloudFetchConcurrentDownloads: 10,

lib/connection/connections/HttpConnection.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import thrift from 'thrift';
22
import https from 'https';
33
import http from 'http';
4-
import { HeadersInit, Response } from 'node-fetch';
4+
import { HeadersInit } from 'node-fetch';
55
import { ProxyAgent } from 'proxy-agent';
66

7-
import IConnectionProvider from '../contracts/IConnectionProvider';
7+
import IConnectionProvider, { HttpTransactionDetails } from '../contracts/IConnectionProvider';
88
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
99
import IClientContext from '../../contracts/IClientContext';
1010

@@ -120,7 +120,7 @@ export default class HttpConnection implements IConnectionProvider {
120120
return this.connection;
121121
}
122122

123-
public async getRetryPolicy(): Promise<IRetryPolicy<Response>> {
123+
public async getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>> {
124124
return new HttpRetryPolicy(this.context);
125125
}
126126
}
Lines changed: 64 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
1-
import { Response } from 'node-fetch';
21
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
3-
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
2+
import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
3+
import IClientContext from '../../contracts/IClientContext';
44
import RetryError, { RetryErrorCode } from '../../errors/RetryError';
55

6-
function getRetryDelay(attempt: number, config: ClientConfig): number {
7-
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
8-
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
9-
}
10-
116
function delay(milliseconds: number): Promise<void> {
127
return new Promise<void>((resolve) => {
138
setTimeout(() => resolve(), milliseconds);
149
});
1510
}
1611

17-
export default class HttpRetryPolicy implements IRetryPolicy<Response> {
12+
export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
1813
private context: IClientContext;
1914

2015
private readonly startTime: number; // in milliseconds
@@ -27,53 +22,81 @@ export default class HttpRetryPolicy implements IRetryPolicy<Response> {
2722
this.attempt = 0;
2823
}
2924

30-
public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
31-
if (!response.ok) {
32-
switch (response.status) {
33-
// On these status codes it's safe to retry the request. However,
34-
// both error codes mean that server is overwhelmed or even down.
35-
// Therefore, we need to add some delay between attempts so
36-
// server can recover and more likely handle next request
37-
case 429: // Too Many Requests
38-
case 503: // Service Unavailable
39-
this.attempt += 1;
40-
41-
const clientConfig = this.context.getConfig();
25+
public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
26+
if (this.isRetryable(details)) {
27+
const clientConfig = this.context.getConfig();
4228

43-
// Delay interval depends on current attempt - the more attempts we do
44-
// the longer the interval will be
45-
// TODO: Respect `Retry-After` header (PECO-729)
46-
const retryDelay = getRetryDelay(this.attempt, clientConfig);
47-
48-
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
49-
if (attemptsExceeded) {
50-
throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
51-
}
29+
// Don't retry if overall retry timeout exceeded
30+
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
31+
if (timeoutExceeded) {
32+
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
33+
}
5234

53-
const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
54-
if (timeoutExceeded) {
55-
throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
56-
}
35+
this.attempt += 1;
5736

58-
return { shouldRetry: true, retryAfter: retryDelay };
37+
// Don't retry if max attempts count reached
38+
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
39+
if (attemptsExceeded) {
40+
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
41+
}
5942

60-
// TODO: Here we should handle other error types (see PECO-730)
43+
// If possible, use `Retry-After` header as a floor for a backoff algorithm
44+
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
45+
const retryAfter = this.getBackoffDelay(
46+
this.attempt,
47+
retryAfterHeader ?? clientConfig.retryDelayMin,
48+
clientConfig.retryDelayMax,
49+
);
6150

62-
// no default
63-
}
51+
return { shouldRetry: true, retryAfter };
6452
}
6553

6654
return { shouldRetry: false };
6755
}
6856

69-
public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
57+
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
7058
for (;;) {
71-
const response = await operation(); // eslint-disable-line no-await-in-loop
72-
const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
59+
const details = await operation(); // eslint-disable-line no-await-in-loop
60+
const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
7361
if (!status.shouldRetry) {
74-
return response;
62+
return details;
7563
}
7664
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
7765
}
7866
}
67+
68+
/**
 * Decides, based solely on the HTTP status code of the response, whether the
 * request is a candidate for a retry.
 *
 * @param details - request/response pair; only `response.status` is inspected
 * @returns `true` for status codes below 100, for `429 Too Many Requests`, and for
 *   every code from 500 upwards except `501 Not Implemented`; `false` otherwise
 */
protected isRetryable({ response }: HttpTransactionDetails): boolean {
  const { status } = response;

  // Everything below 100 is retried
  if (status < 100) {
    return true;
  }

  // `429 Too Many Requests` is retried as well
  if (status === 429) {
    return true;
  }

  // All `5xx` codes are retried, with the single exception of `501 Not Implemented`
  return status >= 500 && status !== 501;
}
81+
82+
/**
 * Extracts a retry delay from the `Retry-After` response header.
 *
 * `Retry-After` may contain either a date after which to retry, or a delay in
 * seconds. Only the delay-seconds form is supported here. The header value is
 * expressed in seconds, whereas `delayMin` and the returned value are in
 * milliseconds, so the parsed value is converted before it is compared or returned.
 *
 * The header value is used when:
 * 1. it is present and non-empty;
 * 2. it parses as a finite number greater than zero;
 * 3. additionally, the result is clamped so it is never below `delayMin`.
 *
 * @param details - request/response pair; only the response headers are read
 * @param delayMin - minimal retry delay in milliseconds, used as a lower bound
 * @returns the delay in milliseconds, or `undefined` when the header is missing,
 *   empty, not a finite number (e.g. the date form), or not positive
 */
protected getRetryAfterHeader({ response }: HttpTransactionDetails, delayMin: number): number | undefined {
  const header = response.headers.get('Retry-After') || '';
  if (header === '') {
    return undefined;
  }

  const seconds = Number(header);
  if (!Number.isFinite(seconds) || seconds <= 0) {
    return undefined;
  }

  // Convert seconds -> milliseconds so the value is comparable with `delayMin`
  const milliseconds = seconds * 1000;
  return Math.max(delayMin, milliseconds);
}
97+
98+
/**
 * Computes an exponentially growing delay that is capped from above.
 *
 * @param attempt - attempt number, used as the exponent of two
 * @param delayMin - base delay that gets multiplied by `2 ** attempt`
 * @param delayMax - upper bound for the resulting delay
 * @returns `2 ** attempt * delayMin`, but never more than `delayMax`
 */
protected getBackoffDelay(attempt: number, delayMin: number, delayMax: number): number {
  const factor = 2 ** attempt;
  return Math.min(factor * delayMin, delayMax);
}
79102
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
2+
3+
/**
 * A retry policy that never retries: the operation is invoked exactly once
 * and its outcome is passed through as is.
 */
export default class NullRetryPolicy<R> implements IRetryPolicy<R> {
  /**
   * Always reports that no retry should be made, regardless of the details passed in.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  public async shouldRetry(details: R): Promise<ShouldRetryResult> {
    const verdict: ShouldRetryResult = { shouldRetry: false };
    return verdict;
  }

  /**
   * Runs the operation a single time and hands back whatever it produces;
   * a failure of the operation is propagated to the caller.
   */
  public async invokeWithRetry(operation: RetryableOperation<R>): Promise<R> {
    const outcome = operation();
    return outcome;
  }
}

lib/connection/connections/ThriftHttpConnection.ts

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
import { EventEmitter } from 'events';
88
import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
9-
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
9+
import fetch, { RequestInit, HeadersInit, Request, Response, FetchError } from 'node-fetch';
1010
// @ts-expect-error TS7016: Could not find a declaration file for module
1111
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
1212
import IRetryPolicy from '../contracts/IRetryPolicy';
13+
import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
14+
import NullRetryPolicy from './NullRetryPolicy';
1315

1416
export class THTTPException extends Thrift.TApplicationException {
1517
public readonly statusCode: unknown;
@@ -32,7 +34,7 @@ interface ThriftHttpConnectionOptions {
3234
url: string;
3335
transport?: TTransportType;
3436
protocol?: TProtocolConstructor;
35-
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
37+
getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
3638
}
3739

3840
// This type describes a shape of internals of Thrift client object.
@@ -47,29 +49,56 @@ type ThriftClient = {
4749
[key: string]: (input: TProtocol, mtype: Thrift.MessageType, seqId: number) => void;
4850
};
4951

52+
const retryableThriftMethods = new Set([
53+
'GetOperationStatus',
54+
'CancelOperation',
55+
'CloseOperation',
56+
'GetResultSetMetadata',
57+
'CloseSession',
58+
'GetInfo',
59+
'GetTypeInfo',
60+
'GetCatalogs',
61+
'GetSchemas',
62+
'GetTables',
63+
'GetTableTypes',
64+
'GetColumns',
65+
'GetFunctions',
66+
'GetPrimaryKeys',
67+
'GetCrossReference',
68+
]);
69+
5070
export default class ThriftHttpConnection extends EventEmitter {
5171
private readonly url: string;
5272

5373
private config: RequestInit;
5474

75+
private options: ThriftHttpConnectionOptions;
76+
5577
// This field is used by Thrift internally, so name and type are important
5678
private readonly transport: TTransportType;
5779

5880
// This field is used by Thrift internally, so name and type are important
5981
private readonly protocol: TProtocolConstructor;
6082

61-
private readonly getRetryPolicy: () => Promise<IRetryPolicy<Response>>;
62-
6383
// thrift.createClient sets this field internally
6484
public client?: ThriftClient;
6585

6686
constructor(options: ThriftHttpConnectionOptions, config: RequestInit = {}) {
6787
super();
6888
this.url = options.url;
6989
this.config = config;
90+
this.options = options;
7091
this.transport = options.transport ?? TBufferedTransport;
7192
this.protocol = options.protocol ?? TBinaryProtocol;
72-
this.getRetryPolicy = options.getRetryPolicy;
93+
}
94+
95+
/**
 * Picks a retry policy for a Thrift call.
 *
 * @param thriftMethodName - name of the Thrift method being invoked, if it is known
 * @returns the policy supplied through the connection options when the method is
 *   listed in `retryableThriftMethods`; otherwise a `NullRetryPolicy`
 */
protected async getRetryPolicy(thriftMethodName?: string): Promise<IRetryPolicy<HttpTransactionDetails>> {
  // Anything that is not explicitly allowed to be retried must not be retried
  if (!thriftMethodName || !retryableThriftMethods.has(thriftMethodName)) {
    return new NullRetryPolicy();
  }

  // The operation is known to be safe to repeat, so use the configured policy
  return this.options.getRetryPolicy();
}
74103

75104
public setHeaders(headers: HeadersInit) {
@@ -92,12 +121,16 @@ export default class ThriftHttpConnection extends EventEmitter {
92121
body: data,
93122
};
94123

95-
this.getRetryPolicy()
124+
this.getThriftMethodName(data)
125+
.then((thriftMethod) => this.getRetryPolicy(thriftMethod))
96126
.then((retryPolicy) => {
97-
const makeRequest = () => fetch(this.url, requestConfig);
127+
const makeRequest = () => {
128+
const request = new Request(this.url, requestConfig);
129+
return fetch(request).then((response) => ({ request, response }));
130+
};
98131
return retryPolicy.invokeWithRetry(makeRequest);
99132
})
100-
.then((response) => {
133+
.then(({ response }) => {
101134
if (response.status !== 200) {
102135
throw new THTTPException(response);
103136
}
@@ -131,6 +164,23 @@ export default class ThriftHttpConnection extends EventEmitter {
131164
});
132165
}
133166

167+
/**
 * Reads the Thrift method name from the header of a serialized Thrift message.
 *
 * The message is fed through the configured transport's receiver and the
 * message header is decoded with the configured protocol.
 *
 * @param thriftMessage - serialized Thrift message
 * @returns a promise that always settles: with the method name when the header
 *   could be read, or with `undefined` when decoding throws or the receiver
 *   returns without having invoked its callback
 */
private getThriftMethodName(thriftMessage: Buffer): Promise<string | undefined> {
  return new Promise((resolve) => {
    try {
      const receiver = this.transport.receiver((transportWithData) => {
        const Protocol = this.protocol;
        const proto = new Protocol(transportWithData);
        const header = proto.readMessageBegin();
        resolve(header.fname);
      }, 0 /* `seqId` could be any because it's ignored */);

      receiver(thriftMessage);
    } catch {
      // The method name could not be determined; the fallback below reports `undefined`
    }

    // Fallback so the promise can never stay pending: if the receiver returned without
    // calling back (e.g. it is still waiting for more data), report "unknown method".
    // This is a no-op when the callback has already resolved the promise.
    // NOTE(review): assumes the transport invokes the callback synchronously - confirm
    resolve(undefined);
  });
}
183+
134184
private handleThriftResponse(transportWithData: TTransport) {
135185
if (!this.client) {
136186
throw new Thrift.TApplicationException(Thrift.TApplicationExceptionType.INTERNAL_ERROR, 'Client not available');
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import http from 'http';
2-
import { HeadersInit, Response } from 'node-fetch';
2+
import { HeadersInit, Request, Response } from 'node-fetch';
33
import IRetryPolicy from './IRetryPolicy';
44

5+
export interface HttpTransactionDetails {
6+
request: Request;
7+
response: Response;
8+
}
9+
510
export default interface IConnectionProvider {
611
getThriftConnection(): Promise<any>;
712

813
getAgent(): Promise<http.Agent>;
914

1015
setHeaders(headers: HeadersInit): void;
1116

12-
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
17+
getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
1318
}

lib/connection/contracts/IRetryPolicy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export type ShouldRetryResult =
1010
export type RetryableOperation<R> = () => Promise<R>;
1111

1212
export default interface IRetryPolicy<R> {
13-
shouldRetry(response: R): Promise<ShouldRetryResult>;
13+
shouldRetry(details: R): Promise<ShouldRetryResult>;
1414

1515
invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
1616
}

lib/hive/Commands/BaseCommand.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,15 @@ export default abstract class BaseCommand {
1919
return await this.invokeCommand<Response>(request, command);
2020
} catch (error) {
2121
if (error instanceof RetryError) {
22-
const statusCode = error.payload instanceof Response ? error.payload.status : undefined;
22+
let statusCode: number | undefined;
23+
if (
24+
error.payload &&
25+
typeof error.payload === 'object' &&
26+
'response' in error.payload &&
27+
error.payload.response instanceof Response
28+
) {
29+
statusCode = error.payload.response.status;
30+
}
2331

2432
switch (error.errorCode) {
2533
case RetryErrorCode.AttemptsExceeded:

lib/result/CloudFetchResultHandler.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import LZ4 from 'lz4';
2-
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
2+
import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
33
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
44
import IClientContext from '../contracts/IClientContext';
55
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
@@ -77,6 +77,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
7777
const retryPolicy = await connectionProvider.getRetryPolicy();
7878

7979
const requestConfig: RequestInit = { agent, ...init };
80-
return retryPolicy.invokeWithRetry(() => fetch(url, requestConfig));
80+
const result = await retryPolicy.invokeWithRetry(() => {
81+
const request = new Request(url, requestConfig);
82+
return fetch(request).then((response) => ({ request, response }));
83+
});
84+
return result.response;
8185
}
8286
}

0 commit comments

Comments
 (0)