Skip to content

Commit c239fca

Browse files
Chore (#255)
* Get rid of redundant `lib/hive/Types` Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Allow any number type (number, bigint, Int64) for `maxRows` and `queryTimeout` options Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Move some global constants to client config Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add a note about `queryTimeout` option Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Use a proper error class instead of a generic `Error` Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent fac3345 commit c239fca

File tree

14 files changed

+138
-130
lines changed

14 files changed

+138
-130
lines changed

lib/DBSQLClient.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import thrift from 'thrift';
2+
import Int64 from 'node-int64';
23

34
import { EventEmitter } from 'events';
45
import TCLIService from '../thrift/TCLIService';
@@ -7,7 +8,6 @@ import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } fr
78
import IDriver from './contracts/IDriver';
89
import IClientContext, { ClientConfig } from './contracts/IClientContext';
910
import HiveDriver from './hive/HiveDriver';
10-
import { Int64 } from './hive/Types';
1111
import DBSQLSession from './DBSQLSession';
1212
import IDBSQLSession from './contracts/IDBSQLSession';
1313
import IAuthentication from './connection/contracts/IAuthentication';
@@ -73,6 +73,9 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
7373

7474
private static getDefaultConfig(): ClientConfig {
7575
return {
76+
directResultsDefaultMaxRows: 100000,
77+
fetchChunkDefaultMaxRows: 100000,
78+
7679
arrowEnabled: true,
7780
useArrowNativeTypes: true,
7881
socketTimeout: 15 * 60 * 1000, // 15 minutes

lib/DBSQLOperation.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ import { definedOrError } from './utils';
2929
import HiveDriverError from './errors/HiveDriverError';
3030
import IClientContext from './contracts/IClientContext';
3131

32-
const defaultMaxRows = 100000;
33-
3432
interface DBSQLOperationConstructorOptions {
3533
handle: TOperationHandle;
3634
directResults?: TSparkDirectResults;
@@ -164,8 +162,10 @@ export default class DBSQLOperation implements IOperation {
164162
setTimeout(resolve, 0);
165163
});
166164

165+
const defaultMaxRows = this.context.getConfig().fetchChunkDefaultMaxRows;
166+
167167
const result = resultHandler.fetchNext({
168-
limit: options?.maxRows || defaultMaxRows,
168+
limit: options?.maxRows ?? defaultMaxRows,
169169
disableBuffering: options?.disableBuffering,
170170
});
171171
await this.failIfClosed();
@@ -174,7 +174,7 @@ export default class DBSQLOperation implements IOperation {
174174
.getLogger()
175175
.log(
176176
LogLevel.debug,
177-
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.id}`,
177+
`Fetched chunk of size: ${options?.maxRows ?? defaultMaxRows} from operation with id: ${this.id}`,
178178
);
179179
return result;
180180
}

lib/DBSQLSession.ts

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as path from 'path';
33
import stream from 'node:stream';
44
import util from 'node:util';
55
import { stringify, NIL } from 'uuid';
6+
import Int64 from 'node-int64';
67
import fetch, { HeadersInit } from 'node-fetch';
78
import {
89
TSessionHandle,
@@ -12,7 +13,6 @@ import {
1213
TSparkArrowTypes,
1314
TSparkParameter,
1415
} from '../thrift/TCLIService_types';
15-
import { Int64 } from './hive/Types';
1616
import IDBSQLSession, {
1717
ExecuteStatementOptions,
1818
TypeInfoRequest,
@@ -41,22 +41,35 @@ import IClientContext, { ClientConfig } from './contracts/IClientContext';
4141
// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
4242
const pipeline = util.promisify(stream.pipeline);
4343

44-
const defaultMaxRows = 100000;
45-
4644
interface OperationResponseShape {
4745
status: TStatus;
4846
operationHandle?: TOperationHandle;
4947
directResults?: TSparkDirectResults;
5048
}
5149

52-
function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) {
50+
export function numberToInt64(value: number | bigint | Int64): Int64 {
51+
if (value instanceof Int64) {
52+
return value;
53+
}
54+
55+
if (typeof value === 'bigint') {
56+
const buffer = new ArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT);
57+
const view = new DataView(buffer);
58+
view.setBigInt64(0, value, false); // `false` to use big-endian order
59+
return new Int64(Buffer.from(buffer));
60+
}
61+
62+
return new Int64(value);
63+
}
64+
65+
function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undefined, config: ClientConfig) {
5366
if (maxRows === null) {
5467
return {};
5568
}
5669

5770
return {
5871
getDirectResults: {
59-
maxRows: new Int64(maxRows),
72+
maxRows: numberToInt64(maxRows ?? config.directResultsDefaultMaxRows),
6073
},
6174
};
6275
}
@@ -86,7 +99,6 @@ function getArrowOptions(config: ClientConfig): {
8699
}
87100

88101
function getQueryParameters(
89-
sessionHandle: TSessionHandle,
90102
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>,
91103
ordinalParameters?: Array<DBSQLParameter | DBSQLParameterValue>,
92104
): Array<TSparkParameter> {
@@ -184,12 +196,12 @@ export default class DBSQLSession implements IDBSQLSession {
184196
const operationPromise = driver.executeStatement({
185197
sessionHandle: this.sessionHandle,
186198
statement,
187-
queryTimeout: options.queryTimeout,
199+
queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined,
188200
runAsync: true,
189-
...getDirectResultsOptions(options.maxRows),
201+
...getDirectResultsOptions(options.maxRows, clientConfig),
190202
...getArrowOptions(clientConfig),
191203
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
192-
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
204+
parameters: getQueryParameters(options.namedParameters, options.ordinalParameters),
193205
canDecompressLZ4Result: clientConfig.useLZ4Compression && Boolean(LZ4),
194206
});
195207
const response = await this.handleResponse(operationPromise);
@@ -339,10 +351,11 @@ export default class DBSQLSession implements IDBSQLSession {
339351
public async getTypeInfo(request: TypeInfoRequest = {}): Promise<IOperation> {
340352
await this.failIfClosed();
341353
const driver = await this.context.getDriver();
354+
const clientConfig = this.context.getConfig();
342355
const operationPromise = driver.getTypeInfo({
343356
sessionHandle: this.sessionHandle,
344357
runAsync: true,
345-
...getDirectResultsOptions(request.maxRows),
358+
...getDirectResultsOptions(request.maxRows, clientConfig),
346359
});
347360
const response = await this.handleResponse(operationPromise);
348361
return this.createOperation(response);
@@ -357,10 +370,11 @@ export default class DBSQLSession implements IDBSQLSession {
357370
public async getCatalogs(request: CatalogsRequest = {}): Promise<IOperation> {
358371
await this.failIfClosed();
359372
const driver = await this.context.getDriver();
373+
const clientConfig = this.context.getConfig();
360374
const operationPromise = driver.getCatalogs({
361375
sessionHandle: this.sessionHandle,
362376
runAsync: true,
363-
...getDirectResultsOptions(request.maxRows),
377+
...getDirectResultsOptions(request.maxRows, clientConfig),
364378
});
365379
const response = await this.handleResponse(operationPromise);
366380
return this.createOperation(response);
@@ -375,12 +389,13 @@ export default class DBSQLSession implements IDBSQLSession {
375389
public async getSchemas(request: SchemasRequest = {}): Promise<IOperation> {
376390
await this.failIfClosed();
377391
const driver = await this.context.getDriver();
392+
const clientConfig = this.context.getConfig();
378393
const operationPromise = driver.getSchemas({
379394
sessionHandle: this.sessionHandle,
380395
catalogName: request.catalogName,
381396
schemaName: request.schemaName,
382397
runAsync: true,
383-
...getDirectResultsOptions(request.maxRows),
398+
...getDirectResultsOptions(request.maxRows, clientConfig),
384399
});
385400
const response = await this.handleResponse(operationPromise);
386401
return this.createOperation(response);
@@ -395,14 +410,15 @@ export default class DBSQLSession implements IDBSQLSession {
395410
public async getTables(request: TablesRequest = {}): Promise<IOperation> {
396411
await this.failIfClosed();
397412
const driver = await this.context.getDriver();
413+
const clientConfig = this.context.getConfig();
398414
const operationPromise = driver.getTables({
399415
sessionHandle: this.sessionHandle,
400416
catalogName: request.catalogName,
401417
schemaName: request.schemaName,
402418
tableName: request.tableName,
403419
tableTypes: request.tableTypes,
404420
runAsync: true,
405-
...getDirectResultsOptions(request.maxRows),
421+
...getDirectResultsOptions(request.maxRows, clientConfig),
406422
});
407423
const response = await this.handleResponse(operationPromise);
408424
return this.createOperation(response);
@@ -417,10 +433,11 @@ export default class DBSQLSession implements IDBSQLSession {
417433
public async getTableTypes(request: TableTypesRequest = {}): Promise<IOperation> {
418434
await this.failIfClosed();
419435
const driver = await this.context.getDriver();
436+
const clientConfig = this.context.getConfig();
420437
const operationPromise = driver.getTableTypes({
421438
sessionHandle: this.sessionHandle,
422439
runAsync: true,
423-
...getDirectResultsOptions(request.maxRows),
440+
...getDirectResultsOptions(request.maxRows, clientConfig),
424441
});
425442
const response = await this.handleResponse(operationPromise);
426443
return this.createOperation(response);
@@ -435,14 +452,15 @@ export default class DBSQLSession implements IDBSQLSession {
435452
public async getColumns(request: ColumnsRequest = {}): Promise<IOperation> {
436453
await this.failIfClosed();
437454
const driver = await this.context.getDriver();
455+
const clientConfig = this.context.getConfig();
438456
const operationPromise = driver.getColumns({
439457
sessionHandle: this.sessionHandle,
440458
catalogName: request.catalogName,
441459
schemaName: request.schemaName,
442460
tableName: request.tableName,
443461
columnName: request.columnName,
444462
runAsync: true,
445-
...getDirectResultsOptions(request.maxRows),
463+
...getDirectResultsOptions(request.maxRows, clientConfig),
446464
});
447465
const response = await this.handleResponse(operationPromise);
448466
return this.createOperation(response);
@@ -457,13 +475,14 @@ export default class DBSQLSession implements IDBSQLSession {
457475
public async getFunctions(request: FunctionsRequest): Promise<IOperation> {
458476
await this.failIfClosed();
459477
const driver = await this.context.getDriver();
478+
const clientConfig = this.context.getConfig();
460479
const operationPromise = driver.getFunctions({
461480
sessionHandle: this.sessionHandle,
462481
catalogName: request.catalogName,
463482
schemaName: request.schemaName,
464483
functionName: request.functionName,
465484
runAsync: true,
466-
...getDirectResultsOptions(request.maxRows),
485+
...getDirectResultsOptions(request.maxRows, clientConfig),
467486
});
468487
const response = await this.handleResponse(operationPromise);
469488
return this.createOperation(response);
@@ -472,13 +491,14 @@ export default class DBSQLSession implements IDBSQLSession {
472491
public async getPrimaryKeys(request: PrimaryKeysRequest): Promise<IOperation> {
473492
await this.failIfClosed();
474493
const driver = await this.context.getDriver();
494+
const clientConfig = this.context.getConfig();
475495
const operationPromise = driver.getPrimaryKeys({
476496
sessionHandle: this.sessionHandle,
477497
catalogName: request.catalogName,
478498
schemaName: request.schemaName,
479499
tableName: request.tableName,
480500
runAsync: true,
481-
...getDirectResultsOptions(request.maxRows),
501+
...getDirectResultsOptions(request.maxRows, clientConfig),
482502
});
483503
const response = await this.handleResponse(operationPromise);
484504
return this.createOperation(response);
@@ -493,6 +513,7 @@ export default class DBSQLSession implements IDBSQLSession {
493513
public async getCrossReference(request: CrossReferenceRequest): Promise<IOperation> {
494514
await this.failIfClosed();
495515
const driver = await this.context.getDriver();
516+
const clientConfig = this.context.getConfig();
496517
const operationPromise = driver.getCrossReference({
497518
sessionHandle: this.sessionHandle,
498519
parentCatalogName: request.parentCatalogName,
@@ -502,7 +523,7 @@ export default class DBSQLSession implements IDBSQLSession {
502523
foreignSchemaName: request.foreignSchemaName,
503524
foreignTableName: request.foreignTableName,
504525
runAsync: true,
505-
...getDirectResultsOptions(request.maxRows),
526+
...getDirectResultsOptions(request.maxRows, clientConfig),
506527
});
507528
const response = await this.handleResponse(operationPromise);
508529
return this.createOperation(response);

lib/connection/auth/DatabricksOAuth/AuthorizationCode.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import open from 'open';
44
import { LogLevel } from '../../../contracts/IDBSQLLogger';
55
import { OAuthScopes, scopeDelimiter } from './OAuthScope';
66
import IClientContext from '../../../contracts/IClientContext';
7+
import AuthenticationError from '../../../errors/AuthenticationError';
78

89
export interface AuthorizationCodeOptions {
910
client: BaseClient;
@@ -113,9 +114,9 @@ export default class AuthorizationCode {
113114
if (!receivedParams || !receivedParams.code) {
114115
if (receivedParams?.error) {
115116
const errorMessage = `OAuth error: ${receivedParams.error} ${receivedParams.error_description}`;
116-
throw new Error(errorMessage);
117+
throw new AuthenticationError(errorMessage);
117118
}
118-
throw new Error(`No path parameters were returned to the callback at ${redirectUri}`);
119+
throw new AuthenticationError(`No path parameters were returned to the callback at ${redirectUri}`);
119120
}
120121

121122
return { code: receivedParams.code, verifier: verifierString, redirectUri };
@@ -152,7 +153,7 @@ export default class AuthorizationCode {
152153
}
153154
}
154155

155-
throw new Error('Failed to start server: all ports are in use');
156+
throw new AuthenticationError('Failed to start server: all ports are in use');
156157
}
157158

158159
private renderCallbackResponse(): string {

lib/connection/auth/DatabricksOAuth/OAuthManager.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import http from 'http';
22
import { Issuer, BaseClient, custom } from 'openid-client';
3-
import HiveDriverError from '../../../errors/HiveDriverError';
3+
import AuthenticationError from '../../../errors/AuthenticationError';
44
import { LogLevel } from '../../../contracts/IDBSQLLogger';
55
import OAuthToken from './OAuthToken';
66
import AuthorizationCode from './AuthorizationCode';
@@ -104,7 +104,7 @@ export default abstract class OAuthManager {
104104
if (!token.refreshToken) {
105105
const message = `OAuth access token expired on ${token.expirationTime}.`;
106106
this.context.getLogger().log(LogLevel.error, message);
107-
throw new HiveDriverError(message);
107+
throw new AuthenticationError(message);
108108
}
109109

110110
// Try to refresh using the refresh token
@@ -115,7 +115,7 @@ export default abstract class OAuthManager {
115115
const client = await this.getClient();
116116
const { access_token: accessToken, refresh_token: refreshToken } = await client.refresh(token.refreshToken);
117117
if (!accessToken || !refreshToken) {
118-
throw new Error('Failed to refresh token: invalid response');
118+
throw new AuthenticationError('Failed to refresh token: invalid response');
119119
}
120120
return new OAuthToken(accessToken, refreshToken, token.scopes);
121121
}
@@ -165,7 +165,7 @@ export default abstract class OAuthManager {
165165
});
166166

167167
if (!accessToken) {
168-
throw new Error('Failed to fetch access token');
168+
throw new AuthenticationError('Failed to fetch access token');
169169
}
170170
return new OAuthToken(accessToken, refreshToken, mappedScopes);
171171
}
@@ -185,7 +185,7 @@ export default abstract class OAuthManager {
185185
});
186186

187187
if (!accessToken) {
188-
throw new Error('Failed to fetch access token');
188+
throw new AuthenticationError('Failed to fetch access token');
189189
}
190190
return new OAuthToken(accessToken, undefined, mappedScopes);
191191
}
@@ -234,7 +234,7 @@ export default abstract class OAuthManager {
234234
}
235235
}
236236

237-
throw new Error(`OAuth is not supported for ${options.host}`);
237+
throw new AuthenticationError(`OAuth is not supported for ${options.host}`);
238238
}
239239
}
240240

lib/contracts/IClientContext.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import IConnectionProvider from '../connection/contracts/IConnectionProvider';
44
import TCLIService from '../../thrift/TCLIService';
55

66
export interface ClientConfig {
7+
directResultsDefaultMaxRows: number;
8+
fetchChunkDefaultMaxRows: number;
9+
710
arrowEnabled?: boolean;
811
useArrowNativeTypes?: boolean;
912
socketTimeout: number;

0 commit comments

Comments
 (0)