Skip to content

Commit c52f2ba

Browse files
[PECO-1541] Optimize UC Volume ingestion (#247)
* [PECO-1541] Optimize UC Volume ingestion Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Refine the code Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent c00f72e commit c52f2ba

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

lib/DBSQLSession.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import * as fs from 'fs';
22
import * as path from 'path';
3+
import stream from 'node:stream';
4+
import util from 'node:util';
35
import { stringify, NIL, parse } from 'uuid';
46
import fetch, { HeadersInit } from 'node-fetch';
57
import {
@@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
3638
import ParameterError from './errors/ParameterError';
3739
import IClientContext, { ClientConfig } from './contracts/IClientContext';
3840

41+
// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
42+
const pipeline = util.promisify(stream.pipeline);
43+
3944
const defaultMaxRows = 100000;
4045

4146
interface OperationResponseShape {
@@ -271,8 +276,10 @@ export default class DBSQLSession implements IDBSQLSession {
271276
if (!response.ok) {
272277
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
273278
}
274-
const buffer = await response.arrayBuffer();
275-
fs.writeFileSync(localFile, Buffer.from(buffer));
279+
280+
const fileStream = fs.createWriteStream(localFile);
281+
// `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly
282+
return pipeline(response.body, fileStream);
276283
}
277284

278285
private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
@@ -301,8 +308,19 @@ export default class DBSQLSession implements IDBSQLSession {
301308
const connectionProvider = await this.context.getConnectionProvider();
302309
const agent = await connectionProvider.getAgent();
303310

304-
const data = fs.readFileSync(localFile);
305-
const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data });
311+
const fileStream = fs.createReadStream(localFile);
312+
const fileInfo = fs.statSync(localFile, { bigint: true });
313+
314+
const response = await fetch(presignedUrl, {
315+
method: 'PUT',
316+
headers: {
317+
...headers,
318+
// This header is required by server
319+
'Content-Length': fileInfo.size.toString(),
320+
},
321+
agent,
322+
body: fileStream,
323+
});
306324
if (!response.ok) {
307325
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
308326
}

0 commit comments

Comments
 (0)