1
+ /* eslint-disable no-empty */
1
2
const jsforce = require ( 'jsforce' ) ;
2
3
const { messages } = require ( 'elasticio-node' ) ;
3
4
const { callJSForceMethod } = require ( '../helpers/wrapper' ) ;
@@ -6,7 +7,8 @@ const { SALESFORCE_API_VERSION } = require('../common.js').globalConsts;
6
7
7
8
let fayeClient ;
8
9
let context ;
9
- let status ;
10
+ let refreshTokenNeeded = false ;
11
+ let creationInProgress = false ;
10
12
/**
11
13
* This method will be called from elastic.io platform providing following data
12
14
*
@@ -15,17 +17,28 @@ let status;
15
17
*/
16
18
async function processTrigger ( msg , configuration ) {
17
19
context = this ;
18
- if ( status === 'online' ) {
20
+ if ( fayeClient ) {
19
21
this . logger . info ( 'Subscription is still running, waiting for new messages' ) ;
20
22
return ;
21
23
}
24
+ if ( creationInProgress ) {
25
+ this . logger . info ( 'Subscription recreate in progress' ) ;
26
+ return ;
27
+ }
28
+ if ( fayeClient ) fayeClient = undefined ;
29
+ creationInProgress = true ;
22
30
context . logger . info ( 'Starting Subscribe to platform events Trigger' ) ;
23
31
24
32
const { secretId } = configuration ;
25
33
if ( ! secretId ) {
26
34
context . logger . error ( 'secretId is missing in configuration, credentials cannot be fetched' ) ;
35
+ creationInProgress = false ;
27
36
throw new Error ( 'secretId is missing in configuration, credentials cannot be fetched' ) ;
28
37
}
38
+ if ( refreshTokenNeeded ) {
39
+ await refreshToken ( context , secretId , msg . id ) ;
40
+ refreshTokenNeeded = false ;
41
+ }
29
42
context . logger . debug ( 'Fetching credentials by secretId' ) ;
30
43
const { credentials } = await getSecret ( this , secretId , msg . id ) ;
31
44
const accessToken = credentials . access_token ;
@@ -39,53 +52,42 @@ async function processTrigger(msg, configuration) {
39
52
const topic = `/event/${ configuration . object } ` ;
40
53
const replayId = - 1 ;
41
54
context . logger . debug ( 'Creating streaming client' ) ;
42
- if ( ! fayeClient || status === 'down' ) {
43
- fayeClient = connection . streaming . createClient ( [
44
- new jsforce . StreamingExtension . Replay ( topic , replayId ) ,
45
- new jsforce . StreamingExtension . AuthFailure ( async ( err ) => {
46
- context . logger . trace ( 'AuthFailure error occurred' ) ;
47
- if ( err . ext && err . ext . sfdc && err . ext . sfdc . failureReason && ( err . ext . sfdc . failureReason === '401::Authentication invalid' ) ) {
48
- try {
49
- context . logger . debug ( 'Session is expired, trying to refresh token' ) ;
50
- await refreshToken ( context , secretId , msg . id ) ;
51
- context . logger . debug ( 'Token is successfully refreshed' ) ;
52
- } catch ( error ) {
53
- context . logger . error ( 'Failed to fetch refresh token' ) ;
54
- throw new Error ( 'Failed to fetch refresh token' ) ;
55
- }
56
- fayeClient = undefined ;
57
- context . logger . info ( 'Lets call processTrigger one more time' ) ;
58
- await processTrigger . call ( context , msg , configuration ) ;
59
- } else {
60
- context . logger . error ( 'AuthFailure extension error occurred' ) ;
61
- throw err ;
62
- }
63
- } ) ,
64
- ] ) ;
65
55
66
- fayeClient . on ( 'transport:down' , ( ) => {
67
- context . logger . error ( 'Client is offline' ) ;
68
- context . emit ( 'error' , 'Client is offline' ) ;
69
- status = 'down' ;
56
+ fayeClient = connection . streaming . createClient ( [
57
+ new jsforce . StreamingExtension . Replay ( topic , replayId ) ,
58
+ new jsforce . StreamingExtension . AuthFailure ( ( err ) => {
59
+ let errMsg = '' ;
60
+ try {
61
+ errMsg = JSON . stringify ( err ) ;
62
+ } catch ( e ) {
63
+ errMsg = err ;
64
+ }
65
+ context . logger . warn ( `AuthFailure error occurred ${ errMsg } ` ) ;
66
+ refreshTokenNeeded = true ;
67
+ fayeClient = undefined ;
68
+ creationInProgress = false ;
70
69
context . logger . info ( 'Lets call processTrigger one more time' ) ;
71
70
processTrigger . call ( context , msg , configuration ) ;
72
- } ) ;
71
+ } ) ,
72
+ ] ) ;
73
73
74
- fayeClient . on ( 'transport:up ' , ( ) => {
75
- context . logger . info ( 'Client is online ' ) ;
76
- } ) ;
74
+ fayeClient . on ( 'transport:down ' , ( ) => {
75
+ context . logger . error ( 'Client is offline ' ) ;
76
+ } ) ;
77
77
78
- await fayeClient . subscribe ( topic , async ( message ) => {
79
- context . logger . info ( 'Incoming message found, going to emit...' ) ;
80
- await context . emit ( 'data' , messages . newMessageWithBody ( message ) ) ;
81
- } ) ;
82
- status = 'online' ;
78
+ fayeClient . on ( 'transport:up' , ( ) => {
79
+ context . logger . info ( 'Client is online' ) ;
80
+ } ) ;
83
81
84
- context . logger . info ( 'Subscribed to PushTopic successfully' ) ;
85
- context . logger . trace ( `Subscribed to PushTopic: ${ topic } ` ) ;
82
+ await fayeClient . subscribe ( topic , async ( message ) => {
83
+ context . logger . info ( 'Incoming message found, going to emit...' ) ;
84
+ await context . emit ( 'data' , messages . newMessageWithBody ( message ) ) ;
85
+ } ) ;
86
+ creationInProgress = false ;
87
+ context . logger . info ( 'Subscribed to PushTopic successfully' ) ;
88
+ context . logger . trace ( `Subscribed to PushTopic: ${ topic } ` ) ;
86
89
87
- context . logger . info ( 'Streaming client created and ready' ) ;
88
- }
90
+ context . logger . info ( 'Streaming client created and ready' ) ;
89
91
}
90
92
91
93
/**
0 commit comments