Skip to content

Commit b33b502

Browse files
authored
Merge pull request #3 from elasticio/add-emit-behaviour
## 3.1.0 (March 3, 2022) * Added `Emit Batch` behavior for `Read CSV attachment` action
2 parents 3976f7b + 7060763 commit b33b502

File tree

8 files changed

+625
-5886
lines changed

8 files changed

+625
-5886
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.1.0 (March 3, 2022)
2+
* Added `Emit Batch` behavior for `Read CSV attachment` action
3+
14
## 3.0.0 (July 9, 2021)
25
* Deleted trigger:
36
- `Read CSV attachment`

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,15 @@ To configure this action the following fields can be used:
4848

4949
#### Config Fields
5050

51-
* `Emit Behavior` - this selector configures output behavior of the component. If the option is `Fetch All` - the component emits an array of messages, otherwise (`Emit Individually`) - the component emits a message per row.
51+
* `Emit Behavior` - this selector configures output behavior of the component. If the option is `Fetch All` - the component emits an array of messages; `Emit Individually` - the component emits a message per row; `Emit Batch` - component will produce a series of message where each message has an array of max length equal to the `Batch Size`;
5252

5353
#### Input Metadata
5454

5555
* `URL` - We will fetch this URL and parse it as CSV file
5656
* `Contains headers` - if true, the first row of parsed data will be interpreted as field names, false by default.
5757
* `Delimiter` - The delimiting character. Leave blank to auto-detect from a list of most common delimiters.
5858
* `Convert Data types` - numeric, date and boolean data will be converted to their type instead of remaining strings, false by default.
59+
If `Emit Behavior` equals to `Emit Batch` - new field appears: `Batch Size` - max length of array for each message
5960

6061
### Create CSV From Message Stream
6162

component.json

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
44
"docsUrl": "https://github.com/elasticio/csv-component",
55
"buildType": "docker",
6-
"version": "3.0.0",
6+
"version": "3.1.0",
77
"actions": {
88
"read_action": {
99
"main": "./lib/actions/read.js",
@@ -18,16 +18,14 @@
1818
"required": true,
1919
"viewClass": "SelectView",
2020
"model": {
21-
"true": "Fetch All",
22-
"false": "Emit Individually"
21+
"fetchAll": "Fetch All",
22+
"emitIndividually": "Emit Individually",
23+
"emitBatch": "Emit Batch"
2324
},
2425
"prompt": "Select Emit Behavior"
2526
}
2627
},
27-
"metadata": {
28-
"in": "./lib/schemas/read.in.json",
29-
"out": {}
30-
}
28+
"dynamicMetadata": true
3129
},
3230
"write_from_stream": {
3331
"main": "./lib/actions/writeStream.js",

lib/actions/read.js

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,19 @@ async function errHelper(text) {
2626
await this.emit('end')
2727
}
2828

29+
function sliceIntoChunks(arr, chunkSize) {
30+
const res = [];
31+
for (let i = 0; i < arr.length; i += chunkSize) {
32+
const chunk = arr.slice(i, i + chunkSize);
33+
res.push(chunk);
34+
}
35+
return res;
36+
}
37+
2938
async function readCSV(msg, cfg) {
3039
const that = this
31-
const emitAll = cfg.emitAll === true || cfg.emitAll === 'true'
32-
const { body } = msg
40+
const emitBehavior = cfg.emitAll;
41+
const { body } = msg;
3342

3443
// check if url provided in msg
3544
if (body.url && body.url.length > 0) {
@@ -60,9 +69,7 @@ async function readCSV(msg, cfg) {
6069
}
6170

6271
// if set "Fetch All" create object with results
63-
const outputMsg = {
64-
result: [],
65-
}
72+
const result = [];
6673

6774
let dataStream
6875
const parseStream = papa.parse(papa.NODE_STREAM_INPUT, parseOptions)
@@ -85,12 +92,12 @@ async function readCSV(msg, cfg) {
8592
} else {
8693
data = arrayToObj(chunk)
8794
}
88-
if (emitAll) {
89-
outputMsg.result.push(data)
90-
} else {
95+
if (emitBehavior === 'emitIndividually' || cfg.emitAll === false || cfg.emitAll === 'false') {
9196
parseStream.pause()
9297
await that.emit('data', messages.newMessageWithBody(data))
9398
parseStream.resume()
99+
} else {
100+
result.push(data)
94101
}
95102
}
96103
}
@@ -111,10 +118,56 @@ async function readCSV(msg, cfg) {
111118
return
112119
}
113120

114-
if (emitAll) {
115-
await this.emit('data', messages.newMessageWithBody(outputMsg))
121+
if (emitBehavior === 'fetchAll' || cfg.emitAll === true || cfg.emitAll === 'true') {
122+
await this.emit('data', messages.newMessageWithBody({ result }))
123+
} else if (emitBehavior === 'emitBatch') {
124+
const chunks = sliceIntoChunks(result, body.batchSize);
125+
// eslint-disable-next-line no-plusplus
126+
for (let i = 0; i < chunks.length; i++) {
127+
// eslint-disable-next-line no-await-in-loop
128+
await this.emit('data', messages.newMessageWithBody({ result: chunks[i] }))
129+
}
116130
}
117131
this.logger.info(`Complete, memory used: ${process.memoryUsage().heapUsed / 1024 / 1024} Mb`)
118132
}
119133

120-
module.exports.process = readCSV
134+
module.exports.process = readCSV;
135+
module.exports.getMetaModel = async function getMetaModel(cfg) {
136+
const meta = {
137+
in: {
138+
type: 'object',
139+
properties: {
140+
url: {
141+
type: 'string',
142+
required: true,
143+
title: 'URL'
144+
},
145+
header: {
146+
type: 'boolean',
147+
required: false,
148+
title: 'Contains headers'
149+
},
150+
delimiter: {
151+
type: 'string',
152+
required: false,
153+
title: 'Delimiter'
154+
},
155+
dynamicTyping: {
156+
type: 'boolean',
157+
required: false,
158+
title: 'Convert Data types'
159+
}
160+
}
161+
},
162+
out: {}
163+
};
164+
165+
if (cfg.emitBehavior === 'emitBatch') {
166+
meta.in.properties.batchSize = {
167+
title: 'Batch Size',
168+
type: 'number',
169+
required: true
170+
}
171+
}
172+
return meta;
173+
}

lib/schemas/read.in.json

Lines changed: 0 additions & 25 deletions
This file was deleted.

0 commit comments

Comments
 (0)