Skip to content

Commit b95f0f8

Browse files
authored
The Replay ID validation failed (#85)
1 parent 2884b04 commit b95f0f8

File tree

5 files changed

+274
-1240
lines changed

5 files changed

+274
-1240
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 2.8.4 (July 11, 2024)
2+
* Attempt to fix error `The Replay ID validation failed` when `Subscribe to PubSub` trigger does't emit messages more than three days
3+
* Update Sailor version to 2.7.2
4+
* Update component-commons-library version to 3.2.0
5+
16
## 2.8.3 (February 27, 2024)
27
* The component interface has not changed. This is a technical enhancement! Introduced baseURL parameter in the `Raw Request` Action's configuration of the axios library. Refer to the [Readme](README.md#raw-request) for the details.
38
It will not affect any of the existing integration. Instead, it gives more flexibility allowing to call other REST endpoints than the standard `/services/data` [(#82)](https://github.com/elasticio/salesforce-component-v2/issues/82)

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
44
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
55
"url": "https://www.salesforce.com/",
6-
"version": "2.8.3",
6+
"version": "2.8.4",
77
"authClientTypes": [
88
"oauth2"
99
],

lib/triggers/streamPlatformEventsPubSub.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,13 @@ async function processTrigger(msg, cfg, snapshot) {
102102
const { credentials } = await getSecret(this, secretId, msg.id);
103103
const accessToken = credentials.access_token;
104104
const instanceUrl = credentials.undefined_params.instance_url;
105-
nextStartReplayId = nextStartReplayId || snapshot?.nextStartReplayId || initialReplayId;
105+
nextStartReplayId = nextStartReplayId || snapshot?.nextStartReplayId;
106+
107+
let replayIdFromConfig = false;
108+
if (!nextStartReplayId && initialReplayId) {
109+
replayIdFromConfig = true;
110+
nextStartReplayId = initialReplayId;
111+
}
106112

107113
let emittedOrAlive = false;
108114
const timeOut = async (t) => {
@@ -171,6 +177,12 @@ async function processTrigger(msg, cfg, snapshot) {
171177
context.logger.warn(`Failed to get keepalive message to finish: ${err.message}, lets call processTrigger one more time (retry ${keepaliveFailureRetry} of ${MAX_CONNECT_RETRIES})`);
172178
keepaliveFailureRetry++;
173179
await reconnectHandler(Math.round(2 ** (keepaliveFailureRetry + 5)) * 1000, cfg);
180+
} else if (err.message.includes('INVALID_ARGUMENT') && !replayIdFromConfig) {
181+
context.logger.error(`Failed to get keepalive message to finish, replayId in seems to be invalid, it will be ignored: ${err.message}, lets call processTrigger one more time (retry ${keepaliveFailureRetry} of ${MAX_CONNECT_RETRIES})`);
182+
keepaliveFailureRetry++;
183+
await context.emit('snapshot', { });
184+
nextStartReplayId = null;
185+
await reconnectHandler(Math.round(2 ** (keepaliveFailureRetry + 5)) * 1000, cfg);
174186
} else {
175187
context.logger.error(`Failed to get keepalive message to finish: ${err.message}`);
176188
await context.emit('error', err);

0 commit comments

Comments
 (0)