Skip to content

Commit fb817b5

Browse files
Iterable interface for IOperation (#252)
* Iterable interface for IOperation Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Chore: split `utils` unit tests into few files Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add visibility modifiers Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Fixes after merge Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Fix import Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent c239fca commit fb817b5

File tree

7 files changed

+484
-93
lines changed

7 files changed

+484
-93
lines changed

lib/DBSQLOperation.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import IOperation, {
44
FinishedOptions,
55
GetSchemaOptions,
66
WaitUntilReadyOptions,
7+
IteratorOptions,
8+
IOperationChunksIterator,
9+
IOperationRowsIterator,
710
} from './contracts/IOperation';
811
import {
912
TGetOperationStatusResp,
@@ -26,6 +29,7 @@ import CloudFetchResultHandler from './result/CloudFetchResultHandler';
2629
import ArrowResultConverter from './result/ArrowResultConverter';
2730
import ResultSlicer from './result/ResultSlicer';
2831
import { definedOrError } from './utils';
32+
import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator';
2933
import HiveDriverError from './errors/HiveDriverError';
3034
import IClientContext from './contracts/IClientContext';
3135

@@ -89,6 +93,14 @@ export default class DBSQLOperation implements IOperation {
8993
this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`);
9094
}
9195

96+
public iterateChunks(options?: IteratorOptions): IOperationChunksIterator {
97+
return new OperationChunksIterator(this, options);
98+
}
99+
100+
public iterateRows(options?: IteratorOptions): IOperationRowsIterator {
101+
return new OperationRowsIterator(this, options);
102+
}
103+
92104
public get id() {
93105
const operationId = this.operationHandle?.operationId?.guid;
94106
return operationId ? stringify(operationId) : NIL;

lib/contracts/IOperation.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@ export interface GetSchemaOptions extends WaitUntilReadyOptions {
2323
// no other options
2424
}
2525

26+
export interface IteratorOptions extends FetchOptions {
27+
autoClose?: boolean; // defaults to `false`
28+
}
29+
30+
export interface IOperationChunksIterator extends AsyncIterableIterator<Array<object>> {
31+
readonly operation: IOperation;
32+
}
33+
34+
export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
35+
readonly operation: IOperation;
36+
}
37+
2638
export default interface IOperation {
2739
/**
2840
* Operation identifier
@@ -70,4 +82,8 @@ export default interface IOperation {
7082
* Fetch schema
7183
*/
7284
getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null>;
85+
86+
iterateChunks(options?: IteratorOptions): IOperationChunksIterator;
87+
88+
iterateRows(options?: IteratorOptions): IOperationRowsIterator;
7389
}

lib/utils/OperationIterator.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import IOperation, { IOperationChunksIterator, IOperationRowsIterator, IteratorOptions } from '../contracts/IOperation';
2+
3+
abstract class OperationIterator<R> implements AsyncIterableIterator<R> {
4+
public readonly operation: IOperation;
5+
6+
protected readonly options?: IteratorOptions;
7+
8+
constructor(operation: IOperation, options?: IteratorOptions) {
9+
this.operation = operation;
10+
this.options = options;
11+
}
12+
13+
protected abstract getNext(): Promise<IteratorResult<R>>;
14+
15+
public [Symbol.asyncIterator]() {
16+
return this;
17+
}
18+
19+
public async next() {
20+
const result = await this.getNext();
21+
22+
if (result.done && this.options?.autoClose) {
23+
await this.operation.close();
24+
}
25+
26+
return result;
27+
}
28+
29+
// This method is intended for a cleanup when the caller does not intend to make any more
30+
// reads from iterator (e.g. when using `break` in a `for ... of` loop)
31+
public async return(value?: any) {
32+
if (this.options?.autoClose) {
33+
await this.operation.close();
34+
}
35+
36+
return { done: true, value };
37+
}
38+
}
39+
40+
export class OperationChunksIterator extends OperationIterator<Array<object>> implements IOperationChunksIterator {
41+
protected async getNext(): Promise<IteratorResult<Array<object>>> {
42+
const hasMoreRows = await this.operation.hasMoreRows();
43+
if (hasMoreRows) {
44+
const value = await this.operation.fetchChunk(this.options);
45+
return { done: false, value };
46+
}
47+
48+
return { done: true, value: undefined };
49+
}
50+
}
51+
52+
export class OperationRowsIterator extends OperationIterator<object> implements IOperationRowsIterator {
53+
private chunk: Array<object> = [];
54+
55+
private index: number = 0;
56+
57+
constructor(operation: IOperation, options?: IteratorOptions) {
58+
super(operation, {
59+
...options,
60+
// Tell slicer to return raw chunks. We're going to process rows one by one anyway,
61+
// so no need to additionally buffer and slice chunks returned by server
62+
disableBuffering: true,
63+
});
64+
}
65+
66+
protected async getNext(): Promise<IteratorResult<object>> {
67+
if (this.index < this.chunk.length) {
68+
const value = this.chunk[this.index];
69+
this.index += 1;
70+
return { done: false, value };
71+
}
72+
73+
const hasMoreRows = await this.operation.hasMoreRows();
74+
if (hasMoreRows) {
75+
this.chunk = await this.operation.fetchChunk(this.options);
76+
this.index = 0;
77+
// Note: this call is not really a recursion. Since this method is
78+
// async - the call will be actually scheduled for processing on
79+
// the next event loop cycle
80+
return this.getNext();
81+
}
82+
83+
return { done: true, value: undefined };
84+
}
85+
}

tests/e2e/iterators.test.js

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
const { expect } = require('chai');
2+
const sinon = require('sinon');
3+
const config = require('./utils/config');
4+
const { DBSQLClient } = require('../../lib');
5+
6+
async function openSession(customConfig) {
7+
const client = new DBSQLClient();
8+
9+
const clientConfig = client.getConfig();
10+
sinon.stub(client, 'getConfig').returns({
11+
...clientConfig,
12+
...customConfig,
13+
});
14+
15+
const connection = await client.connect({
16+
host: config.host,
17+
path: config.path,
18+
token: config.token,
19+
});
20+
21+
return connection.openSession({
22+
initialCatalog: config.database[0],
23+
initialSchema: config.database[1],
24+
});
25+
}
26+
27+
function arrayChunks(arr, chunkSize) {
28+
const result = [];
29+
30+
while (arr.length > 0) {
31+
const chunk = arr.splice(0, chunkSize);
32+
result.push(chunk);
33+
}
34+
35+
return result;
36+
}
37+
38+
describe('Iterators', () => {
39+
it('should iterate over all chunks', async () => {
40+
const session = await openSession({ arrowEnabled: false });
41+
sinon.spy(session.context.driver, 'fetchResults');
42+
try {
43+
const expectedRowsCount = 10;
44+
45+
// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
46+
const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`, {
47+
maxRows: null,
48+
});
49+
50+
const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));
51+
const chunkSize = 4;
52+
const expectedChunks = arrayChunks(expectedRows, chunkSize);
53+
54+
let index = 0;
55+
for await (const chunk of operation.iterateChunks({ maxRows: chunkSize })) {
56+
expect(chunk).to.deep.equal(expectedChunks[index]);
57+
index += 1;
58+
}
59+
60+
expect(index).to.equal(expectedChunks.length);
61+
} finally {
62+
await session.close();
63+
}
64+
});
65+
66+
it('should iterate over all rows', async () => {
67+
const session = await openSession({ arrowEnabled: false });
68+
sinon.spy(session.context.driver, 'fetchResults');
69+
try {
70+
const expectedRowsCount = 10;
71+
72+
const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`);
73+
74+
const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));
75+
76+
let index = 0;
77+
for await (const row of operation.iterateRows()) {
78+
expect(row).to.deep.equal(expectedRows[index]);
79+
index += 1;
80+
}
81+
82+
expect(index).to.equal(expectedRows.length);
83+
} finally {
84+
await session.close();
85+
}
86+
});
87+
});

tests/unit/utils.test.js renamed to tests/unit/utils/CloseableCollection.test.js

Lines changed: 1 addition & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,5 @@
11
const { expect, AssertionError } = require('chai');
2-
3-
const { buildUserAgentString, definedOrError, formatProgress, ProgressUpdateTransformer } = require('../../lib/utils');
4-
const CloseableCollection = require('../../lib/utils/CloseableCollection').default;
5-
6-
describe('buildUserAgentString', () => {
7-
// It should follow https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3 and
8-
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
9-
//
10-
// UserAgent ::= <ProductName> '/' <ProductVersion> '(' <Comment> ')'
11-
// ProductName ::= 'NodejsDatabricksSqlConnector'
12-
// <Comment> ::= [ <ClientId> ';' ] 'Node.js' <NodeJsVersion> ';' <OSPlatform> <OSVersion>
13-
//
14-
// Examples:
15-
// - with <ClientId> provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Client ID; Node.js 16.13.1; Darwin 21.5.0)
16-
// - without <ClientId> provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Node.js 16.13.1; Darwin 21.5.0)
17-
18-
function checkUserAgentString(ua, clientId) {
19-
// Prefix: 'NodejsDatabricksSqlConnector/'
20-
// Version: three period-separated digits and optional suffix
21-
const re =
22-
/^(?<productName>NodejsDatabricksSqlConnector)\/(?<productVersion>\d+\.\d+\.\d+(-[^(]+)?)\s*\((?<comment>[^)]+)\)$/i;
23-
const match = re.exec(ua);
24-
expect(match).to.not.be.eq(null);
25-
26-
const { comment } = match.groups;
27-
28-
expect(comment.split(';').length).to.be.gte(2); // at least Node and OS version should be there
29-
30-
if (clientId) {
31-
expect(comment.trim()).to.satisfy((s) => s.startsWith(`${clientId};`));
32-
}
33-
}
34-
35-
it('matches pattern with clientId', () => {
36-
const clientId = 'Some Client ID';
37-
const ua = buildUserAgentString(clientId);
38-
checkUserAgentString(ua, clientId);
39-
});
40-
41-
it('matches pattern without clientId', () => {
42-
const ua = buildUserAgentString();
43-
checkUserAgentString(ua);
44-
});
45-
});
46-
47-
describe('formatProgress', () => {
48-
it('formats progress', () => {
49-
const result = formatProgress({
50-
headerNames: [],
51-
rows: [],
52-
});
53-
expect(result).to.be.eq('\n');
54-
});
55-
});
56-
57-
describe('ProgressUpdateTransformer', () => {
58-
it('should have equal columns', () => {
59-
const t = new ProgressUpdateTransformer();
60-
61-
expect(t.formatRow(['Column 1', 'Column 2'])).to.be.eq('Column 1 |Column 2 ');
62-
});
63-
64-
it('should format response as table', () => {
65-
const t = new ProgressUpdateTransformer({
66-
headerNames: ['Column 1', 'Column 2'],
67-
rows: [
68-
['value 1.1', 'value 1.2'],
69-
['value 2.1', 'value 2.2'],
70-
],
71-
footerSummary: 'footer',
72-
});
73-
74-
expect(String(t)).to.be.eq(
75-
'Column 1 |Column 2 \n' + 'value 1.1 |value 1.2 \n' + 'value 2.1 |value 2.2 \n' + 'footer',
76-
);
77-
});
78-
});
79-
80-
describe('definedOrError', () => {
81-
it('should return value if it is defined', () => {
82-
const values = [null, 0, 3.14, false, true, '', 'Hello, World!', [], {}];
83-
for (const value of values) {
84-
const result = definedOrError(value);
85-
expect(result).to.be.equal(value);
86-
}
87-
});
88-
89-
it('should throw error if value is undefined', () => {
90-
expect(() => {
91-
definedOrError(undefined);
92-
}).to.throw();
93-
});
94-
});
2+
const CloseableCollection = require('../../../lib/utils/CloseableCollection').default;
953

964
describe('CloseableCollection', () => {
975
it('should add item if not already added', () => {

0 commit comments

Comments
 (0)