Skip to content

Commit 42a68ef

Browse files
Properly handle operations closed within Direct Results routine (#134)
* Properly handle operations closed within directResults routine Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Improve tests (add more conditions) Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 80db5a2 commit 42a68ef

File tree

5 files changed

+73
-8
lines changed

5 files changed

+73
-8
lines changed

lib/DBSQLOperation/CompleteOperationHelper.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,16 @@ export default class CompleteOperationHelper {
77

88
private readonly operationHandle: TOperationHandle;
99

10+
private closeOperation?: TCloseOperationResp;
11+
1012
public closed: boolean = false;
1113

1214
public cancelled: boolean = false;
1315

1416
constructor(driver: HiveDriver, operationHandle: TOperationHandle, closeOperation?: TCloseOperationResp) {
1517
this.driver = driver;
1618
this.operationHandle = operationHandle;
17-
18-
if (closeOperation) {
19-
Status.assert(closeOperation.status);
20-
this.closed = true;
21-
}
19+
this.closeOperation = closeOperation;
2220
}
2321

2422
public async cancel(): Promise<Status> {
@@ -35,6 +33,11 @@ export default class CompleteOperationHelper {
3533
}
3634

3735
public async close(): Promise<Status> {
36+
if (!this.closed && this.closeOperation) {
37+
Status.assert(this.closeOperation.status);
38+
this.closed = true;
39+
}
40+
3841
if (this.closed) {
3942
return Status.success();
4043
}

lib/DBSQLOperation/FetchResultsHelper.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,15 @@ export default class FetchResultsHelper {
4444

4545
private prefetchedResults: TFetchResultsResp[] = [];
4646

47+
private readonly returnOnlyPrefetchedResults: boolean;
48+
4749
public hasMoreRows: boolean = false;
4850

4951
constructor(
5052
driver: HiveDriver,
5153
operationHandle: TOperationHandle,
5254
prefetchedResults: Array<TFetchResultsResp | undefined>,
55+
returnOnlyPrefetchedResults: boolean,
5356
) {
5457
this.driver = driver;
5558
this.operationHandle = operationHandle;
@@ -58,12 +61,21 @@ export default class FetchResultsHelper {
5861
this.prefetchedResults.push(item);
5962
}
6063
});
64+
this.returnOnlyPrefetchedResults = returnOnlyPrefetchedResults;
6165
}
6266

6367
private processFetchResponse(response: TFetchResultsResp): TRowSet | undefined {
6468
Status.assert(response.status);
6569
this.fetchOrientation = TFetchOrientation.FETCH_NEXT;
66-
this.hasMoreRows = checkIfOperationHasMoreRows(response);
70+
71+
if (this.prefetchedResults.length > 0) {
72+
this.hasMoreRows = true;
73+
} else if (this.returnOnlyPrefetchedResults) {
74+
this.hasMoreRows = false;
75+
} else {
76+
this.hasMoreRows = checkIfOperationHasMoreRows(response);
77+
}
78+
6779
return response.results;
6880
}
6981

lib/DBSQLOperation/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,17 @@ export default class DBSQLOperation implements IOperation {
4141
this.driver = driver;
4242
this.operationHandle = operationHandle;
4343
this.logger = logger;
44+
45+
const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation);
46+
4447
this._status = new OperationStatusHelper(this.driver, this.operationHandle, directResults?.operationStatus);
4548
this._schema = new SchemaHelper(this.driver, this.operationHandle, directResults?.resultSetMetadata);
46-
this._data = new FetchResultsHelper(this.driver, this.operationHandle, [directResults?.resultSet]);
49+
this._data = new FetchResultsHelper(
50+
this.driver,
51+
this.operationHandle,
52+
[directResults?.resultSet],
53+
useOnlyPrefetchedResults,
54+
);
4755
this._completeOperation = new CompleteOperationHelper(
4856
this.driver,
4957
this.operationHandle,

tests/e2e/batched_fetch.test.js

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const { expect } = require('chai');
2+
const sinon = require('sinon');
23
const config = require('./utils/config');
34
const logger = require('./utils/logger')(config.logger);
45
const { DBSQLClient } = require('../..');
@@ -36,21 +37,61 @@ describe('Data fetching', () => {
3637

3738
it('fetch chunks should return a max row set of chunkSize', async () => {
3839
const session = await openSession();
40+
sinon.spy(session.driver, 'fetchResults');
3941
try {
42+
// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
4043
const operation = await session.executeStatement(query, { runAsync: true, maxRows: null });
4144
let chunkedOp = await operation.fetchChunk({ maxRows: 10 }).catch((error) => logger(error));
4245
expect(chunkedOp.length).to.be.equal(10);
46+
// we explicitly requested only one chunk
47+
expect(session.driver.fetchResults.callCount).to.equal(1);
4348
} finally {
4449
await session.close();
4550
}
4651
});
4752

4853
it('fetch all should fetch all records', async () => {
4954
const session = await openSession();
55+
sinon.spy(session.driver, 'fetchResults');
5056
try {
57+
// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
5158
const operation = await session.executeStatement(query, { runAsync: true, maxRows: null });
59+
let all = await operation.fetchAll({ maxRows: 200 });
60+
expect(all.length).to.be.equal(1000);
61+
// 1000/200 = 5 chunks + one extra request to ensure that there's no more data
62+
expect(session.driver.fetchResults.callCount).to.equal(6);
63+
} finally {
64+
await session.close();
65+
}
66+
});
67+
68+
it('should fetch all records if they fit within directResults response', async () => {
69+
const session = await openSession();
70+
sinon.spy(session.driver, 'fetchResults');
71+
try {
72+
// here `maxRows` enables direct results with limit of the first batch
73+
const operation = await session.executeStatement(query, { runAsync: true, maxRows: 1000 });
5274
let all = await operation.fetchAll();
5375
expect(all.length).to.be.equal(1000);
76+
// all the data returned immediately from direct results, so no additional requests
77+
expect(session.driver.fetchResults.callCount).to.equal(0);
78+
} finally {
79+
await session.close();
80+
}
81+
});
82+
83+
it('should fetch all records if only part of them fit within directResults response', async () => {
84+
const session = await openSession();
85+
sinon.spy(session.driver, 'fetchResults');
86+
try {
87+
// here `maxRows` enables direct results with limit of the first batch
88+
const operation = await session.executeStatement(query, { runAsync: true, maxRows: 200 });
89+
// here `maxRows` sets limit for `driver.fetchResults`
90+
let all = await operation.fetchAll({ maxRows: 200 });
91+
expect(all.length).to.be.equal(1000);
92+
// 1 chunk returned immediately from direct results + 4 remaining chunks + one extra chunk to ensure
93+
// that there's no more data
94+
expect(session.driver.fetchResults.callCount).to.equal(5);
5495
} finally {
5596
await session.close();
5697
}

tests/unit/DBSQLOperation.test.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,14 @@ describe('DBSQLOperation', () => {
379379
});
380380

381381
expect(operation._completeOperation.cancelled).to.be.false;
382-
expect(operation._completeOperation.closed).to.be.true;
382+
expect(operation._completeOperation.closed).to.be.false;
383383

384384
await operation.close();
385385

386386
expect(driver.closeOperation.called).to.be.false;
387387
expect(operation._completeOperation.cancelled).to.be.false;
388388
expect(operation._completeOperation.closed).to.be.true;
389+
expect(driver.closeOperation.callCount).to.be.equal(0);
389390
});
390391

391392
it('should throw an error in case of a status error and keep state', async () => {

0 commit comments

Comments
 (0)