Skip to content

Commit ad3c290

Browse files
authored
New Read URL trigger
1 parent 7a4d92a commit ad3c290

File tree

8 files changed

+1960
-1276
lines changed

8 files changed

+1960
-1276
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.3.0 (May 21, 2024)
2+
* Added new trigger - `Read CSV file from URL`
3+
14
## 3.2.0 (May 02, 2023)
25
* Added new config fields to `Read CSV attachment` action:
36
* `Skip empty lines`

README.md

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
* [Read CSV attachment](#read-CSV-attachment)
1212
* [Create CSV From Message Stream](#create-CSV-from-message-stream)
1313
* [Create CSV From JSON Array](#create-CSV-from-JSON-array)
14+
* [Triggers](#triggers)
15+
* [Read CSV file from URL](#read-csv-file-from-url)
1416
* [Limitations](#limitations)
1517

1618
## Description
@@ -183,6 +185,51 @@ This action will convert an incoming array into a CSV file
183185
* `attachmentExpiryTime` - When the attachment is set to expire
184186
* `contentType` - Always set to `text/csv`
185187
188+
## Triggers
189+
190+
### Read CSV file from URL
191+
192+
This trigger read the CSV file from the URL provided in the configuration fields and output the result as a JSON object.
193+
The trigger works pretty much the same as the [Read CSV attachment action](#read-CSV-attachment). The difference is that all the settings are to be provided in the configuration fields, not in the body message. As the triggers do not have input messages.
194+
195+
#### Config Fields
196+
197+
* `Emit Behavior` (dropdown, required) - this selector configures output behavior of the component.
198+
* `Fetch All` - the component emits an array of messages;
199+
* `Emit Individually` - the component emits a message per row;
200+
* `Emit Batch` - component will produce a series of message where each message has an array of max length equal to the `Batch Size`;
201+
* `Skip empty lines` (checkbox, optional) - by default, empty lines are parsed if checked they will be skipped
202+
* `Comment char` (string, optional) - if specified, skips lines starting with this string
203+
204+
#### Input Metadata
205+
206+
* `URL` (string, required) - URL of the CSV file to parse
207+
* `Contains headers` (boolean, optional) - If true, the first row of parsed data will be interpreted as field names, false by default.
208+
* `Delimiter` (string, optional) - The delimiting character. Leave blank to auto-detect from a list of most common delimiters or provide your own
209+
<details><summary>Example</summary>
210+
if you use "$" as Delimiter, this CSV:
211+
212+
```
213+
a$b$c$d
214+
```
215+
216+
can be parsed into this JSON
217+
218+
``` json
219+
{
220+
"column0": "a",
221+
"column1": "b",
222+
"column2": "c",
223+
"column3": "d"
224+
}
225+
```
226+
</details>
227+
* `Convert Data types` (boolean, optional) - Numeric data and boolean data will be converted to their type instead of remaining strings, false by default.
228+
229+
#### Output Metadata
230+
- For `Fetch page` and `Emit Batch`: An object with key ***result*** that has an array as its value
231+
- For `Emit Individually`: Each object fill the entire message
232+
186233
## Limitations
187234
188235
### General
@@ -191,4 +238,4 @@ This action will convert an incoming array into a CSV file
191238
`EIO_REQUIRED_RAM_MB` environment variable with an appropriate value (e.g. value `1024` means that 1024 MB will be allocated) for the component in this case.
192239
* Maximal possible size for an attachment is 10 MB.
193240
* Attachments mechanism does not work with [Local Agent Installation](https://docs.elastic.io/getting-started/local-agent.html)
194-
* Inbound message in `Message Stream` and each element of `JSON Array` should be a plain Object, if value not a primitive type it will be set as `[object Object]`
241+
* Inbound message in `Message Stream` and each element of `JSON Array` should be a plain Object, if value not a primitive type it will be set as `[object Object]`

component.json

Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
{
2-
"title": "CSV",
2+
"title": "CSV v3",
33
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
44
"docsUrl": "https://github.com/elasticio/csv-component",
5-
"version": "3.2.0",
5+
"version": "3.3.0",
66
"actions": {
77
"read_action": {
88
"main": "./lib/actions/read.js",
99
"title": "Read CSV attachment",
1010
"help": {
1111
"description": "This action will read the CSV attachment of the incoming message or from the specified URL and output a JSON object.",
12-
"link": "/components/csv/actions#read-csv-attachment"
12+
"link": "/components/csv/actions.html#read-csv-attachment"
1313
},
1414
"fields": {
1515
"emitAll": {
@@ -50,7 +50,7 @@
5050
"title": "Create CSV From Message Stream",
5151
"help": {
5252
"description": "Multiple incoming events can be combined into one CSV file with the write CSV action.",
53-
"link": "/components/csv/actions#write-csv-attachment"
53+
"link": "/components/csv/actions.html#write-csv-attachment"
5454
},
5555
"fields": {
5656
"uploadToAttachment": {
@@ -101,7 +101,7 @@
101101
"title": "Create CSV From JSON Array",
102102
"help": {
103103
"description": "Incoming array can be converted into one CSV file",
104-
"link": "/components/csv/actions#write-csv-attachment"
104+
"link": "/components/csv/actions.html#write-csv-attachment"
105105
},
106106
"fields": {
107107
"uploadToAttachment": {
@@ -147,5 +147,90 @@
147147
},
148148
"dynamicMetadata": true
149149
}
150+
},
151+
"triggers": {
152+
"read_trigger": {
153+
"type": "polling",
154+
"main": "./lib/triggers/read.js",
155+
"title": "Read CSV file from URL",
156+
"help": {
157+
"description": "This action will read the CSV attachment from the specified URL and output a JSON object.",
158+
"link": "/components/csv/triggers.html#read-csv-file"
159+
},
160+
"fields": {
161+
"url": {
162+
"label": "URL of the CSV file",
163+
"order": 90,
164+
"required": true,
165+
"viewClass": "TextFieldView"
166+
},
167+
"header": {
168+
"label": "Contains headers",
169+
"order": 80,
170+
"required": false,
171+
"viewClass": "CheckBoxView",
172+
"help": {
173+
"description": "If true, the first row of parsed data will be interpreted as field names, false by default"
174+
}
175+
},
176+
"delimiter": {
177+
"label": "Delimiter",
178+
"order": 70,
179+
"required": false,
180+
"viewClass": "TextFieldView",
181+
"help": {
182+
"description": "The delimiting character. Leave blank to auto-detect from a list of most common delimiters or provide your own"
183+
}
184+
},
185+
"dynamicTyping": {
186+
"label": "Convert Data types",
187+
"order": 60,
188+
"required": false,
189+
"viewClass": "CheckBoxView",
190+
"help": {
191+
"description": "Numeric data and boolean data will be converted to their type instead of remaining strings, false by default"
192+
}
193+
},
194+
"emitBehavior": {
195+
"label": "Emit Behavior",
196+
"required": true,
197+
"order": 50,
198+
"viewClass": "SelectView",
199+
"model": {
200+
"fetchAll": "Fetch All",
201+
"emitIndividually": "Emit Individually",
202+
"emitBatch": "Emit Batch"
203+
},
204+
"prompt": "Select Emit Behavior"
205+
},
206+
"batchSize": {
207+
"label": "Batch Size",
208+
"required": false,
209+
"order": 45,
210+
"viewClass": "TextFieldView",
211+
"help": {
212+
"description": "Enter batch size if the 'Emit Behavior' field is set to 'Emit Batch'"
213+
}
214+
},
215+
"skipEmptyLines": {
216+
"label": "Skip empty lines",
217+
"order": 40,
218+
"required": false,
219+
"viewClass": "CheckBoxView",
220+
"help": {
221+
"description": "By default, empty lines are parsed if checked they will be skipped"
222+
}
223+
},
224+
"comments": {
225+
"label": "Comment char",
226+
"order": 30,
227+
"required": false,
228+
"viewClass": "TextFieldView",
229+
"help": {
230+
"description": "If specified, skips lines starting with this string"
231+
}
232+
}
233+
}
234+
}
150235
}
151-
}
236+
}

lib/triggers/read.js

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/* eslint-disable no-restricted-syntax,semi,comma-dangle,class-methods-use-this */
2+
3+
const { AttachmentProcessor } = require('@elastic.io/component-commons-library')
4+
const { Writable } = require('stream');
5+
const { messages } = require('elasticio-node')
6+
const stream = require('stream')
7+
const util = require('util')
8+
const papa = require('papaparse')
9+
const { getUserAgent } = require('../util');
10+
11+
const pipeline = util.promisify(stream.pipeline);
12+
13+
// transform array to obj, for example:
14+
// ['aa', 'bb', 'cc'] => {column0: 'aa', column1: 'bb', column2: 'cc'}
15+
function arrayToObj(arr) {
16+
let columns = {}
17+
arr.forEach((value, index) => {
18+
columns = { ...columns, ...{ [`column${index}`]: value } }
19+
})
20+
return columns
21+
}
22+
23+
async function readCSV(msg, cfg) {
24+
const that = this;
25+
const {
26+
url,
27+
header,
28+
delimiter,
29+
dynamicTyping,
30+
emitBehavior,
31+
batchSize,
32+
skipEmptyLines,
33+
comments
34+
} = cfg;
35+
36+
if (emitBehavior === 'emitBatch') {
37+
if (!isPositiveInteger(parseInt(batchSize, 10))) {
38+
throw new Error("'batchSize' must be a positive integer!");
39+
}
40+
}
41+
if (!url) throw new Error('URL of the CSV is missing');
42+
43+
const parseOptions = {
44+
header,
45+
dynamicTyping,
46+
delimiter,
47+
skipEmptyLines,
48+
comments,
49+
}
50+
51+
// if set "Fetch All" create object with results
52+
const result = [];
53+
54+
let dataStream;
55+
const parseStream = papa.parse(papa.NODE_STREAM_INPUT, parseOptions);
56+
57+
const attachmentProcessor = new AttachmentProcessor(getUserAgent(), msg.id);
58+
try {
59+
dataStream = await attachmentProcessor.getAttachment(url, 'stream')
60+
this.logger.info('File received, trying to parse CSV')
61+
} catch (err) {
62+
this.logger.error(`URL - "${url}" unreachable: ${err}`);
63+
await this.emit('error', `URL - "${url}" unreachable: ${err}`);
64+
return;
65+
}
66+
67+
const buf = [];
68+
class CsvWriter extends Writable {
69+
async write(chunk) {
70+
let data = {}
71+
if (parseOptions.header) {
72+
data = chunk
73+
} else {
74+
data = arrayToObj(chunk)
75+
}
76+
if (emitBehavior === 'emitIndividually' || emitBehavior === false || emitBehavior === 'false' || emitBehavior === 'emitBatch') {
77+
parseStream.pause()
78+
if (emitBehavior === 'emitBatch') {
79+
buf.push(data);
80+
if (buf.length >= batchSize) await that.emit('data', messages.newMessageWithBody({ result: buf.splice(0, batchSize) }))
81+
} else {
82+
await that.emit('data', messages.newMessageWithBody(data))
83+
}
84+
parseStream.resume()
85+
} else {
86+
result.push(data)
87+
}
88+
}
89+
}
90+
const writerStream = new CsvWriter();
91+
writerStream.logger = this.logger;
92+
93+
try {
94+
await pipeline(
95+
dataStream.data,
96+
parseStream,
97+
writerStream
98+
)
99+
this.logger.info('File parsed successfully')
100+
} catch (err) {
101+
this.logger.error(`error during file parse: ${err}`);
102+
await this.emit('error', `error during file parse: ${err}`)
103+
return;
104+
}
105+
106+
if (emitBehavior === 'fetchAll') {
107+
await this.emit('data', messages.newMessageWithBody({ result }))
108+
} else if (emitBehavior === 'emitBatch' && buf.length > 0) {
109+
await that.emit('data', messages.newMessageWithBody({ result: buf }))
110+
}
111+
this.logger.info(`Complete, memory used: ${process.memoryUsage().heapUsed / 1024 / 1024} Mb`)
112+
}
113+
114+
module.exports.process = readCSV;
115+
116+
function isPositiveInteger(input) {
117+
return Number.isSafeInteger(input) && input > 0;
118+
}

0 commit comments

Comments
 (0)