Skip to content

Commit 1adcb94

Browse files
[PECO-983] Support streaming query results via Node.js streams (#262)
* [PECO-983] Support streaming query results via Node.js streams Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> * CR1 Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 3c29fe2 commit 1adcb94

File tree

4 files changed

+94
-0
lines changed

4 files changed

+94
-0
lines changed

lib/DBSQLOperation.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { stringify, NIL } from 'uuid';
2+
import { Readable } from 'node:stream';
23
import IOperation, {
34
FetchOptions,
45
FinishedOptions,
@@ -7,6 +8,7 @@ import IOperation, {
78
IteratorOptions,
89
IOperationChunksIterator,
910
IOperationRowsIterator,
11+
NodeStreamOptions,
1012
} from './contracts/IOperation';
1113
import {
1214
TGetOperationStatusResp,
@@ -101,6 +103,23 @@ export default class DBSQLOperation implements IOperation {
101103
return new OperationRowsIterator(this, options);
102104
}
103105

106+
public toNodeStream(options?: NodeStreamOptions): Readable {
107+
let iterable: IOperationChunksIterator | IOperationRowsIterator | undefined;
108+
109+
switch (options?.mode ?? 'chunks') {
110+
case 'chunks':
111+
iterable = this.iterateChunks(options?.iteratorOptions);
112+
break;
113+
case 'rows':
114+
iterable = this.iterateRows(options?.iteratorOptions);
115+
break;
116+
default:
117+
throw new Error(`IOperation.toNodeStream: unsupported mode ${options?.mode}`);
118+
}
119+
120+
return Readable.from(iterable, options?.streamOptions);
121+
}
122+
104123
public get id() {
105124
const operationId = this.operationHandle?.operationId?.guid;
106125
return operationId ? stringify(operationId) : NIL;

lib/contracts/IOperation.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Readable, ReadableOptions } from 'node:stream';
12
import { TGetOperationStatusResp, TTableSchema } from '../../thrift/TCLIService_types';
23
import Status from '../dto/Status';
34

@@ -35,6 +36,12 @@ export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
3536
readonly operation: IOperation;
3637
}
3738

39+
export interface NodeStreamOptions {
40+
mode?: 'chunks' | 'rows'; // defaults to 'chunks'
41+
iteratorOptions?: IteratorOptions;
42+
streamOptions?: ReadableOptions;
43+
}
44+
3845
export default interface IOperation {
3946
/**
4047
* Operation identifier
@@ -86,4 +93,6 @@ export default interface IOperation {
8693
iterateChunks(options?: IteratorOptions): IOperationChunksIterator;
8794

8895
iterateRows(options?: IteratorOptions): IOperationRowsIterator;
96+
97+
toNodeStream(options?: NodeStreamOptions): Readable;
8998
}

tests/e2e/iterators.test.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,64 @@ describe('Iterators', () => {
8888
await session.close();
8989
}
9090
});
91+
92+
it('should get all chunks via Nodejs stream', async () => {
93+
const session = await openSession({ arrowEnabled: false });
94+
// @ts-expect-error TS2339: Property context does not exist on type IDBSQLSession
95+
sinon.spy(session.context.driver, 'fetchResults');
96+
try {
97+
const expectedRowsCount = 10;
98+
99+
// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
100+
const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`, {
101+
maxRows: null,
102+
});
103+
104+
const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));
105+
const chunkSize = 4;
106+
const expectedChunks = arrayChunks(expectedRows, chunkSize);
107+
108+
const stream = operation.toNodeStream({
109+
mode: 'chunks',
110+
iteratorOptions: { maxRows: chunkSize },
111+
});
112+
113+
let index = 0;
114+
for await (const chunk of stream) {
115+
expect(chunk).to.deep.equal(expectedChunks[index]);
116+
index += 1;
117+
}
118+
119+
expect(index).to.equal(expectedChunks.length);
120+
} finally {
121+
await session.close();
122+
}
123+
});
124+
125+
it('should get all rows via Nodejs stream', async () => {
126+
const session = await openSession({ arrowEnabled: false });
127+
// @ts-expect-error TS2339: Property context does not exist on type IDBSQLSession
128+
sinon.spy(session.context.driver, 'fetchResults');
129+
try {
130+
const expectedRowsCount = 10;
131+
132+
const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`);
133+
134+
const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));
135+
136+
const stream = operation.toNodeStream({
137+
mode: 'rows',
138+
});
139+
140+
let index = 0;
141+
for await (const row of stream) {
142+
expect(row).to.deep.equal(expectedRows[index]);
143+
index += 1;
144+
}
145+
146+
expect(index).to.equal(expectedRows.length);
147+
} finally {
148+
await session.close();
149+
}
150+
});
91151
});

tests/unit/.stubs/OperationStub.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ import IOperation, {
22
IOperationChunksIterator,
33
IOperationRowsIterator,
44
IteratorOptions,
5+
NodeStreamOptions,
56
} from '../../../lib/contracts/IOperation';
67
import Status from '../../../lib/dto/Status';
78
import { OperationChunksIterator, OperationRowsIterator } from '../../../lib/utils/OperationIterator';
9+
import { Readable } from 'node:stream';
810

911
export default class OperationStub implements IOperation {
1012
public readonly id: string = '';
@@ -59,4 +61,8 @@ export default class OperationStub implements IOperation {
5961
public iterateRows(options?: IteratorOptions): IOperationRowsIterator {
6062
return new OperationRowsIterator(this, options);
6163
}
64+
65+
public toNodeStream(options?: NodeStreamOptions): Readable {
66+
throw new Error('Not implemented');
67+
}
6268
}

0 commit comments

Comments
 (0)