Skip to content

Commit 33b5b96

Browse files
authored
80 use replay (#81)
1 parent bfab084 commit 33b5b96

File tree

4 files changed

+22
-8
lines changed

4 files changed

+22
-8
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 2.8.2 (February 02, 2023)
2+
* Fixed bug when component didn't use `replayId` after error in `Subscribe to PubSub` trigger
3+
14
## 2.8.1 (December 28, 2023)
25
* Fixed duplicate retries and added exponential backoff in `Subscribe to PubSub` trigger
36

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
44
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
55
"url": "http://www.salesforce.com/",
6-
"version": "2.8.1",
6+
"version": "2.8.2",
77
"authClientTypes": [
88
"oauth2"
99
],

lib/PubSubClient.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class PubSubClient {
131131
const writeOptions = { topicName: this.topicName, numRequested };
132132

133133
if (fromReplayId) {
134+
this.logger.info(`Subscription will start from replayId: ${fromReplayId}`);
134135
const buf = Buffer.allocUnsafe(8);
135136
buf.writeBigUInt64BE(BigInt(fromReplayId), 0);
136137
writeOptions.replayPreset = 2;

lib/triggers/streamPlatformEventsPubSub.js

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ const MAX_AUTH_RETRIES = 2;
1616
const MAX_CONNECT_RETRIES = 5;
1717
let eventEmitter;
1818
let creationInProgress = false;
19+
let timeoutRef;
20+
21+
const saveSnapshot = () => {
22+
if (timeoutRef) clearTimeout(timeoutRef);
23+
timeoutRef = setTimeout(async () => {
24+
context.logger.debug(`Saving snapshot with nextStartReplayId: ${nextStartReplayId}`);
25+
await context.emit('snapshot', { nextStartReplayId });
26+
context.logger.debug('Snapshot saved');
27+
}, 2000);
28+
};
1929

2030
const reconnectHandler = async (reconnectSleepTime, cfg) => {
2131
context.logger.warn(`Restarting in ${Math.round(reconnectSleepTime / 1000)} seconds`);
@@ -92,7 +102,7 @@ async function processTrigger(msg, cfg, snapshot) {
92102
const { credentials } = await getSecret(this, secretId, msg.id);
93103
const accessToken = credentials.access_token;
94104
const instanceUrl = credentials.undefined_params.instance_url;
95-
nextStartReplayId = snapshot?.nextStartReplayId || initialReplayId;
105+
nextStartReplayId = nextStartReplayId || snapshot?.nextStartReplayId || initialReplayId;
96106

97107
let emittedOrAlive = false;
98108
const timeOut = async (t) => {
@@ -126,18 +136,18 @@ async function processTrigger(msg, cfg, snapshot) {
126136
}
127137
context.logger.info('SalesForce connected');
128138

129-
eventEmitter = await client.subscribe(eventCountPerRequest, snapshot?.nextStartReplayId || initialReplayId);
139+
eventEmitter = await client.subscribe(eventCountPerRequest, nextStartReplayId);
130140
context.logger.info('Streaming client created');
131141

132142
eventEmitter.on('data', async (event) => {
133-
context.logger.info('Got new message, emitting');
143+
context.logger.info(`Got new message, replayId: ${event.event.replayId}, emitting`);
134144
await context.emit('data', messages.newMessageWithBody(event));
135-
nextStartReplayId = event.replayId;
145+
nextStartReplayId = event.event.replayId;
136146
emittedOrAlive = true;
137-
await this.emit('snapshot', { nextStartReplayId });
147+
saveSnapshot();
138148
});
139149

140-
eventEmitter.once('keepalive', (data) => { context.logger.info('Got keepalive message'); });
150+
eventEmitter.once('keepalive', (data) => { context.logger.info(`Got keepalive message, replayId: ${data.latestReplayId}`); });
141151

142152
try {
143153
await Promise.race([
@@ -147,7 +157,7 @@ async function processTrigger(msg, cfg, snapshot) {
147157
keepaliveFailureRetry = 1;
148158
emittedOrAlive = true;
149159
nextStartReplayId = data.latestReplayId;
150-
await this.emit('snapshot', { nextStartReplayId });
160+
saveSnapshot();
151161
resolve();
152162
});
153163
eventEmitter.on('error', (error) => {

0 commit comments

Comments
 (0)