Skip to content

Commit bfab084

Browse files
authored
Dublicate retry (#78)
1 parent 507e7b8 commit bfab084

File tree

5 files changed

+56
-41
lines changed

5 files changed

+56
-41
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 2.8.1 (December 28, 2023)
2+
* Fixed duplicate retries and added exponential backoff in `Subscribe to PubSub` trigger
3+
14
## 2.8.0 (December 21, 2023)
25
* Added new `Subscribe to PubSub` trigger
36

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ None.
142142
* **payload** - (object, required): Dynamically generated content of the event
143143

144144
#### Limitations:
145+
* The component starts tracking changes after the first execution (it means you have to "run-now" flow with this trigger or wait for the first execution by the scheduler to establish a connection and only after this new event will be listened)
145146
* If you use **"Ordinary"** flow:
146147
* Make sure that you execute it at least once per 3 days - according to the [documentation](https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html#replaying-an-event-stream) Salesforce stores events for up to 3 days.
147-
* The component starts tracking changes after the first execution
148148
* To `Retrieve new sample from Salesforce v2` you need to trigger an event on Salesforce side or provide a sample manually
149149

150150
### Deprecated triggers

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
44
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
55
"url": "http://www.salesforce.com/",
6-
"version": "2.8.0",
6+
"version": "2.8.1",
77
"authClientTypes": [
88
"oauth2"
99
],

lib/PubSubClient.js

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,35 +140,47 @@ class PubSubClient {
140140
this.subscription.write(writeOptions);
141141

142142
this.subscription.on('data', (data) => {
143-
const latestReplayId = Number(data.latestReplayId.readBigUInt64BE());
144-
if (data.events) {
145-
for (const event of data.events) {
146-
const replayId = Number(event.replayId.readBigUInt64BE());
147-
const payload = this.schema.type.fromBuffer(event.event.payload);
148-
this.flattenSinglePropertyObjects(payload);
149-
eventEmitter.emit('data', { event: { replayId }, payload });
150-
}
151-
if (!data.pendingNumRequested) {
152-
this.logger.info(`requesting ${numRequested} more records`);
153-
this.subscription.write({ topicName: this.topicName, numRequested });
143+
try {
144+
const latestReplayId = Number(data.latestReplayId.readBigUInt64BE());
145+
if (data.events) {
146+
for (const event of data.events) {
147+
const replayId = Number(event.replayId.readBigUInt64BE());
148+
const payload = this.schema.type.fromBuffer(event.event.payload);
149+
this.flattenSinglePropertyObjects(payload);
150+
eventEmitter.emit('data', { event: { replayId }, payload });
151+
}
152+
if (!data.pendingNumRequested) {
153+
this.logger.info(`requesting ${numRequested} more records`);
154+
this.subscription.write({ topicName: this.topicName, numRequested });
155+
}
156+
} else {
157+
this.logger.debug(`Received keepalive message. Latest replay ID: ${latestReplayId}`);
158+
data.latestReplayId = latestReplayId;
159+
this.lastKeepAlive = new Date().getTime();
160+
eventEmitter.emit('keepalive', data);
154161
}
155-
} else {
156-
this.logger.debug(`Received keepalive message. Latest replay ID: ${latestReplayId}`);
157-
data.latestReplayId = latestReplayId;
158-
this.lastKeepAlive = new Date().getTime();
159-
eventEmitter.emit('keepalive', data);
162+
} catch (err) {
163+
this.logger.warn('Failed to decode incoming data, skipping');
160164
}
161165
});
162166
this.subscription.on('end', () => {
163167
this.logger.warn('gRPC stream ended');
164168
eventEmitter.emit('end');
165169
});
170+
const tryStringify = (obj) => {
171+
try {
172+
return JSON.stringify(obj);
173+
} catch (err) {
174+
if (typeof obj === 'string') return obj;
175+
return ('Can\'t parse message');
176+
}
177+
};
166178
this.subscription.on('error', (error) => {
167-
this.logger.error(`gRPC stream error: ${JSON.stringify(error)}`);
179+
this.logger.error(`gRPC stream error: ${tryStringify(error)}`);
168180
eventEmitter.emit('error', error);
169181
});
170182
this.subscription.on('status', (status) => {
171-
this.logger.warn(`gRPC stream status: ${JSON.stringify(status)}`);
183+
this.logger.warn(`gRPC stream status: ${tryStringify(status)}`);
172184
eventEmitter.emit('status', status);
173185
});
174186
this.eventEmitter = eventEmitter;
@@ -188,8 +200,8 @@ class PubSubClient {
188200
this.subscription = null;
189201
this.client.close();
190202
this.client = null;
191-
// eslint-disable-next-line no-empty
192-
} catch (_e) {}
203+
// eslint-disable-next-line no-empty
204+
} catch (_e) { }
193205
}
194206
}
195207

lib/triggers/streamPlatformEventsPubSub.js

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* eslint-disable no-use-before-define, no-unused-vars */
1+
/* eslint-disable no-use-before-define, no-unused-vars, no-empty */
22
const { messages } = require('elasticio-node');
33
const commons = require('@elastic.io/component-commons-library');
44
const { callJSForceMethod } = require('../helpers/wrapper');
@@ -10,14 +10,16 @@ let context;
1010
let refreshTokenNeeded = false;
1111
let authFailureRetry = 1;
1212
let connectFailureRetry = 1;
13+
let keepaliveFailureRetry = 1;
1314
let nextStartReplayId;
1415
const MAX_AUTH_RETRIES = 2;
1516
const MAX_CONNECT_RETRIES = 5;
1617
let eventEmitter;
1718
let creationInProgress = false;
1819

1920
const reconnectHandler = async (reconnectSleepTime, cfg) => {
20-
if (client) client.close();
21+
context.logger.warn(`Restarting in ${Math.round(reconnectSleepTime / 1000)} seconds`);
22+
if (client) try { client.close(); } catch (_e) { }
2123
creationInProgress = false;
2224
client = null;
2325
await commons.sleep(reconnectSleepTime);
@@ -49,7 +51,7 @@ const isNumberNaN = (num) => Number(num).toString() === 'NaN';
4951

5052
async function processTrigger(msg, cfg, snapshot) {
5153
context = this;
52-
if (client) {
54+
if (client && client.getLastKeepAlive()) {
5355
this.logger.info(`Subscription is still running, last keepalive from Salesforce ${new Date(client.getLastKeepAlive()).toISOString()}, waiting for new messages`);
5456
client.setLogger(context.logger);
5557
return;
@@ -92,20 +94,16 @@ async function processTrigger(msg, cfg, snapshot) {
9294
const instanceUrl = credentials.undefined_params.instance_url;
9395
nextStartReplayId = snapshot?.nextStartReplayId || initialReplayId;
9496

95-
let emitted = false;
97+
let emittedOrAlive = false;
9698
const timeOut = async (t) => {
9799
await commons.sleep(t);
98-
if (emitted) {
99-
await commons.sleep(3000000);
100-
} else {
101-
throw new Error(`TimeOut ${t}ms limit reached`);
102-
}
100+
if (!emittedOrAlive) throw new Error(`TimeOut ${t}ms limit reached`);
103101
};
104102

105103
context.logger.info('Preparing SalesForce connection...');
106104
try {
107105
await Promise.race([
108-
await client.connect(accessToken, instanceUrl),
106+
client.connect(accessToken, instanceUrl),
109107
timeOut(30000),
110108
]);
111109
authFailureRetry = 1;
@@ -119,7 +117,7 @@ async function processTrigger(msg, cfg, snapshot) {
119117
} else if (connectFailureRetry <= MAX_CONNECT_RETRIES) {
120118
context.logger.warn(`Got error during connection to Salesforce: ${err.message}, lets call processTrigger one more (retry ${connectFailureRetry} of ${MAX_CONNECT_RETRIES})`);
121119
connectFailureRetry++;
122-
await reconnectHandler(60000, cfg);
120+
await reconnectHandler(Math.round(2 ** (connectFailureRetry + 5)) * 1000, cfg);
123121
} else {
124122
context.logger.error(`Failed to connect to Salesforce: ${err.message}`);
125123
await context.emit('error', err);
@@ -135,16 +133,19 @@ async function processTrigger(msg, cfg, snapshot) {
135133
context.logger.info('Got new message, emitting');
136134
await context.emit('data', messages.newMessageWithBody(event));
137135
nextStartReplayId = event.replayId;
138-
emitted = true;
136+
emittedOrAlive = true;
139137
await this.emit('snapshot', { nextStartReplayId });
140138
});
141139

140+
eventEmitter.once('keepalive', (data) => { context.logger.info('Got keepalive message'); });
141+
142142
try {
143143
await Promise.race([
144144
new Promise((resolve, reject) => {
145145
context.logger.info('Processing existing events if present and waiting for keepalive message to finish');
146146
eventEmitter.on('keepalive', async (data) => {
147-
connectFailureRetry = 1;
147+
keepaliveFailureRetry = 1;
148+
emittedOrAlive = true;
148149
nextStartReplayId = data.latestReplayId;
149150
await this.emit('snapshot', { nextStartReplayId });
150151
resolve();
@@ -156,22 +157,21 @@ async function processTrigger(msg, cfg, snapshot) {
156157
timeOut(300000),
157158
]);
158159
} catch (err) {
159-
if (!err.message.includes('INVALID_ARGUMENT') && connectFailureRetry <= MAX_CONNECT_RETRIES) {
160-
context.logger.warn(`Failed to get keepalive message to finish: ${err.message}, lets call processTrigger one more time (retry ${connectFailureRetry} of ${MAX_CONNECT_RETRIES})`);
161-
connectFailureRetry++;
162-
await reconnectHandler(2000, cfg);
160+
if (!err.message?.includes('INVALID_ARGUMENT') && keepaliveFailureRetry <= MAX_CONNECT_RETRIES) {
161+
context.logger.warn(`Failed to get keepalive message to finish: ${err.message}, lets call processTrigger one more time (retry ${keepaliveFailureRetry} of ${MAX_CONNECT_RETRIES})`);
162+
keepaliveFailureRetry++;
163+
await reconnectHandler(Math.round(2 ** (keepaliveFailureRetry + 5)) * 1000, cfg);
163164
} else {
164165
context.logger.error(`Failed to get keepalive message to finish: ${err.message}`);
165166
await context.emit('error', err);
166167
throw err;
167168
}
168-
await reconnectHandler(2000, cfg);
169169
}
170170

171171
creationInProgress = false;
172172

173173
if (process.env.ELASTICIO_FLOW_TYPE === 'ordinary') {
174-
client.close();
174+
try { client.close(); } catch (_e) { }
175175
client = null;
176176
context.logger.info('"Subscribe to PubSub" trigger finished successfully');
177177
return;

0 commit comments

Comments
 (0)