Skip to content

Commit dd2c4c0

Browse files
[PECO-219] Enable chunked fetch (#46)
* Added fetchAll and fetchChunk * Undid changes, edited wrong file * Added fetchAll, fetchChunk, added maxRowSize for fetch * Added changes and preliminary test * Added wait functionality and updated test * Implemented base functionality and added tests * Cleaned up imports * Ran linter * Some refinements Co-authored-by: Levko Kravets <levko.ne@gmail.com>
1 parent bdfc9a0 commit dd2c4c0

File tree

3 files changed

+120
-19
lines changed

3 files changed

+120
-19
lines changed

lib/DBSQLOperation.ts

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import { ColumnCode, Int64 } from './hive/Types';
1414
import Status from './dto/Status';
1515
import StatusFactory from './factory/StatusFactory';
1616
import { definedOrError } from './utils';
17+
import OperationStateError from './errors/OperationStateError';
18+
import GetResult from './utils/GetResult';
1719

1820
export default class DBSQLOperation implements IOperation {
1921
private driver: HiveDriver;
@@ -22,7 +24,6 @@ export default class DBSQLOperation implements IOperation {
2224
private data: Array<TRowSet>;
2325
private statusFactory: StatusFactory;
2426

25-
private maxRows: Int64 = new Int64(100000);
2627
private fetchType: number = 0;
2728

2829
private _hasMoreRows: boolean = false;
@@ -40,11 +41,22 @@ export default class DBSQLOperation implements IOperation {
4041
this.data = [];
4142
}
4243

44+
/**
 * Waits until the operation's results can be fetched.
 *
 * Returns immediately when `this.finished()` is truthy. Otherwise it polls
 * `this.isReady()` until that resolves to `true`, pausing briefly between
 * polls. Anything thrown by `isReady()` propagates to the caller.
 */
private async waitUntilReady(): Promise<void> {
  // Pause between polls: each `isReady()` call issues a status request, and
  // polling without any pause sends those requests back to back.
  const pollIntervalMs = 100;

  // A loop instead of self-recursion: the recursive form adds one more pending
  // promise to the chain for every poll of a long-running operation.
  while (!this.finished()) {
    if (await this.isReady()) {
      return;
    }
    await new Promise<void>((resolve) => {
      setTimeout(resolve, pollIntervalMs);
    });
  }
}
54+
4355
/**
4456
* Fetches the schema and up to `chunkSize` rows of result data from the operation
4557
* @throws {StatusError}
4658
*/
47-
fetch(): Promise<Status> {
59+
fetch(chunkSize = 100000): Promise<Status> {
4860
if (!this.hasResultSet) {
4961
return Promise.resolve(
5062
this.statusFactory.create({
@@ -65,13 +77,37 @@ export default class DBSQLOperation implements IOperation {
6577
return this.initializeSchema()
6678
.then((schema) => {
6779
this.schema = schema;
68-
69-
return this.firstFetch();
80+
return this.firstFetch(chunkSize);
7081
})
7182
.then((response) => this.processFetchResponse(response));
7283
} else {
73-
return this.nextFetch().then((response) => this.processFetchResponse(response));
84+
return this.nextFetch(chunkSize).then((response) => this.processFetchResponse(response));
85+
}
86+
}
87+
88+
/**
 * Fetches all remaining rows of the result.
 *
 * Calls `fetchChunk()` (with its default chunk size) at least once and keeps
 * calling it while `hasMoreRows()` returns true, concatenating the chunks in
 * the order they were fetched.
 *
 * @returns every fetched row in a single array
 */
async fetchAll(): Promise<Array<object>> {
  const data: Array<object> = [];
  do {
    const chunk = await this.fetchChunk();
    if (chunk) {
      // Append row by row. `data.push(...chunk)` passes each row as a separate
      // call argument and can throw a RangeError when a chunk is large.
      for (const row of chunk) {
        data.push(row);
      }
    }
  } while (this.hasMoreRows());
  return data;
}
98+
99+
/**
 * Waits for the operation to become ready, fetches the next portion of the
 * result and returns its rows.
 *
 * @param chunkSize maximum number of rows requested in this fetch
 * @returns the rows produced by `GetResult` for the fetched data, or an empty
 *          array when the operation has no result set
 */
async fetchChunk(chunkSize = 100000): Promise<Array<object>> {
  if (!this.hasResultSet) {
    return [];
  }

  await this.waitUntilReady();
  await this.fetch(chunkSize);

  const rows = new GetResult(this).execute().getValue();
  // NOTE(review): presumably discards the data buffered by fetch() so the next
  // chunk does not contain these rows again — confirm against flush().
  this.flush();
  return rows;
}
76112

77113
/**
@@ -134,10 +170,6 @@ export default class DBSQLOperation implements IOperation {
134170
return this._hasMoreRows;
135171
}
136172

137-
setMaxRows(maxRows: number): void {
138-
this.maxRows = new Int64(maxRows);
139-
}
140-
141173
setFetchType(fetchType: number): void {
142174
this.fetchType = fetchType;
143175
}
@@ -174,20 +206,20 @@ export default class DBSQLOperation implements IOperation {
174206
});
175207
}
176208

177-
private firstFetch() {
209+
private firstFetch(chunkSize: number) {
178210
return this.driver.fetchResults({
179211
operationHandle: this.operationHandle,
180212
orientation: TFetchOrientation.FETCH_FIRST,
181-
maxRows: this.maxRows,
213+
maxRows: new Int64(chunkSize),
182214
fetchType: this.fetchType,
183215
});
184216
}
185217

186-
private nextFetch() {
218+
private nextFetch(chunkSize: number) {
187219
return this.driver.fetchResults({
188220
operationHandle: this.operationHandle,
189221
orientation: TFetchOrientation.FETCH_NEXT,
190-
maxRows: this.maxRows,
222+
maxRows: new Int64(chunkSize),
191223
fetchType: this.fetchType,
192224
});
193225
}
@@ -233,4 +265,29 @@ export default class DBSQLOperation implements IOperation {
233265

234266
return (columnValue?.values?.length || 0) > 0;
235267
}
268+
269+
/**
 * Requests the operation status once and reports whether it has finished.
 *
 * @returns `true` for FINISHED_STATE; `false` while the operation is still
 *          initialized, pending or running
 * @throws {OperationStateError} when the operation was canceled, closed,
 *         failed, timed out, or reports an unrecognized state
 */
private async isReady(): Promise<boolean> {
  const response = await this.status();
  switch (response.operationState) {
    // Not finished yet: the caller should poll again. PENDING_STATE belongs
    // here too — a queued operation has not failed, it just has not started.
    case TOperationState.INITIALIZED_STATE:
    case TOperationState.PENDING_STATE:
    case TOperationState.RUNNING_STATE:
      return false;
    case TOperationState.FINISHED_STATE:
      return true;
    case TOperationState.CANCELED_STATE:
      throw new OperationStateError('The operation was canceled by a client', response);
    case TOperationState.CLOSED_STATE:
      throw new OperationStateError('The operation was closed by a client', response);
    case TOperationState.ERROR_STATE:
      throw new OperationStateError('The operation failed due to an error', response);
    case TOperationState.TIMEDOUT_STATE:
      throw new OperationStateError('The operation is in a timedout state', response);
    // `UKNOWN_STATE` is the enum member's actual spelling, not a typo here.
    case TOperationState.UKNOWN_STATE:
    default:
      throw new OperationStateError('The operation is in an unrecognized state', response);
  }
}
236293
}

lib/contracts/IOperation.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export default interface IOperation {
55
/**
66
* Fetch schema and a portion of data
77
*/
8-
fetch(): Promise<Status>;
8+
fetch(chunkSize?: number): Promise<Status>;
99

1010
/**
1111
* Request status of operation
@@ -34,11 +34,6 @@ export default interface IOperation {
3434
*/
3535
hasMoreRows(): boolean;
3636

37-
/**
38-
* Set the max fetch size
39-
*/
40-
setMaxRows(maxRows: number): void;
41-
4237
/**
4338
* Return retrieved schema
4439
*/

tests/e2e/batched_fetch.test.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
const { expect } = require('chai');
2+
const config = require('./utils/config');
3+
const logger = require('./utils/logger')(config.logger);
4+
const { DBSQLClient, thrift } = require('../..');
5+
6+
// Connects with the credentials from the e2e config and opens a session.
// When config.database has exactly two entries they are used as the initial
// catalog and schema; otherwise the session is opened with an empty request.
const openSession = async () => {
  const { host, path, token, database } = config;

  const connection = await new DBSQLClient().connect({ host, path, token });

  const request =
    database.length === 2
      ? { initialNamespace: { catalogName: database[0], schemaName: database[1] } }
      : {};

  return connection.openSession(request);
};
28+
29+
// End-to-end checks for chunked fetching: a query producing 1000 rows is run
// asynchronously and read back either one chunk at a time or all at once.
describe('Data fetching', () => {
  const query = `
    SELECT *
    FROM range(0, 1000) AS t1
    LEFT JOIN (SELECT 1) AS t2
  `;

  it('fetch chunks should return a max row set of chunkSize', async () => {
    const session = await openSession();
    const operation = await session.executeStatement(query, { runAsync: true });
    // Log the failure but rethrow it. Swallowing it would leave the chunk
    // undefined and fail the test on `.length`, hiding the real error.
    const chunk = await operation.fetchChunk(10).catch((error) => {
      logger(error);
      throw error;
    });
    expect(chunk.length).to.be.equal(10);
  });

  it('fetch all should fetch all records', async () => {
    const session = await openSession();
    const operation = await session.executeStatement(query, { runAsync: true });
    const all = await operation.fetchAll();
    expect(all.length).to.be.equal(1000);
  });
});

0 commit comments

Comments
 (0)