@@ -8,6 +8,7 @@ const timeToString = (date) => JSON.stringify(new Date(date)).replace(/"/g, '');
8
8
const isDateValid = ( date ) => new Date ( date ) . toString ( ) !== 'Invalid Date' ;
9
9
const isNumberNaN = ( num ) => Number ( num ) . toString ( ) === 'NaN' ;
10
10
const MAX_FETCH = 10000 ;
11
+ const isDebugFlow = process . env . ELASTICIO_FLOW_TYPE === 'debug' ;
11
12
12
13
function getSelectedFields ( cfg ) {
13
14
const { selectedFields = [ ] } = cfg ;
@@ -27,20 +28,21 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
27
28
const {
28
29
sobject,
29
30
linkedObjects = [ ] ,
30
- pageSize = MAX_FETCH ,
31
31
emitBehavior = 'emitIndividually' ,
32
32
} = cfg ;
33
33
let {
34
34
singlePagePerInterval,
35
35
} = cfg ;
36
36
37
- let { startTime, endTime } = cfg ;
37
+ let { startTime, endTime, pageSize } = cfg ;
38
+ if ( ! pageSize ) pageSize = MAX_FETCH ;
38
39
if ( ! startTime ) startTime = 0 ;
39
40
if ( ! endTime ) endTime = currentTime ;
40
41
if ( ! isDateValid ( startTime ) ) throw new Error ( 'invalid "Start Time" date format, use ISO 8601 Date time utc format - YYYY-MM-DDThh:mm:ssZ' ) ;
41
42
if ( ! isDateValid ( endTime ) ) throw new Error ( 'invalid "End Time" date format, use ISO 8601 Date time utc format - YYYY-MM-DDThh:mm:ssZ' ) ;
42
43
if ( pageSize > MAX_FETCH || isNumberNaN ( pageSize ) || Number ( pageSize ) < 0 ) throw new Error ( `"Size of Polling Page" must be valid number between 0 and ${ MAX_FETCH } ` ) ;
43
- const from = snapshot ?. nextStartTime || startTime ;
44
+ pageSize = Number ( pageSize ) ;
45
+ let from = snapshot ?. nextStartTime || startTime ;
44
46
const to = endTime || currentTime ;
45
47
let nextStartTime = currentTime ;
46
48
@@ -57,98 +59,67 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
57
59
sobject,
58
60
selectedObjects : linkedObjects . reduce ( ( query , obj ) => ( obj . startsWith ( '!' ) ? query : `${ query } , ${ obj } .*` ) , selectedFields ) ,
59
61
linkedObjects,
62
+ maxFetch : pageSize ,
60
63
} ;
61
64
62
- const isDebugFlow = process . env . ELASTICIO_FLOW_TYPE === 'debug' ;
63
65
if ( isDebugFlow ) {
64
66
options . maxFetch = 10 ;
65
67
singlePagePerInterval = true ;
66
68
this . logger . info ( 'Debug flow detected, set maxFetch to 10' ) ;
67
69
}
68
70
69
71
let proceed = true ;
70
- let hasNextPage = false ;
72
+ let emitted ;
71
73
let iteration = 1 ;
72
- let results = [ ] ;
74
+ let results ;
73
75
try {
74
76
do {
75
- if ( ! hasNextPage ) {
76
- options . whereCondition = `LastModifiedDate >= ${ timeToString ( from ) } AND LastModifiedDate < ${ timeToString ( to ) } ` ;
77
- this . logger . debug ( 'Start poll object with options: %j' , options ) ;
78
- results = await callJSForceMethod . call ( this , cfg , 'pollingSelectQuery' , options ) ;
79
- this . logger . info ( `Polling iteration ${ iteration } - ${ results . length } results found` ) ;
80
- iteration ++ ;
81
- }
77
+ options . whereCondition = `LastModifiedDate >= ${ timeToString ( from ) } AND LastModifiedDate < ${ timeToString ( to ) } ` ;
78
+ this . logger . debug ( 'Start poll object with options: %j' , options ) ;
79
+ results = await callJSForceMethod . call ( this , cfg , 'pollingSelectQuery' , options ) ;
80
+ this . logger . info ( `Polling iteration ${ iteration } - ${ results . length } results found` ) ;
81
+ iteration ++ ;
82
82
if ( results . length !== 0 ) {
83
- this . logger . debug ( 'New records found, check other options...' ) ;
84
- nextStartTime = currentTime ;
85
- if ( singlePagePerInterval && snapshot . lastElementId ) {
86
- this . logger . debug ( 'Snapshot contain lastElementId, going to delete records that have already been emitted' ) ;
87
- const lastElement = results . filter ( ( item ) => item . Id === snapshot . lastElementId ) [ 0 ] ;
88
- const lastElementIndex = results . indexOf ( lastElement ) ;
89
- results = results . slice ( lastElementIndex + 1 ) ;
90
- this . logger . debug ( 'Emitted records deleted. Current results length is %s' , results . length ) ;
91
- }
92
- if ( results . length === MAX_FETCH ) {
93
- this . logger . debug ( 'The size of the resulting array is equal to MAX_FETCH, so all entries that have the same LastModifiedDate as the last entry will be deleted from the resulting array to prevent emitting duplicates' ) ;
94
- nextStartTime = results [ results . length - 1 ] . LastModifiedDate ;
95
- const filteredResults = results . filter ( ( item ) => item . LastModifiedDate === nextStartTime ) ;
96
- results = results . slice ( 0 , MAX_FETCH - filteredResults . length ) ;
97
- this . logger . debug ( 'Entries that have the same LastModifiedDate as the last entry deleted. Current size of the resulting array is %s' , results . length ) ;
98
- }
99
- if ( results . length >= Number ( pageSize ) ) {
100
- this . logger . debug ( 'The size of the resulting array >= pageSize. Going to process one page...' ) ;
101
- hasNextPage = true ;
102
- const pageResults = results . slice ( 0 , pageSize ) ;
103
- results = results . slice ( pageSize ) ;
104
- const lastElementLastModifiedDate = pageResults [ pageSize - 1 ] . LastModifiedDate ;
105
- const lastElementId = pageResults [ pageSize - 1 ] . Id ;
106
- if ( emitBehavior === 'fetchPage' ) {
107
- this . logger . debug ( 'Emit Behavior set as Fetch Page, going to emit one page...' ) ;
108
- await this . emit ( 'data' , messages . newMessageWithBody ( { results : pageResults } ) ) ;
109
- } else if ( emitBehavior === 'emitIndividually' ) {
110
- this . logger . debug ( 'Emit Behavior set as Emit Individually, going to emit records one by one' ) ;
111
- for ( const record of pageResults ) {
112
- await this . emit ( 'data' , messages . newMessageWithBody ( record ) ) ;
113
- }
114
- }
115
- this . logger . debug ( 'Page processing is finished.' ) ;
116
-
117
- if ( singlePagePerInterval ) {
118
- this . logger . debug ( 'Single Page Per Interval option is set. Going to emit snapshot %j' , { nextStartTime : lastElementLastModifiedDate , lastElementId } ) ;
119
- await this . emit ( 'snapshot' , { nextStartTime : lastElementLastModifiedDate , lastElementId } ) ;
120
- proceed = false ;
121
- }
83
+ emitted = true ;
84
+ nextStartTime = results [ results . length - 1 ] . LastModifiedDate ;
85
+ if ( results . length === pageSize ) {
86
+ this . logger . warn ( 'All entries that have the same LastModifiedDate as the last entry will be deleted from the resulting array to prevent emitting duplicates' ) ;
87
+ results = results . filter ( ( item ) => item . LastModifiedDate !== nextStartTime ) ;
88
+ this . logger . warn ( 'Entries that have the same LastModifiedDate as the last entry deleted. Current size of the resulting array is %s' , results . length ) ;
122
89
} else {
123
- this . logger . debug ( 'The size of the resulting array < pageSize. Going to process all found results' ) ;
124
- hasNextPage = false ;
125
- if ( emitBehavior === 'fetchPage' ) {
126
- this . logger . debug ( 'Emit Behavior set as Fetch Page, going to emit one page...' ) ;
127
- await this . emit ( 'data' , messages . newMessageWithBody ( { results } ) ) ;
128
- } else if ( emitBehavior === 'emitIndividually' ) {
129
- this . logger . debug ( 'Emit Behavior set as Emit Individually, going to emit records one by one' ) ;
130
- for ( const record of results ) {
131
- await this . emit ( 'data' , messages . newMessageWithBody ( record ) ) ;
132
- }
133
- }
134
- this . logger . debug ( 'All results processed. going to emit snapshot: %j' , { nextStartTime } ) ;
135
- await this . emit ( 'snapshot' , { nextStartTime } ) ;
90
+ nextStartTime = timeToString ( timestamp ( nextStartTime ) + 1000 ) ;
136
91
proceed = false ;
137
92
}
93
+ if ( emitBehavior === 'fetchPage' ) {
94
+ this . logger . debug ( 'Emit Behavior set as Fetch Page, going to emit one page...' ) ;
95
+ await this . emit ( 'data' , messages . newMessageWithBody ( { results } ) ) ;
96
+ } else if ( emitBehavior === 'emitIndividually' ) {
97
+ this . logger . debug ( 'Emit Behavior set as Emit Individually, going to emit records one by one' ) ;
98
+ for ( const record of results ) {
99
+ await this . emit ( 'data' , messages . newMessageWithBody ( record ) ) ;
100
+ }
101
+ }
102
+ if ( singlePagePerInterval ) proceed = false ;
103
+ from = nextStartTime ;
138
104
} else {
139
- this . logger . debug ( 'The size of the resulting array is 0, going to emit snapshot: %j' , { nextStartTime } ) ;
140
- await this . emit ( 'snapshot' , { nextStartTime } ) ;
141
- await this . emit ( 'end' ) ;
142
105
proceed = false ;
143
106
}
144
107
} while ( proceed ) ;
145
108
this . logger . info ( 'Processing Polling trigger finished successfully' ) ;
109
+ this . logger . debug ( 'Going to emit snapshot: %j' , { nextStartTime } ) ;
110
+ await this . emit ( 'snapshot' , { nextStartTime } ) ;
146
111
} catch ( e ) {
147
112
if ( e . statusCode ) {
148
113
throw new Error ( `Got error - ${ e . name } \n message: \n${ e . message } \n statusCode: \n${ e . statusCode } \n body: \n${ JSON . stringify ( e . body ) } ` ) ;
149
114
}
150
115
throw e ;
151
116
}
117
+
118
+ if ( isDebugFlow && ! emitted ) {
119
+ throw new Error ( `No object found. Execution stopped.
120
+ This error is only applicable to the Retrieve Sample.
121
+ In flow executions there will be no error, just an execution skip.` ) ;
122
+ }
152
123
} ;
153
124
154
125
module . exports . objectTypes = async function getObjectTypes ( configuration ) {
0 commit comments