@@ -7,14 +7,15 @@ const timestamp = (date) => new Date(date).getTime();
7
7
const timeToString = ( date ) => JSON . stringify ( new Date ( date ) ) . replace ( / " / g, '' ) ;
8
8
const isDateValid = ( date ) => new Date ( date ) . toString ( ) !== 'Invalid Date' ;
9
9
const isNumberNaN = ( num ) => Number ( num ) . toString ( ) === 'NaN' ;
10
+ const MAX_FETCH = 10000 ;
10
11
11
12
exports . process = async function processTrigger ( _msg , cfg , snapshot ) {
12
13
this . logger . info ( 'Start processing "Get Updated Objects Polling" trigger' ) ;
13
14
const currentTime = new Date ( ) ;
14
15
15
16
const {
16
17
sobject,
17
- pageSize = 10000 ,
18
+ pageSize = MAX_FETCH ,
18
19
linkedObjects = [ ] ,
19
20
emitBehavior = 'emitIndividually' ,
20
21
singlePagePerInterval,
@@ -25,8 +26,8 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
25
26
if ( ! endTime ) endTime = currentTime ;
26
27
if ( ! isDateValid ( startTime ) ) throw new Error ( 'invalid "Start Time" date format, use ISO 8601 Date time utc format - YYYY-MM-DDThh:mm:ssZ' ) ;
27
28
if ( ! isDateValid ( endTime ) ) throw new Error ( 'invalid "End Time" date format, use ISO 8601 Date time utc format - YYYY-MM-DDThh:mm:ssZ' ) ;
28
- if ( pageSize > 10000 || isNumberNaN ( pageSize ) || Number ( pageSize ) < 0 ) throw new Error ( ' "Size of Polling Page" must be valid number between 0 and 10000' ) ;
29
- let from = snapshot ?. nextStartTime || startTime ;
29
+ if ( pageSize > MAX_FETCH || isNumberNaN ( pageSize ) || Number ( pageSize ) < 0 ) throw new Error ( ` "Size of Polling Page" must be valid number between 0 and ${ MAX_FETCH } ` ) ;
30
+ const from = snapshot ?. nextStartTime || startTime ;
30
31
const to = endTime || currentTime ;
31
32
let nextStartTime = currentTime ;
32
33
@@ -42,45 +43,59 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
42
43
sobject,
43
44
selectedObjects : linkedObjects . reduce ( ( query , obj ) => ( obj . startsWith ( '!' ) ? query : `${ query } , ${ obj } .*` ) , '*' ) ,
44
45
linkedObjects,
45
- maxFetch : pageSize ,
46
46
} ;
47
47
48
48
let proceed = true ;
49
+ let hasNextPage = false ;
49
50
let iteration = 1 ;
51
+ let results = [ ] ;
50
52
try {
51
53
do {
52
- options . whereCondition = `LastModifiedDate >= ${ timeToString ( from ) } AND LastModifiedDate < ${ timeToString ( to ) } ` ;
53
-
54
- let results = await callJSForceMethod . call ( this , cfg , 'pollingSelectQuery' , options ) ;
55
- this . logger . info ( `Polling iteration ${ iteration } - ${ results . length } results found` ) ;
56
- iteration ++ ;
57
-
54
+ if ( ! hasNextPage ) {
55
+ options . whereCondition = `LastModifiedDate >= ${ timeToString ( from ) } AND LastModifiedDate < ${ timeToString ( to ) } ` ;
56
+ results = await callJSForceMethod . call ( this , cfg , 'pollingSelectQuery' , options ) ;
57
+ this . logger . info ( `Polling iteration ${ iteration } - ${ results . length } results found` ) ;
58
+ iteration ++ ;
59
+ }
58
60
if ( results . length !== 0 ) {
59
- if ( results . length === Number ( pageSize ) ) {
60
- const currentResultsLength = results . length ;
61
- nextStartTime = results [ results . length - 1 ] . LastModifiedDate ;
62
- const filteredResults = results . filter ( ( item ) => item . LastModifiedDate !== nextStartTime ) ;
63
- if ( currentResultsLength - filteredResults . length === 0 ) {
64
- this . logger . warn ( `All results from this iteration have same "LastModifiedDate" - ${ timeToString ( nextStartTime ) } , they will be proceed, but all objects in the next iteration will start from date strictly greater than this` ) ;
65
- nextStartTime = new Date ( new Date ( nextStartTime ) . getTime ( ) + 1 ) ;
66
- } else {
67
- results = filteredResults ;
68
- this . logger . warn ( `Founded ${ currentResultsLength } results, which is equal to page limit.. New start time - ${ timeToString ( nextStartTime ) } , ${ currentResultsLength - results . length } result(s) excluded from this iteration` ) ;
69
- }
70
- from = nextStartTime ;
71
- } else {
72
- nextStartTime = currentTime ;
73
- await this . emit ( 'snapshot' , { nextStartTime } ) ;
74
- proceed = false ;
61
+ nextStartTime = currentTime ;
62
+ if ( singlePagePerInterval && snapshot . lastElementId ) {
63
+ const lastElement = results . filter ( ( item ) => item . Id === snapshot . lastElementId ) [ 0 ] ;
64
+ const lastElementIndex = results . indexOf ( lastElement ) ;
65
+ results = results . slice ( lastElementIndex + 1 ) ;
75
66
}
76
-
77
- if ( emitBehavior === 'fetchPage' ) {
78
- await this . emit ( 'data' , messages . newMessageWithBody ( { results } ) ) ;
79
- } else if ( emitBehavior === 'emitIndividually' ) {
80
- for ( const record of results ) { await this . emit ( 'data' , messages . newMessageWithBody ( record ) ) ; }
67
+ if ( results . length === MAX_FETCH ) {
68
+ nextStartTime = results [ results . length - 1 ] . LastModifiedDate ;
69
+ const filteredResults = results . filter ( ( item ) => item . LastModifiedDate === nextStartTime ) ;
70
+ results = results . slice ( 0 , MAX_FETCH - filteredResults . length ) ;
81
71
}
72
+ if ( results . length >= Number ( pageSize ) ) {
73
+ hasNextPage = true ;
74
+ const pageResults = results . slice ( 0 , pageSize ) ;
75
+ results = results . slice ( pageSize ) ;
76
+ const lastElementLastModifiedDate = pageResults [ pageSize - 1 ] . LastModifiedDate ;
77
+ const lastElementId = pageResults [ pageSize - 1 ] . Id ;
78
+ if ( emitBehavior === 'fetchPage' ) {
79
+ await this . emit ( 'data' , messages . newMessageWithBody ( { results : pageResults } ) ) ;
80
+ } else if ( emitBehavior === 'emitIndividually' ) {
81
+ for ( const record of pageResults ) {
82
+ await this . emit ( 'data' , messages . newMessageWithBody ( record ) ) ;
83
+ }
84
+ }
82
85
83
- if ( singlePagePerInterval ) {
86
+ if ( singlePagePerInterval ) {
87
+ await this . emit ( 'snapshot' , { nextStartTime : lastElementLastModifiedDate , lastElementId } ) ;
88
+ proceed = false ;
89
+ }
90
+ } else {
91
+ hasNextPage = false ;
92
+ if ( emitBehavior === 'fetchPage' ) {
93
+ await this . emit ( 'data' , messages . newMessageWithBody ( { results } ) ) ;
94
+ } else if ( emitBehavior === 'emitIndividually' ) {
95
+ for ( const record of results ) {
96
+ await this . emit ( 'data' , messages . newMessageWithBody ( record ) ) ;
97
+ }
98
+ }
84
99
await this . emit ( 'snapshot' , { nextStartTime } ) ;
85
100
proceed = false ;
86
101
}
0 commit comments