Skip to content

Commit 1dc16ac

Browse files
[PECO-1532] Ignore the excess records in query results (#239)
* [PECO-1532] Arrow and CloudFetch result handlers: return row count with raw batch data Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Update tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Refactor `ArrowResultConverter` - cleanup and make it skip empty batches Signed-off-by: Levko Kravets <levko.ne@gmail.com> * [PECO-1532] Ignore the excess records in arrow batches Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Add tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 6673660 commit 1dc16ac

8 files changed

+330
-120
lines changed

lib/result/ArrowResultConverter.ts

Lines changed: 78 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
import { TGetResultSetMetadataResp, TColumnDesc } from '../../thrift/TCLIService_types';
1717
import IClientContext from '../contracts/IClientContext';
1818
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
19-
import { getSchemaColumns, convertThriftValue } from './utils';
19+
import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils';
2020

2121
const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
2222

@@ -26,15 +26,23 @@ type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
2626
export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
2727
protected readonly context: IClientContext;
2828

29-
private readonly source: IResultsProvider<Array<Buffer>>;
29+
private readonly source: IResultsProvider<ArrowBatch>;
3030

3131
private readonly schema: Array<TColumnDesc>;
3232

33-
private reader?: IterableIterator<RecordBatch<TypeMap>>;
33+
private recordBatchReader?: IterableIterator<RecordBatch<TypeMap>>;
3434

35-
private pendingRecordBatch?: RecordBatch<TypeMap>;
35+
// Remaining rows in current Arrow batch (not the record batch!)
36+
private remainingRows: number = 0;
3637

37-
constructor(context: IClientContext, source: IResultsProvider<Array<Buffer>>, { schema }: TGetResultSetMetadataResp) {
38+
// This is the next (!!) record batch to be read. It is unset only in two cases:
39+
// - prior to the first call to `fetchNext`
40+
// - when no more data available
41+
// This field is primarily used by a `hasMore`, so it can tell if next `fetchNext` will
42+
// actually return a non-empty result
43+
private prefetchedRecordBatch?: RecordBatch<TypeMap>;
44+
45+
constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: TGetResultSetMetadataResp) {
3846
this.context = context;
3947
this.source = source;
4048
this.schema = getSchemaColumns(schema);
@@ -44,7 +52,7 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
4452
if (this.schema.length === 0) {
4553
return false;
4654
}
47-
if (this.pendingRecordBatch) {
55+
if (this.prefetchedRecordBatch) {
4856
return true;
4957
}
5058
return this.source.hasMore();
@@ -55,47 +63,80 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
5563
return [];
5664
}
5765

58-
// eslint-disable-next-line no-constant-condition
59-
while (true) {
60-
// It's not possible to know if iterator has more items until trying
61-
// to get the next item. But we need to know if iterator is empty right
62-
// after getting the next item. Therefore, after creating the iterator,
63-
// we get one item more and store it in `pendingRecordBatch`. Next time,
64-
// we use that stored item, and prefetch the next one. Prefetched item
65-
// is therefore the next item we are going to return, so it can be used
66-
// to know if we actually can return anything next time
67-
const recordBatch = this.pendingRecordBatch;
68-
this.pendingRecordBatch = this.prefetch();
69-
70-
if (recordBatch) {
71-
const table = new Table(recordBatch);
72-
return this.getRows(table.schema, table.toArray());
66+
// It's not possible to know if iterator has more items until trying to get the next item.
67+
// So each time we read one batch ahead and store it, but process the batch prefetched on
68+
// a previous `fetchNext` call. Because we actually already have the next item - it's easy
69+
// to tell if the subsequent `fetchNext` will be able to read anything, and `hasMore` logic
70+
// becomes trivial
71+
72+
// This prefetch handles a first call to `fetchNext`, when all the internal fields are not initialized yet.
73+
// On subsequent calls to `fetchNext` it will do nothing
74+
await this.prefetch(options);
75+
76+
if (this.prefetchedRecordBatch) {
77+
// Consume a record batch fetched during previous call to `fetchNext`
78+
const table = new Table(this.prefetchedRecordBatch);
79+
this.prefetchedRecordBatch = undefined;
80+
// Get table rows, but not more than remaining count
81+
const arrowRows = table.toArray().slice(0, this.remainingRows);
82+
const result = this.getRows(table.schema, arrowRows);
83+
84+
// Reduce remaining rows count by a count of rows we just processed.
85+
// If the remaining count reached zero - we're done with current arrow
86+
// batch, so discard the batch reader
87+
this.remainingRows -= result.length;
88+
if (this.remainingRows === 0) {
89+
this.recordBatchReader = undefined;
7390
}
7491

75-
// eslint-disable-next-line no-await-in-loop
76-
const batches = await this.source.fetchNext(options);
77-
if (batches.length === 0) {
78-
this.reader = undefined;
79-
break;
80-
}
92+
// Prefetch the next record batch
93+
await this.prefetch(options);
8194

82-
const reader = RecordBatchReader.from<TypeMap>(batches);
83-
this.reader = reader[Symbol.iterator]();
84-
this.pendingRecordBatch = this.prefetch();
95+
return result;
8596
}
8697

8798
return [];
8899
}
89100

90-
private prefetch(): RecordBatch<TypeMap> | undefined {
91-
const item = this.reader?.next() ?? { done: true, value: undefined };
101+
// This method tries to read one more record batch and store it in `prefetchedRecordBatch` field.
102+
// If `prefetchedRecordBatch` is already non-empty - the method does nothing.
103+
// This method pulls the next item from source if needed, initializes a record batch reader and
104+
// gets the next item from it - until either reaches end of data or finds a non-empty record batch
105+
private async prefetch(options: ResultsProviderFetchNextOptions) {
106+
// This loop will be executed until a next non-empty record batch is retrieved
107+
// Another implicit loop condition (end of data) is checked in the loop body
108+
while (!this.prefetchedRecordBatch) {
109+
// First, try to fetch next item from source and initialize record batch reader.
110+
// If source has no more data - exit prematurely
111+
if (!this.recordBatchReader) {
112+
const sourceHasMore = await this.source.hasMore(); // eslint-disable-line no-await-in-loop
113+
if (!sourceHasMore) {
114+
return;
115+
}
116+
117+
const arrowBatch = await this.source.fetchNext(options); // eslint-disable-line no-await-in-loop
118+
if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) {
119+
const reader = RecordBatchReader.from<TypeMap>(arrowBatch.batches);
120+
this.recordBatchReader = reader[Symbol.iterator]();
121+
this.remainingRows = arrowBatch.rowCount;
122+
}
123+
}
92124

93-
if (item.done || item.value === undefined) {
94-
this.reader = undefined;
95-
return undefined;
125+
// Try to get a next item from current record batch reader. The reader may be unavailable at this point -
126+
// in this case we fall back to a "done" state, and the `while` loop will do one more iteration attempting
127+
// to create a new reader. Eventually it will either succeed or reach end of source. This scenario also
128+
// handles readers which are already empty
129+
const item = this.recordBatchReader?.next() ?? { done: true, value: undefined };
130+
if (item.done || item.value === undefined) {
131+
this.recordBatchReader = undefined;
132+
} else {
133+
// Skip empty batches
134+
// eslint-disable-next-line no-lonely-if
135+
if (item.value.numRows > 0) {
136+
this.prefetchedRecordBatch = item.value;
137+
}
138+
}
96139
}
97-
98-
return item.value;
99140
}
100141

101142
private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {

lib/result/ArrowResultHandler.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import LZ4 from 'lz4';
22
import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types';
33
import IClientContext from '../contracts/IClientContext';
44
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
5-
import { hiveSchemaToArrowSchema } from './utils';
5+
import { ArrowBatch, hiveSchemaToArrowSchema } from './utils';
66

7-
export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
7+
export default class ArrowResultHandler implements IResultsProvider<ArrowBatch> {
88
protected readonly context: IClientContext;
99

1010
private readonly source: IResultsProvider<TRowSet | undefined>;
@@ -35,22 +35,33 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer
3535

3636
public async fetchNext(options: ResultsProviderFetchNextOptions) {
3737
if (!this.arrowSchema) {
38-
return [];
38+
return {
39+
batches: [],
40+
rowCount: 0,
41+
};
3942
}
4043

4144
const rowSet = await this.source.fetchNext(options);
4245

4346
const batches: Array<Buffer> = [];
44-
rowSet?.arrowBatches?.forEach(({ batch }) => {
47+
let totalRowCount = 0;
48+
rowSet?.arrowBatches?.forEach(({ batch, rowCount }) => {
4549
if (batch) {
4650
batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
51+
totalRowCount += rowCount.toNumber(true);
4752
}
4853
});
4954

5055
if (batches.length === 0) {
51-
return [];
56+
return {
57+
batches: [],
58+
rowCount: 0,
59+
};
5260
}
5361

54-
return [this.arrowSchema, ...batches];
62+
return {
63+
batches: [this.arrowSchema, ...batches],
64+
rowCount: totalRowCount,
65+
};
5566
}
5667
}

lib/result/CloudFetchResultHandler.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
33
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
44
import IClientContext from '../contracts/IClientContext';
55
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
6+
import { ArrowBatch } from './utils';
67

7-
export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
8+
export default class CloudFetchResultHandler implements IResultsProvider<ArrowBatch> {
89
protected readonly context: IClientContext;
910

1011
private readonly source: IResultsProvider<TRowSet | undefined>;
@@ -13,7 +14,7 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
1314

1415
private pendingLinks: Array<TSparkArrowResultLink> = [];
1516

16-
private downloadTasks: Array<Promise<Buffer>> = [];
17+
private downloadTasks: Array<Promise<ArrowBatch>> = [];
1718

1819
constructor(
1920
context: IClientContext,
@@ -49,15 +50,20 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
4950
}
5051

5152
const batch = await this.downloadTasks.shift();
52-
const batches = batch ? [batch] : [];
53+
if (!batch) {
54+
return {
55+
batches: [],
56+
rowCount: 0,
57+
};
58+
}
5359

5460
if (this.isLZ4Compressed) {
55-
return batches.map((buffer) => LZ4.decode(buffer));
61+
batch.batches = batch.batches.map((buffer) => LZ4.decode(buffer));
5662
}
57-
return batches;
63+
return batch;
5864
}
5965

60-
private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
66+
private async downloadLink(link: TSparkArrowResultLink): Promise<ArrowBatch> {
6167
if (Date.now() >= link.expiryTime.toNumber()) {
6268
throw new Error('CloudFetch link has expired');
6369
}
@@ -68,7 +74,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
6874
}
6975

7076
const result = await response.arrayBuffer();
71-
return Buffer.from(result);
77+
return {
78+
batches: [Buffer.from(result)],
79+
rowCount: link.rowCount.toNumber(true),
80+
};
7281
}
7382

7483
private async fetch(url: RequestInfo, init?: RequestInit) {

lib/result/utils.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ import {
1919
import { TTableSchema, TColumnDesc, TPrimitiveTypeEntry, TTypeId } from '../../thrift/TCLIService_types';
2020
import HiveDriverError from '../errors/HiveDriverError';
2121

22+
export interface ArrowBatch {
23+
batches: Array<Buffer>;
24+
rowCount: number;
25+
}
26+
2227
export function getSchemaColumns(schema?: TTableSchema): Array<TColumnDesc> {
2328
if (!schema) {
2429
return [];

0 commit comments

Comments
 (0)