Skip to content

Commit f481f61

Browse files
[PECO-237] Direct results support (#51)
* Decompose DBSQLOperation: extract schema fetching Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Decompose DBSQLOperation: extract data fetching Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Decompose DBSQLOperation: extract status handling; light refactoring of previous changes Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Decompose DBSQLOperation: extract operation completion actions Signed-off-by: Levko Kravets <levko.ne@gmail.com> * [PECO-237] Add DirectResults support Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Fix lint warnings Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Update tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Update changelog Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add some more tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Cache status response when operation is finished or failed to avoid weird error on subsequent calls Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Update CHANGELOG.md Co-authored-by: Jesse <jesse.whitehouse@databricks.com> Signed-off-by: Levko Kravets <levko.ne@gmail.com> * CR1 Signed-off-by: Levko Kravets <levko.ne@gmail.com> * CR2 Signed-off-by: Levko Kravets <levko.ne@gmail.com> Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 8bbf410 commit f481f61

15 files changed

+1270
-696
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 0.1.x (Unreleased)
44

5+
- Added support for DirectResults, which speeds up data fetches by reducing the number of server roundtrips when possible.
56
- `DBSQLOperation` interface simplified: `HiveUtils` were removed and replaced with new methods
67
`DBSQLOperation.fetchChunk`/`DBSQLOperation.fetchAll`. New API implements all necessary waiting
78
and data conversion routines internally
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { TOperationHandle, TStatusCode, TCloseOperationResp } from '../../thrift/TCLIService_types';
2+
import HiveDriver from '../hive/HiveDriver';
3+
import StatusFactory from '../factory/StatusFactory';
4+
import Status from '../dto/Status';
5+
6+
export default class CompleteOperationHelper {
7+
private driver: HiveDriver;
8+
9+
private operationHandle: TOperationHandle;
10+
11+
private statusFactory = new StatusFactory();
12+
13+
closed: boolean = false;
14+
15+
cancelled: boolean = false;
16+
17+
constructor(driver: HiveDriver, operationHandle: TOperationHandle, closeOperation?: TCloseOperationResp) {
18+
this.driver = driver;
19+
this.operationHandle = operationHandle;
20+
21+
if (closeOperation) {
22+
this.statusFactory.create(closeOperation.status);
23+
this.closed = true;
24+
}
25+
}
26+
27+
async cancel(): Promise<Status> {
28+
if (this.cancelled) {
29+
return this.statusFactory.create({
30+
statusCode: TStatusCode.SUCCESS_STATUS,
31+
});
32+
}
33+
34+
const response = await this.driver.cancelOperation({
35+
operationHandle: this.operationHandle,
36+
});
37+
const status = this.statusFactory.create(response.status);
38+
this.cancelled = true;
39+
return status;
40+
}
41+
42+
async close(): Promise<Status> {
43+
if (this.closed) {
44+
return this.statusFactory.create({
45+
statusCode: TStatusCode.SUCCESS_STATUS,
46+
});
47+
}
48+
49+
const response = await this.driver.closeOperation({
50+
operationHandle: this.operationHandle,
51+
});
52+
const status = this.statusFactory.create(response.status);
53+
this.closed = true;
54+
return status;
55+
}
56+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import {
2+
TColumn,
3+
TFetchOrientation,
4+
TFetchResultsResp,
5+
TOperationHandle,
6+
TRowSet,
7+
TStatus,
8+
} from '../../thrift/TCLIService_types';
9+
import { ColumnCode, FetchType, Int64 } from '../hive/Types';
10+
import HiveDriver from '../hive/HiveDriver';
11+
import StatusFactory from '../factory/StatusFactory';
12+
13+
function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean {
14+
if (response.hasMoreRows) {
15+
return true;
16+
}
17+
18+
const columns = response.results?.columns || [];
19+
20+
if (columns.length === 0) {
21+
return false;
22+
}
23+
24+
const column: TColumn = columns[0];
25+
26+
const columnValue =
27+
column[ColumnCode.binaryVal] ||
28+
column[ColumnCode.boolVal] ||
29+
column[ColumnCode.byteVal] ||
30+
column[ColumnCode.doubleVal] ||
31+
column[ColumnCode.i16Val] ||
32+
column[ColumnCode.i32Val] ||
33+
column[ColumnCode.i64Val] ||
34+
column[ColumnCode.stringVal];
35+
36+
return (columnValue?.values?.length || 0) > 0;
37+
}
38+
39+
export default class FetchResultsHelper {
40+
private driver: HiveDriver;
41+
42+
private operationHandle: TOperationHandle;
43+
44+
private fetchOrientation: TFetchOrientation = TFetchOrientation.FETCH_FIRST;
45+
46+
private statusFactory = new StatusFactory();
47+
48+
private prefetchedResults: TFetchResultsResp[] = [];
49+
50+
hasMoreRows: boolean = false;
51+
52+
constructor(
53+
driver: HiveDriver,
54+
operationHandle: TOperationHandle,
55+
prefetchedResults: Array<TFetchResultsResp | undefined> = [],
56+
) {
57+
this.driver = driver;
58+
this.operationHandle = operationHandle;
59+
prefetchedResults.forEach((item) => {
60+
if (item) {
61+
this.prefetchedResults.push(item);
62+
}
63+
});
64+
}
65+
66+
private assertStatus(responseStatus: TStatus): void {
67+
this.statusFactory.create(responseStatus);
68+
}
69+
70+
private processFetchResponse(response: TFetchResultsResp): TRowSet | undefined {
71+
this.assertStatus(response.status);
72+
this.fetchOrientation = TFetchOrientation.FETCH_NEXT;
73+
this.hasMoreRows = checkIfOperationHasMoreRows(response);
74+
return response.results;
75+
}
76+
77+
async fetch(maxRows: number) {
78+
const prefetchedResponse = this.prefetchedResults.shift();
79+
if (prefetchedResponse) {
80+
return this.processFetchResponse(prefetchedResponse);
81+
}
82+
return this.driver
83+
.fetchResults({
84+
operationHandle: this.operationHandle,
85+
orientation: this.fetchOrientation,
86+
maxRows: new Int64(maxRows),
87+
fetchType: FetchType.Data,
88+
})
89+
.then((response) => this.processFetchResponse(response));
90+
}
91+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import { TOperationHandle, TOperationState, TGetOperationStatusResp } from '../../thrift/TCLIService_types';
2+
import HiveDriver from '../hive/HiveDriver';
3+
import StatusFactory from '../factory/StatusFactory';
4+
import { OperationStatusCallback } from '../contracts/IOperation';
5+
import OperationStateError from '../errors/OperationStateError';
6+
7+
export default class OperationStatusHelper {
8+
private driver: HiveDriver;
9+
10+
private operationHandle: TOperationHandle;
11+
12+
private statusFactory = new StatusFactory();
13+
14+
private state: number = TOperationState.INITIALIZED_STATE;
15+
16+
// Once operation is finished or fails - cache status response, because subsequent calls
17+
// to `getOperationStatus()` may fail with irrelevant errors, e.g. HTTP 404
18+
private operationStatus?: TGetOperationStatusResp;
19+
20+
hasResultSet: boolean = false;
21+
22+
constructor(driver: HiveDriver, operationHandle: TOperationHandle, operationStatus?: TGetOperationStatusResp) {
23+
this.driver = driver;
24+
this.operationHandle = operationHandle;
25+
this.hasResultSet = operationHandle.hasResultSet;
26+
27+
if (operationStatus) {
28+
this.processOperationStatusResponse(operationStatus);
29+
}
30+
}
31+
32+
private isInProgress(response: TGetOperationStatusResp) {
33+
switch (response.operationState) {
34+
case TOperationState.INITIALIZED_STATE:
35+
case TOperationState.RUNNING_STATE:
36+
return true;
37+
default:
38+
return false;
39+
}
40+
}
41+
42+
private processOperationStatusResponse(response: TGetOperationStatusResp) {
43+
this.statusFactory.create(response.status);
44+
45+
this.state = response.operationState ?? this.state;
46+
47+
if (typeof response.hasResultSet === 'boolean') {
48+
this.hasResultSet = response.hasResultSet;
49+
}
50+
51+
if (!this.isInProgress(response)) {
52+
this.operationStatus = response;
53+
}
54+
55+
return response;
56+
}
57+
58+
status(progress: boolean = false) {
59+
if (this.operationStatus) {
60+
return Promise.resolve(this.operationStatus);
61+
}
62+
return this.driver
63+
.getOperationStatus({
64+
operationHandle: this.operationHandle,
65+
getProgressUpdate: progress,
66+
})
67+
.then((response) => this.processOperationStatusResponse(response));
68+
}
69+
70+
private async isReady(progress?: boolean, callback?: OperationStatusCallback): Promise<boolean> {
71+
const response = await this.status(Boolean(progress));
72+
73+
if (callback) {
74+
await Promise.resolve(callback(response));
75+
}
76+
77+
switch (response.operationState) {
78+
case TOperationState.INITIALIZED_STATE:
79+
return false;
80+
case TOperationState.RUNNING_STATE:
81+
return false;
82+
case TOperationState.FINISHED_STATE:
83+
return true;
84+
case TOperationState.CANCELED_STATE:
85+
throw new OperationStateError('The operation was canceled by a client', response);
86+
case TOperationState.CLOSED_STATE:
87+
throw new OperationStateError('The operation was closed by a client', response);
88+
case TOperationState.ERROR_STATE:
89+
throw new OperationStateError('The operation failed due to an error', response);
90+
case TOperationState.PENDING_STATE:
91+
throw new OperationStateError('The operation is in a pending state', response);
92+
case TOperationState.TIMEDOUT_STATE:
93+
throw new OperationStateError('The operation is in a timed out state', response);
94+
case TOperationState.UKNOWN_STATE:
95+
default:
96+
throw new OperationStateError('The operation is in an unrecognized state', response);
97+
}
98+
}
99+
100+
async waitUntilReady(progress?: boolean, callback?: OperationStatusCallback): Promise<void> {
101+
if (this.state === TOperationState.FINISHED_STATE) {
102+
return;
103+
}
104+
const isReady = await this.isReady(progress, callback);
105+
if (!isReady) {
106+
return this.waitUntilReady(progress, callback);
107+
}
108+
}
109+
}

lib/DBSQLOperation/SchemaHelper.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { TOperationHandle, TGetResultSetMetadataResp } from '../../thrift/TCLIService_types';
2+
import HiveDriver from '../hive/HiveDriver';
3+
import StatusFactory from '../factory/StatusFactory';
4+
import { definedOrError } from '../utils';
5+
6+
export default class SchemaHelper {
7+
private driver: HiveDriver;
8+
9+
private operationHandle: TOperationHandle;
10+
11+
private statusFactory = new StatusFactory();
12+
13+
private metadata: TGetResultSetMetadataResp | null = null;
14+
15+
constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) {
16+
this.driver = driver;
17+
this.operationHandle = operationHandle;
18+
this.metadata = metadata || null;
19+
}
20+
21+
async fetch() {
22+
if (!this.metadata) {
23+
const metadata = await this.driver.getResultSetMetadata({
24+
operationHandle: this.operationHandle,
25+
});
26+
this.statusFactory.create(metadata.status);
27+
this.metadata = metadata;
28+
}
29+
30+
return definedOrError(this.metadata.schema);
31+
}
32+
}

lib/DBSQLOperation/checkIfOperationHasMoreRows.ts

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)