@@ -45,18 +45,25 @@ function init(cfg) {
45
45
function processAction ( msg , cfg ) {
46
46
const self = this ;
47
47
48
+ const startRow = cfg . reader . startRow || 0 ;
49
+
50
+ if ( startRow !== 0 && rowCount === 0 ) {
51
+ console . log ( 'Skipping the first %s rows' , startRow + 1 ) ;
52
+ }
53
+
48
54
if ( timeout ) {
49
55
clearTimeout ( timeout ) ;
50
56
}
51
57
52
58
timeout = setTimeout ( ( ) => {
53
59
console . log ( 'Closing the stream due to inactivity' ) ;
54
60
co ( function * gen ( ) {
55
- console . log ( 'The resulting CSV file contains %s rows' , rowCount ) ;
61
+ const finalRowCount = rowCount - startRow ;
62
+ console . log ( 'The resulting CSV file contains %s rows' , finalRowCount ) ;
56
63
stringifier . end ( ) ;
57
64
58
65
const messageToEmit = messages . newMessageWithBody ( {
59
- rowCount
66
+ rowCount : finalRowCount
60
67
} ) ;
61
68
const fileName = messageToEmit . id + '.csv' ;
62
69
messageToEmit . attachments [ fileName ] = {
@@ -73,12 +80,14 @@ function processAction(msg, cfg) {
73
80
} ) ;
74
81
} , 10000 ) ;
75
82
76
- let row = msg . body ;
77
- if ( cfg . reader . columns ) {
78
- const columns = Object . keys ( _ . keyBy ( cfg . reader . columns , 'property' ) ) ;
79
- row = _ . pick ( msg . body , columns ) ;
83
+ if ( startRow <= rowCount ) {
84
+ let row = msg . body ;
85
+ if ( cfg . reader . columns ) {
86
+ const columns = Object . keys ( _ . keyBy ( cfg . reader . columns , 'property' ) ) ;
87
+ row = _ . pick ( msg . body , columns ) ;
88
+ }
89
+ stringifier . write ( row ) ;
80
90
}
81
- stringifier . write ( row ) ;
82
91
rowCount ++ ;
83
92
this . emit ( 'end' ) ;
84
93
}
0 commit comments