Skip to content

Commit 176defa

Browse files
authored
68 subscribe to platform events fix (#69)
1 parent 37794b9 commit 176defa

File tree

6 files changed

+58
-35
lines changed

6 files changed

+58
-35
lines changed

.grype-ignore.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
ignore:
2-
- vulnerability: CVE-2023-2650
2+
- vulnerability: CVE-2023-4807
33
package:
44
name: libssl3
5-
version: 3.1.0-r4
5+
version: 3.1.2-r0
66

7-
- vulnerability: CVE-2023-2650
7+
- vulnerability: CVE-2023-4807
88
package:
99
name: libcrypto3
10-
version: 3.1.0-r4
10+
version: 3.1.2-r0

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 2.7.1 (September 21, 2023)
2+
* Improvements in `Subscribe to platform events` trigger:
3+
* Added retry on connections lost
4+
* Changed the behavior where new logs would appear in the first execution regardless of which message they belonged to. Now, all messages will be displayed in their appropriate execution
5+
* Logs with `Going to fetch secret` set to debug level
6+
17
## 2.7.0 (June 29, 2023)
28
Added support for files attachment by providing a URL in the body for all actions where it is used
39

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
44
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
55
"url": "http://www.salesforce.com/",
6-
"version": "2.7.0",
6+
"version": "2.7.1",
77
"authClientTypes": [
88
"oauth2"
99
],

lib/triggers/streamPlatformEvents.js

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,69 +5,86 @@ const { getSecret, refreshToken } = require('../util');
55
const { SALESFORCE_API_VERSION } = require('../common.js').globalConsts;
66

77
let fayeClient;
8+
let context;
9+
let status;
810
/**
911
* This method will be called from elastic.io platform providing following data
1012
*
1113
* @param msg incoming message object that contains ``body`` with payload
1214
* @param configuration configuration that is account information and configuration field values
1315
*/
1416
async function processTrigger(msg, configuration) {
15-
this.logger.info('Starting Subscribe to platform events Trigger');
17+
context = this;
18+
if (status === 'online') {
19+
this.logger.info('Subscription is still running, waiting for new messages');
20+
return;
21+
}
22+
context.logger.info('Starting Subscribe to platform events Trigger');
23+
1624
const { secretId } = configuration;
1725
if (!secretId) {
18-
this.logger.error('secretId is missing in configuration, credentials cannot be fetched');
26+
context.logger.error('secretId is missing in configuration, credentials cannot be fetched');
1927
throw new Error('secretId is missing in configuration, credentials cannot be fetched');
2028
}
21-
this.logger.debug('Fetching credentials by secretId');
29+
context.logger.debug('Fetching credentials by secretId');
2230
const { credentials } = await getSecret(this, secretId, msg.id);
2331
const accessToken = credentials.access_token;
2432
const instanceUrl = credentials.undefined_params.instance_url;
25-
this.logger.debug('Preparing SalesForce connection...');
33+
context.logger.debug('Preparing SalesForce connection...');
2634
const connection = new jsforce.Connection({
2735
instanceUrl,
2836
accessToken,
2937
version: SALESFORCE_API_VERSION,
3038
});
3139
const topic = `/event/${configuration.object}`;
3240
const replayId = -1;
33-
this.logger.debug('Creating streaming client');
34-
if (!fayeClient) {
41+
context.logger.debug('Creating streaming client');
42+
if (!fayeClient || status === 'down') {
3543
fayeClient = connection.streaming.createClient([
3644
new jsforce.StreamingExtension.Replay(topic, replayId),
3745
new jsforce.StreamingExtension.AuthFailure(async (err) => {
38-
this.logger.trace('AuthFailure error occurred');
46+
context.logger.trace('AuthFailure error occurred');
3947
if (err.ext && err.ext.sfdc && err.ext.sfdc.failureReason && (err.ext.sfdc.failureReason === '401::Authentication invalid')) {
4048
try {
41-
this.logger.debug('Session is expired, trying to refresh token');
42-
await refreshToken(this, secretId, msg.id);
43-
this.logger.debug('Token is successfully refreshed');
49+
context.logger.debug('Session is expired, trying to refresh token');
50+
await refreshToken(context, secretId, msg.id);
51+
context.logger.debug('Token is successfully refreshed');
4452
} catch (error) {
45-
this.logger.error('Failed to fetch refresh token');
53+
context.logger.error('Failed to fetch refresh token');
4654
throw new Error('Failed to fetch refresh token');
4755
}
4856
fayeClient = undefined;
49-
this.logger.info('Lets call processTrigger one more time');
50-
await processTrigger.call(this, msg, configuration);
57+
context.logger.info('Lets call processTrigger one more time');
58+
await processTrigger.call(context, msg, configuration);
5159
} else {
52-
this.logger.error('AuthFailure extension error occurred');
60+
context.logger.error('AuthFailure extension error occurred');
5361
throw err;
5462
}
5563
}),
5664
]);
5765

58-
fayeClient.subscribe(topic, async (message) => {
59-
this.logger.info('Incoming message found, going to emit...');
60-
await this.emit('data', messages.newMessageWithBody(message));
61-
})
62-
.then(() => {
63-
this.logger.info('Subscribed to PushTopic successfully');
64-
this.logger.trace(`Subscribed to PushTopic: ${topic}`);
65-
},
66-
(err) => {
67-
this.logger.error('Subscriber error occurred');
68-
throw err;
69-
});
70-
this.logger.info('Streaming client created and ready');
66+
fayeClient.on('transport:down', () => {
67+
context.logger.error('Client is offline');
68+
context.emit('error', 'Client is offline');
69+
status = 'down';
70+
context.logger.info('Lets call processTrigger one more time');
71+
processTrigger.call(context, msg, configuration);
72+
});
73+
74+
fayeClient.on('transport:up', () => {
75+
context.logger.info('Client is online');
76+
});
77+
78+
await fayeClient.subscribe(topic, async (message) => {
79+
context.logger.info('Incoming message found, going to emit...');
80+
await context.emit('data', messages.newMessageWithBody(message));
81+
});
82+
status = 'online';
83+
84+
context.logger.info('Subscribed to PushTopic successfully');
85+
context.logger.trace(`Subscribed to PushTopic: ${topic}`);
86+
87+
context.logger.info('Streaming client created and ready');
7188
}
7289
}
7390

lib/util.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ function getSecretUri(secretId, isRefresh) {
4949

5050
module.exports.getSecret = async (emitter, secretId, msgId) => {
5151
const secretUri = getSecretUri(secretId);
52-
emitter.logger.info('Going to fetch secret');
52+
emitter.logger.debug('Going to fetch secret');
5353
const ax = axios.create();
5454
addRetryCountInterceptorToAxios(ax);
5555
const secret = await ax.get(secretUri, {
@@ -62,7 +62,7 @@ module.exports.getSecret = async (emitter, secretId, msgId) => {
6262
},
6363
});
6464
const parsedSecret = secret.data.data.attributes;
65-
emitter.logger.info('Got secret');
65+
emitter.logger.debug('Got secret');
6666
return parsedSecret;
6767
};
6868

spec/triggers/streamPlatformEvents.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ describe('streamPlatformEvents trigger', () => {
1212
describe('should emit message', async () => {
1313
before(() => {
1414
sinon.stub(jsforce, 'Connection').callsFake(() => ({
15-
streaming: { createClient: () => ({ subscribe: async (_topic, emit) => { emit('some message'); } }) },
15+
streaming: { createClient: () => ({ subscribe: async (_topic, emit) => { emit('some message'); }, on: () => {} }) },
1616
StreamingExtension: { Replay: () => {}, AuthFailure: () => {} },
1717
}));
1818
});

0 commit comments

Comments
 (0)