@@ -5,15 +5,13 @@ const _ = require('underscore');
5
5
const csv = require ( 'csv' ) ;
6
6
const { messages } = require ( 'elasticio-node' ) ;
7
7
const moment = require ( 'moment' ) ;
8
- const debug = require ( 'debug' ) ( 'csv' ) ;
9
8
const axios = require ( 'axios' ) ;
10
9
const { Writable } = require ( 'stream' ) ;
11
10
const util = require ( '../util/util' ) ;
12
11
13
12
const REQUEST_TIMEOUT = process . env . REQUEST_TIMEOUT || 10000 ; // ms
14
13
const REQUEST_MAX_RETRY = process . env . REQUEST_MAX_RETRY || 7 ;
15
14
const REQUEST_RETRY_DELAY = process . env . REQUEST_RETRY_DELAY || 7000 ; // ms
16
-
17
15
const formatters = {
18
16
date : ( value , col ) => moment ( value , col . format ) . toDate ( ) ,
19
17
number : ( value , col ) => {
@@ -64,16 +62,15 @@ async function ProcessRead(msg, cfg) {
64
62
let index = 0 ;
65
63
const separator = cfg . reader ? cfg . reader . separator || ',' : ',' ;
66
64
const startRow = cfg . reader ? cfg . reader . startRow || 0 : 0 ;
67
-
68
- console . log ( 'Incoming message is %j' , msg ) ;
65
+ that . logger . trace ( 'Incoming message is %j' , msg ) ;
69
66
if ( ! csvURL || csvURL . length === 0 ) {
70
67
// Now let's check for the attachment
71
68
if ( msg && msg . attachments && Object . keys ( msg . attachments ) . length > 0 ) {
72
69
const key = Object . keys ( msg . attachments ) [ 0 ] ;
73
- console . log ( 'Found attachment key=%s attachment=%j' , key , msg . attachments [ key ] ) ;
70
+ that . logger . trace ( 'Found attachment key=%s attachment=%j' , key , msg . attachments [ key ] ) ;
74
71
csvURL = msg . attachments [ key ] . url ;
75
72
} else {
76
- console . error ( 'URL of the CSV is missing' ) ;
73
+ that . logger . error ( 'URL of the CSV is missing' ) ;
77
74
that . emit ( 'error' , 'URL of the CSV is missing' ) ;
78
75
return that . emit ( 'end' ) ;
79
76
}
@@ -91,18 +88,18 @@ async function ProcessRead(msg, cfg) {
91
88
class CsvWriter extends Writable {
92
89
async write ( chunk , encoding , callback ) {
93
90
parser . pause ( ) ;
94
- debug ( 'Processing %d row...' , index ) ;
95
- debug ( 'Memory usage: %d Mb' , process . memoryUsage ( ) . heapUsed / 1024 / 1024 ) ;
91
+ this . logger . debug ( 'Processing %d row...' , index ) ;
92
+ this . logger . debug ( 'Memory usage: %d Mb' , process . memoryUsage ( ) . heapUsed / 1024 / 1024 ) ;
96
93
if ( index >= startRow ) {
97
94
const msg = createRowMessage ( chunk , cfg . reader . columns ) ;
98
95
if ( cfg . emitAll ) {
99
- debug ( 'Row #%s added to result array' , index ) ;
96
+ this . logger . debug ( 'Row #%s added to result array' , index ) ;
100
97
outputMsg . result . push ( msg . body ) ;
101
98
} else {
102
99
await that . emit ( 'data' , msg ) ;
103
100
}
104
101
} else {
105
- debug ( 'Row #%s is skipped based on configuration' , index ) ;
102
+ this . logger . debug ( 'Row #%s is skipped based on configuration' , index ) ;
106
103
}
107
104
index += 1 ;
108
105
parser . resume ( ) ;
@@ -112,18 +109,19 @@ async function ProcessRead(msg, cfg) {
112
109
if ( cfg . emitAll ) {
113
110
await that . emit ( 'data' , messages . newMessageWithBody ( outputMsg ) ) ;
114
111
}
115
- debug ( 'Processing csv writer end event...' ) ;
116
- debug ( 'Memory usage: %d Mb' , process . memoryUsage ( ) . heapUsed / 1024 / 1024 ) ;
112
+ this . logger . debug ( 'Processing csv writer end event...' ) ;
113
+ this . logger . debug ( 'Memory usage: %d Mb' , process . memoryUsage ( ) . heapUsed / 1024 / 1024 ) ;
117
114
118
- debug ( `Number of lines: ${ index } ` ) ;
115
+ this . logger . debug ( `Number of lines: ${ index } ` ) ;
119
116
await that . emit ( 'end' ) ;
120
117
ended = true ;
121
118
}
122
119
}
123
120
124
121
const writer = new CsvWriter ( ) ;
122
+ writer . logger = that . logger ;
125
123
126
- debug ( 'Sending GET request to url=%s' , csvURL ) ;
124
+ that . logger . debug ( 'Sending GET request to url=%s' , csvURL ) ;
127
125
const ax = axios . create ( ) ;
128
126
util . addRetryCountInterceptorToAxios ( ax ) ;
129
127
const response = await ax ( {
@@ -134,7 +132,7 @@ async function ProcessRead(msg, cfg) {
134
132
retry : REQUEST_MAX_RETRY ,
135
133
delay : REQUEST_RETRY_DELAY ,
136
134
} ) ;
137
- debug ( 'Have got response status=%s headers=%j' , response . status , response . headers ) ;
135
+ that . logger . debug ( 'Have got response status=%s headers=%j' , response . status , response . headers ) ;
138
136
if ( response . status !== 200 ) {
139
137
await that . emit ( 'error' , `Unexpected response code code=${ response . status } ` ) ;
140
138
ended = true ;
0 commit comments