Skip to content

Commit 328db53

Browse files
authored
Steward fix and IncludeHeader option (#33)
1 parent cefbb1e commit 328db53

File tree

9 files changed

+90
-2716
lines changed

9 files changed

+90
-2716
lines changed

.eslintrc.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const NEVER = 'never';
88

99
module.exports = {
1010
'parserOptions': {
11-
ecmaVersion: 2017
11+
ecmaVersion: 2019
1212
},
1313
'env': {
1414
es6: true,

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.1.5 (July 15, 2019)
2+
3+
* Add retries mechanism to all request, refactor component to use axios library
4+
15
## 1.1.4 (June 27, 2019)
26

37
* Added emitAll feature for CSV Write action

README.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ attachment. It can also write a CSV file from the incoming events.
1313

1414
## Environment variables
1515

16-
Component does not has any required environment variables, but we suggest to use `EIO_REQUIRED_RAM_MB` - recommended value of allocated memory is `512` MB
17-
16+
1. `EIO_REQUIRED_RAM_MB` - recommended value of allocated memory is `512` MB
17+
2. `REQUEST_TIMEOUT` - HTTP request timeout in milliseconds, default value 10000
18+
3. `REQUEST_RETRY_DELAY` - delay between retry attempts in milliseconds, default value 7000
19+
4. `REQUEST_MAX_RETRY` - number of HTTP request retry attempts, default value 7
20+
5. `REQUEST_MAX_CONTENT_LENGTH` - max size of http request in bytes, default value: 104857600
21+
6. `TIMEOUT_BETWEEN_EVENTS` - number of milliseconds write action wait before creating separate attachments, default value: 10000
1822

1923
## Credentials
2024

@@ -56,6 +60,8 @@ a `JSON` object. To configure this action the following fields can be used:
5660

5761
### Write CSV attachment
5862

63+
* `Include Header` - this select configures output behavior of the component. If option is `Yes` or no value chosen than header of csv file will be written to attachment, this is default behavior. If value `No` selected than csv header will be omitted from attachment.
64+
5965
This action will combine multiple incoming events into a CSV file until there is a gap
6066
of more than 10 seconds between events. Afterwards, the CSV file will be closed
6167
and attached to the outgoing message.
@@ -95,6 +101,6 @@ able to handle file attachments.
95101

96102
1. You may get `Component run out of memory and terminated.` error during run-time, that means that component needs more memory, please add
97103
`EIO_REQUIRED_RAM_MB` environment variable with an appropriate value (e.g. value `512` means that 512 MB will be allocated) for the component in this case.
98-
2. Maximal possible size for an attachment is 10 MB.
99-
3. Attachments mechanism does not work with [Local Agent Installation](https://support.elastic.io/support/solutions/articles/14000076461-announcing-the-local-agent-)
100-
104+
2. You may get `Error: write after end` error, as a current workaround try increase value of environment variable: `TIMEOUT_BETWEEN_EVENTS`.
105+
3. Maximal possible size for an attachment is 10 MB.
106+
4. Attachments mechanism does not work with [Local Agent Installation](https://support.elastic.io/support/solutions/articles/14000076461-announcing-the-local-agent-)

component.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@
4848
"main": "./lib/actions/write.js",
4949
"title": "Write CSV attachment",
5050
"fields": {
51+
"includeHeaders": {
52+
"label" : "Include Headers",
53+
"required": false,
54+
"viewClass" : "SelectView",
55+
"description" : "Default Yes",
56+
"model": {
57+
"Yes" : "Yes",
58+
"No" : "No"
59+
},
60+
"prompt": "Include headers? Default Yes."
61+
},
5162
"writer": {
5263
"viewClass": "CSVWriteView"
5364
}

lib/actions/write.js

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,34 @@
11
'use strict';
22

3-
const request = require('request-promise');
3+
const axios = require('axios');
44
const co = require('co');
55
const csv = require('csv');
66
const _ = require('lodash');
77
const messages = require('elasticio-node').messages;
88
const client = require('elasticio-rest-node')();
99

10-
const TIMEOUT_BETWEEN_EVENTS = 10000;
10+
const util = require('../util/util');
11+
12+
const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 10000; // 10s
13+
const REQUEST_MAX_RETRY = process.env.REQUEST_MAX_RETRY || 7;
14+
const REQUEST_RETRY_DELAY = process.env.REQUEST_RETRY_DELAY || 7000; // 7s
15+
const REQUEST_MAX_CONTENT_LENGTH = process.env.REQUEST_MAX_CONTENT_LENGTH || 104857600; // 100MB
16+
const TIMEOUT_BETWEEN_EVENTS = process.env.TIMEOUT_BETWEEN_EVENTS || 10000; // 10s;
1117

1218
let stringifier;
1319
let signedUrl;
1420
let timeout;
1521
let rowCount = 0;
16-
17-
exports.init = function init(cfg) {
22+
let ax;
23+
let putUrl;
24+
exports.init = async function init(cfg) {
1825
return co(function* gen() {
1926

2027
const delimiter = cfg.writer.separator || ',';
28+
const header = cfg.includeHeaders !== 'No';
2129
console.log('Using delimiter: \'%s\'', delimiter);
2230
const options = {
23-
header: true,
31+
header,
2432
delimiter
2533
};
2634

@@ -33,13 +41,11 @@ exports.init = function init(cfg) {
3341
}
3442

3543
stringifier = csv.stringify(options);
36-
3744
signedUrl = yield client.resources.storage.createSignedUrl();
38-
39-
let putUrl = signedUrl.put_url;
45+
putUrl = signedUrl.put_url;
4046
console.log('CSV file to be uploaded file to uri=%s', putUrl);
41-
42-
stringifier.pipe(request.put(putUrl));
47+
ax = axios.create();
48+
util.addRetryCountInterceptorToAxios(ax);
4349
});
4450
};
4551

@@ -51,13 +57,20 @@ exports.process = async function ProcessAction(msg, cfg) {
5157
clearTimeout(timeout);
5258
}
5359

60+
5461
timeout = setTimeout(() => {
5562
console.log('Closing the stream due to inactivity');
5663
co(function* gen() {
5764
const finalRowCount = rowCount;
5865
console.log('The resulting CSV file contains %s rows', finalRowCount);
66+
ax.put(putUrl, stringifier ,{
67+
method: 'PUT',
68+
timeout: REQUEST_TIMEOUT,
69+
retry: REQUEST_MAX_RETRY,
70+
delay: REQUEST_RETRY_DELAY,
71+
maxContentLength: REQUEST_MAX_CONTENT_LENGTH
72+
});
5973
stringifier.end();
60-
6174
const messageToEmit = messages.newMessageWithBody({
6275
rowCount: finalRowCount
6376
});
@@ -68,14 +81,11 @@ exports.process = async function ProcessAction(msg, cfg) {
6881
};
6982
signedUrl = null;
7083
rowCount = 0;
71-
7284
console.log('Emitting message %j', messageToEmit);
7385
yield self.emit('data', messageToEmit);
74-
7586
yield exports.init(cfg);
7687
});
7788
}, TIMEOUT_BETWEEN_EVENTS);
78-
7989
let row = msg.body.writer;
8090
console.log(`Incoming data: ${JSON.stringify(row)}`);
8191
if (cfg.writer.columns) {

lib/read.js

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
'use strict';
22

3+
const util = require('./util/util');
4+
35
const _ = require('underscore');
46
const csv = require('csv');
57
const messages = require('elasticio-node').messages;
68
const moment = require('moment');
79
const debug = require('debug')('csv');
8-
const request = require('request');
10+
const axios = require('axios');
911
const { Writable } = require('stream');
1012

1113
const formatters = {};
14+
const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 10000; //ms
15+
const REQUEST_MAX_RETRY = process.env.REQUEST_MAX_RETRY || 7;
16+
const REQUEST_RETRY_DELAY = process.env.REQUEST_RETRY_DELAY || 7000; // ms
17+
1218

1319
formatters.date = function formatDate(value, col) {
1420

@@ -123,16 +129,23 @@ async function ProcessRead(msg, cfg) {
123129
const writer = new CsvWriter();
124130

125131
debug('Sending GET request to url=%s', csvURL);
126-
request.get(csvURL)
127-
.on('response', async function onResponse(response) {
128-
debug('Have got response status=%s headers=%j', response.statusCode, response.headers);
129-
if (response.statusCode !== 200) {
130-
await that.emit('error', 'Unexpected response code code=' + response.statusCode);
131-
ended = true;
132-
throw Error('Unexpected response code code=' + response.statusCode);
133-
}
134-
})
135-
.pipe(parser).pipe(writer);
132+
const ax = axios.create();
133+
util.addRetryCountInterceptorToAxios(ax);
134+
const response = await ax({
135+
method: 'get',
136+
url: csvURL,
137+
responseType: 'stream',
138+
timeout: REQUEST_TIMEOUT,
139+
retry: REQUEST_MAX_RETRY,
140+
delay: REQUEST_RETRY_DELAY
141+
});
142+
debug('Have got response status=%s headers=%j', response.status, response.headers);
143+
if (response.status !== 200) {
144+
await that.emit('error', 'Unexpected response code code=' + response.status);
145+
ended = true;
146+
throw Error('Unexpected response code code=' + response.status);
147+
}
148+
response.data.pipe(parser).pipe(writer);
136149

137150
// Need to wait for processing all messages
138151
while (!ended) {

lib/util/util.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
exports.addRetryCountInterceptorToAxios = (ax) => {
2+
ax.interceptors.response.use(undefined, (err) => { // Retry count interceptor for axios
3+
const { config } = err;
4+
if (!config || !config.retry || !config.delay) {return Promise.reject(err);}
5+
config.currentRetryCount = config.currentRetryCount || 0;
6+
if (config.currentRetryCount >= config.retry) {
7+
return Promise.reject(err);
8+
}
9+
config.currentRetryCount += 1;
10+
return new Promise(resolve => setTimeout(() => resolve(ax(config)), config.delay));
11+
});
12+
};

0 commit comments

Comments
 (0)