Skip to content

Commit 15e5fa4

Browse files
Add HTTP error handling (#127)
* Add error handling (#102) Signed-off-by: Levko Kravets <levko.ne@gmail.com> Signed-off-by: nithinkdb <nithin.krishnamurthi@databricks.com> * Refactoring: Move error handler to BaseCommand Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Refactoring: refine error handling code Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add delay between retries + make it configurable through constants Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Fix test name Signed-off-by: Levko Kravets <levko.ne@gmail.com> * CR1 Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: nithinkdb <nithin.krishnamurthi@databricks.com> Signed-off-by: Levko Kravets <levko.ne@gmail.com> Co-authored-by: Nithin Krishnamurthi <110488533+nithinkdb@users.noreply.github.com>
1 parent a4bc14c commit 15e5fa4

File tree

4 files changed

+323
-6
lines changed

4 files changed

+323
-6
lines changed

.eslintrc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
"no-param-reassign": "off",
1414
"no-bitwise": "off",
1515
"@typescript-eslint/no-throw-literal": "off",
16-
"no-restricted-syntax": "off"
16+
"no-restricted-syntax": "off",
17+
"no-case-declarations": "off"
1718
}
1819
}
1920
]

lib/globalConfig.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11
interface GlobalConfig {
22
arrowEnabled?: boolean;
33
useArrowNativeTypes?: boolean;
4+
5+
retryMaxAttempts: number;
6+
retriesTimeout: number; // in milliseconds
7+
retryDelayMin: number; // in milliseconds
8+
retryDelayMax: number; // in milliseconds
49
}
510

611
export default {
712
arrowEnabled: true,
813
useArrowNativeTypes: true,
14+
15+
retryMaxAttempts: 30,
16+
retriesTimeout: 900 * 1000,
17+
retryDelayMin: 1 * 1000,
18+
retryDelayMax: 60 * 1000,
919
} satisfies GlobalConfig;

lib/hive/Commands/BaseCommand.ts

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
1+
import { Thrift } from 'thrift';
12
import TCLIService from '../../../thrift/TCLIService';
23
import HiveDriverError from '../../errors/HiveDriverError';
4+
import globalConfig from '../../globalConfig';
5+
6+
interface CommandExecutionInfo {
7+
startTime: number; // in milliseconds
8+
attempt: number;
9+
}
10+
11+
function getRetryDelay(attempt: number): number {
12+
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
13+
return Math.min(globalConfig.retryDelayMin * scale, globalConfig.retryDelayMax);
14+
}
15+
16+
function delay(milliseconds: number): Promise<void> {
17+
return new Promise<void>((resolve) => {
18+
setTimeout(() => resolve(), milliseconds);
19+
});
20+
}
321

422
export default abstract class BaseCommand {
523
protected client: TCLIService.Client;
@@ -8,13 +26,71 @@ export default abstract class BaseCommand {
826
this.client = client;
927
}
1028

11-
executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
12-
return new Promise((resolve, reject) => {
13-
if (typeof command !== 'function') {
14-
reject(new HiveDriverError('Hive driver: the operation does not exist, try to choose another Thrift file.'));
15-
return;
29+
protected executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
30+
return this.invokeWithErrorHandling<Response>(request, command, { startTime: Date.now(), attempt: 0 });
31+
}
32+
33+
private async invokeWithErrorHandling<Response>(
34+
request: object,
35+
command: Function | void,
36+
info: CommandExecutionInfo,
37+
): Promise<Response> {
38+
try {
39+
return await this.invokeCommand<Response>(request, command);
40+
} catch (error) {
41+
if (error instanceof Thrift.TApplicationException) {
42+
if ('statusCode' in error) {
43+
switch (error.statusCode) {
44+
// On these status codes it's safe to retry the request. However,
45+
// both error codes mean that server is overwhelmed or even down.
46+
// Therefore, we need to add some delay between attempts so
47+
// server can recover and more likely handle next request
48+
case 429: // Too Many Requests
49+
case 503: // Service Unavailable
50+
info.attempt += 1;
51+
52+
// Delay interval depends on current attempt - the more attempts we do
53+
// the longer the interval will be
54+
// TODO: Respect `Retry-After` header (PECO-729)
55+
const retryDelay = getRetryDelay(info.attempt);
56+
57+
const attemptsExceeded = info.attempt >= globalConfig.retryMaxAttempts;
58+
if (attemptsExceeded) {
59+
throw new HiveDriverError(
60+
`Hive driver: ${error.statusCode} when connecting to resource. Max retry count exceeded.`,
61+
);
62+
}
63+
64+
const timeoutExceeded = Date.now() - info.startTime + retryDelay >= globalConfig.retriesTimeout;
65+
if (timeoutExceeded) {
66+
throw new HiveDriverError(
67+
`Hive driver: ${error.statusCode} when connecting to resource. Retry timeout exceeded.`,
68+
);
69+
}
70+
71+
await delay(retryDelay);
72+
return this.invokeWithErrorHandling(request, command, info);
73+
74+
// TODO: Here we should handle other error types (see PECO-730)
75+
76+
// no default
77+
}
78+
}
1679
}
1780

81+
// Re-throw error we didn't handle
82+
throw error;
83+
}
84+
}
85+
86+
private invokeCommand<Response>(request: object, command: Function | void): Promise<Response> {
87+
if (typeof command !== 'function') {
88+
return Promise.reject(
89+
new HiveDriverError('Hive driver: the operation does not exist, try to choose another Thrift file.'),
90+
);
91+
}
92+
93+
return new Promise((resolve, reject) => {
1894
try {
1995
command.call(this.client, request, (err: Error, response: Response) => {
2096
if (err) {
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
const { expect, AssertionError } = require('chai');
2+
const { Thrift } = require('thrift');
3+
const HiveDriverError = require('../../../../dist/errors/HiveDriverError').default;
4+
const BaseCommand = require('../../../../dist/hive/Commands/BaseCommand').default;
5+
const globalConfig = require('../../../../dist/globalConfig').default;
6+
7+
const savedGlobalConfig = { ...globalConfig };
8+
9+
class ThriftClientMock {
10+
constructor(methodHandler) {
11+
this.methodHandler = methodHandler;
12+
}
13+
14+
CustomMethod(request, callback) {
15+
try {
16+
const response = this.methodHandler();
17+
return callback(undefined, response !== undefined ? response : ThriftClientMock.defaultResponse);
18+
} catch (error) {
19+
return callback(error);
20+
}
21+
}
22+
}
23+
24+
ThriftClientMock.defaultResponse = {
25+
status: { statusCode: 0 },
26+
};
27+
28+
class CustomCommand extends BaseCommand {
29+
execute(request) {
30+
return this.executeCommand(request, this.client.CustomMethod);
31+
}
32+
}
33+
34+
describe('BaseCommand', () => {
35+
afterEach(() => {
36+
Object.assign(globalConfig, savedGlobalConfig);
37+
});
38+
39+
it('should fail if trying to invoke non-existing command', async () => {
40+
const command = new CustomCommand({});
41+
42+
try {
43+
await command.execute();
44+
expect.fail('It should throw an error');
45+
} catch (error) {
46+
if (error instanceof AssertionError) {
47+
throw error;
48+
}
49+
expect(error).to.be.instanceof(HiveDriverError);
50+
expect(error.message).to.contain('the operation does not exist');
51+
}
52+
});
53+
54+
it('should handle exceptions thrown by command', async () => {
55+
const errorMessage = 'Unexpected error';
56+
57+
const command = new CustomCommand({
58+
CustomMethod() {
59+
throw new Error(errorMessage);
60+
},
61+
});
62+
63+
try {
64+
await command.execute();
65+
expect.fail('It should throw an error');
66+
} catch (error) {
67+
if (error instanceof AssertionError) {
68+
throw error;
69+
}
70+
expect(error).to.be.instanceof(Error);
71+
expect(error.message).to.contain(errorMessage);
72+
}
73+
});
74+
75+
[429, 503].forEach((statusCode) => {
76+
describe(`HTTP ${statusCode} error`, () => {
77+
it('should fail on max retry attempts exceeded', async () => {
78+
globalConfig.retriesTimeout = 200; // ms
79+
globalConfig.retryDelayMin = 5; // ms
80+
globalConfig.retryDelayMax = 20; // ms
81+
globalConfig.retryMaxAttempts = 3;
82+
83+
let methodCallCount = 0;
84+
const command = new CustomCommand(
85+
new ThriftClientMock(() => {
86+
methodCallCount += 1;
87+
const error = new Thrift.TApplicationException();
88+
error.statusCode = statusCode;
89+
throw error;
90+
}),
91+
);
92+
93+
try {
94+
await command.execute();
95+
expect.fail('It should throw an error');
96+
} catch (error) {
97+
if (error instanceof AssertionError) {
98+
throw error;
99+
}
100+
expect(error).to.be.instanceof(HiveDriverError);
101+
expect(error.message).to.contain(`${statusCode} when connecting to resource`);
102+
expect(error.message).to.contain('Max retry count exceeded');
103+
expect(methodCallCount).to.equal(globalConfig.retryMaxAttempts);
104+
}
105+
});
106+
107+
it('should fail on retry timeout exceeded', async () => {
108+
globalConfig.retriesTimeout = 200; // ms
109+
globalConfig.retryDelayMin = 5; // ms
110+
globalConfig.retryDelayMax = 20; // ms
111+
globalConfig.retryMaxAttempts = 50;
112+
113+
let methodCallCount = 0;
114+
const command = new CustomCommand(
115+
new ThriftClientMock(() => {
116+
methodCallCount += 1;
117+
const error = new Thrift.TApplicationException();
118+
error.statusCode = statusCode;
119+
throw error;
120+
}),
121+
);
122+
123+
try {
124+
await command.execute();
125+
expect.fail('It should throw an error');
126+
} catch (error) {
127+
if (error instanceof AssertionError) {
128+
throw error;
129+
}
130+
expect(error).to.be.instanceof(HiveDriverError);
131+
expect(error.message).to.contain(`${statusCode} when connecting to resource`);
132+
expect(error.message).to.contain('Retry timeout exceeded');
133+
// We set pretty low intervals/timeouts to make this test pass faster, but it also means
134+
// that it's harder to predict how much times command will be invoked. So we check that
135+
// it is within some meaningful range to reduce false-negatives
136+
expect(methodCallCount).to.be.within(10, 20);
137+
}
138+
});
139+
140+
it('should succeed after few attempts', async () => {
141+
globalConfig.retriesTimeout = 200; // ms
142+
globalConfig.retryDelayMin = 5; // ms
143+
globalConfig.retryDelayMax = 20; // ms
144+
globalConfig.retryMaxAttempts = 5;
145+
146+
let methodCallCount = 0;
147+
const command = new CustomCommand(
148+
new ThriftClientMock(() => {
149+
methodCallCount += 1;
150+
if (methodCallCount <= 3) {
151+
const error = new Thrift.TApplicationException();
152+
error.statusCode = statusCode;
153+
throw error;
154+
}
155+
return ThriftClientMock.defaultResponse;
156+
}),
157+
);
158+
159+
const response = await command.execute();
160+
expect(response).to.deep.equal(ThriftClientMock.defaultResponse);
161+
expect(methodCallCount).to.equal(4); // 3 failed attempts + 1 succeeded
162+
});
163+
});
164+
});
165+
166+
it(`should re-throw unrecognized HTTP errors`, async () => {
167+
const errorMessage = 'Unrecognized HTTP error';
168+
169+
const command = new CustomCommand(
170+
new ThriftClientMock(() => {
171+
const error = new Thrift.TApplicationException(undefined, errorMessage);
172+
error.statusCode = 500;
173+
throw error;
174+
}),
175+
);
176+
177+
try {
178+
await command.execute();
179+
expect.fail('It should throw an error');
180+
} catch (error) {
181+
if (error instanceof AssertionError) {
182+
throw error;
183+
}
184+
expect(error).to.be.instanceof(Thrift.TApplicationException);
185+
expect(error.message).to.contain(errorMessage);
186+
}
187+
});
188+
189+
it(`should re-throw unrecognized Thrift errors`, async () => {
190+
const errorMessage = 'Unrecognized HTTP error';
191+
192+
const command = new CustomCommand(
193+
new ThriftClientMock(() => {
194+
throw new Thrift.TApplicationException(undefined, errorMessage);
195+
}),
196+
);
197+
198+
try {
199+
await command.execute();
200+
expect.fail('It should throw an error');
201+
} catch (error) {
202+
if (error instanceof AssertionError) {
203+
throw error;
204+
}
205+
expect(error).to.be.instanceof(Thrift.TApplicationException);
206+
expect(error.message).to.contain(errorMessage);
207+
}
208+
});
209+
210+
it(`should re-throw unrecognized errors`, async () => {
211+
const errorMessage = 'Unrecognized error';
212+
213+
const command = new CustomCommand(
214+
new ThriftClientMock(() => {
215+
throw new Error(errorMessage);
216+
}),
217+
);
218+
219+
try {
220+
await command.execute();
221+
expect.fail('It should throw an error');
222+
} catch (error) {
223+
if (error instanceof AssertionError) {
224+
throw error;
225+
}
226+
expect(error).to.be.instanceof(Error);
227+
expect(error.message).to.contain(errorMessage);
228+
}
229+
});
230+
});

0 commit comments

Comments
 (0)