Skip to content

Commit e187c12

Browse files
authored
Fix memory leak on Emit Batch (#12)
Fix memory leak on `Emit Batch`
1 parent a19e8ba commit e187c12

File tree

5 files changed

+5885
-47
lines changed

5 files changed

+5885
-47
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.1.4 (May 06, 2022)
2+
* Fix memory leak on `Emit Batch` behavior for `Read CSV attachment` action
3+
14
## 3.1.3 (April 22, 2022)
25
* Fix a bug where the 'Emit Batch' emit strategy was not processed correctly
36

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ To configure this action the following fields can be used:
5858
* `Convert Data types` - numeric, date and boolean data will be converted to their type instead of remaining strings, false by default.
5959
If `Emit Behavior` is set to `Emit Batch`, a new field appears: `Batch Size` - the maximum length of the array in each message
6060

61+
#### Output Metadata
62+
- For `Fetch All` and `Emit Batch`: An object with key ***result*** that has an array as its value
63+
- For `Emit Individually`: Each object fills the entire message
64+
65+
#### Limitations
66+
* If you use `Fetch All`, the component needs to store the whole file and the resulting object in memory, which causes high memory usage
67+
* In `Emit Batch`, choose `Batch Size` wisely: a bigger number causes higher memory usage
68+
* Possible exception: `[ERR_STREAM_PREMATURE_CLOSE]` may be thrown if the flow is stopped before all data in the file has been emitted, because the stream is closed prematurely
69+
6170
### Create CSV From Message Stream
6271

6372
This action will combine multiple incoming events into a CSV file until there is a gap

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"title": "CSV",
33
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
44
"docsUrl": "https://github.com/elasticio/csv-component",
5-
"version": "3.1.3",
5+
"version": "3.1.4",
66
"actions": {
77
"read_action": {
88
"main": "./lib/actions/read.js",

lib/actions/read.js

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,18 @@ async function errHelper(text) {
2626
await this.emit('end')
2727
}
2828

29-
function sliceIntoChunks(arr, chunkSize) {
30-
const res = [];
31-
for (let i = 0; i < arr.length; i += chunkSize) {
32-
const chunk = arr.slice(i, i + chunkSize);
33-
res.push(chunk);
34-
}
35-
return res;
36-
}
37-
3829
async function readCSV(msg, cfg) {
3930
const that = this
4031
const emitBehavior = cfg.emitAll;
4132
const { body } = msg;
4233

34+
let batchSize;
35+
if (emitBehavior === 'emitBatch') {
36+
batchSize = body.batchSize;
37+
if (!isPositiveInteger(batchSize)) {
38+
throw new Error("'batchSize' must be a positive integer!");
39+
}
40+
}
4341
// check if url provided in msg
4442
if (body.url && body.url.length > 0) {
4543
this.logger.info('URL found')
@@ -83,7 +81,8 @@ async function readCSV(msg, cfg) {
8381
this.emit('end')
8482
return
8583
}
86-
// control of node data stream
84+
85+
const buf = [];
8786
class CsvWriter extends Writable {
8887
async write(chunk) {
8988
let data = {}
@@ -92,9 +91,14 @@ async function readCSV(msg, cfg) {
9291
} else {
9392
data = arrayToObj(chunk)
9493
}
95-
if (emitBehavior === 'emitIndividually' || cfg.emitAll === false || cfg.emitAll === 'false') {
94+
if (emitBehavior === 'emitIndividually' || cfg.emitAll === false || cfg.emitAll === 'false' || emitBehavior === 'emitBatch') {
9695
parseStream.pause()
97-
await that.emit('data', messages.newMessageWithBody(data))
96+
if (emitBehavior === 'emitBatch') {
97+
buf.push(data);
98+
if (buf.length >= batchSize) await that.emit('data', messages.newMessageWithBody({ result: buf.splice(0, batchSize) }))
99+
} else {
100+
await that.emit('data', messages.newMessageWithBody(data))
101+
}
98102
parseStream.resume()
99103
} else {
100104
result.push(data)
@@ -120,17 +124,8 @@ async function readCSV(msg, cfg) {
120124

121125
if (emitBehavior === 'fetchAll' || cfg.emitAll === true || cfg.emitAll === 'true') {
122126
await this.emit('data', messages.newMessageWithBody({ result }))
123-
} else if (emitBehavior === 'emitBatch') {
124-
const { batchSize } = body;
125-
if (!isPositiveInteger(batchSize)) {
126-
throw new Error("'batchSize' must be a positive integer!");
127-
}
128-
const chunks = sliceIntoChunks(result, batchSize);
129-
// eslint-disable-next-line no-plusplus
130-
for (let i = 0; i < chunks.length; i++) {
131-
// eslint-disable-next-line no-await-in-loop
132-
await this.emit('data', messages.newMessageWithBody({ result: chunks[i] }))
133-
}
127+
} else if (emitBehavior === 'emitBatch' && buf.length > 0) {
128+
await that.emit('data', messages.newMessageWithBody({ result: buf }))
134129
}
135130
this.logger.info(`Complete, memory used: ${process.memoryUsage().heapUsed / 1024 / 1024} Mb`)
136131
}

0 commit comments

Comments
 (0)