Skip to content

Commit e77063b

Browse files
feat: resume the change stream upon reconnection
Upon reconnection to the MongoDB server, the client will now try to resume the stream at the last offset it has processed. If the MongoDB client is disconnected for too long and its token is no longer valid, then the Socket.IO clients connected to this server may miss some packets (which was the previous behavior).
1 parent 761e902 commit e77063b

File tree

2 files changed

+52
-19
lines changed

2 files changed

+52
-19
lines changed

lib/index.ts

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import {
66
Session,
77
} from "socket.io-adapter";
88
import { randomBytes } from "crypto";
9-
import { ObjectId } from "mongodb";
10-
import type { Collection } from "mongodb";
9+
import { ObjectId, MongoServerError } from "mongodb";
10+
import type { Collection, ChangeStream, ResumeToken } from "mongodb";
1111

1212
const randomId = () => randomBytes(8).toString("hex");
1313
const debug = require("debug")("socket.io-mongo-adapter");
@@ -149,39 +149,52 @@ const replaceBinaryObjectsByBuffers = (obj: any) => {
149149
* @public
150150
*/
151151
export function createAdapter(
152-
mongoCollection: any,
152+
mongoCollection: Collection,
153153
opts: Partial<MongoAdapterOptions> = {}
154154
) {
155155
opts.uid = opts.uid || randomId();
156156

157157
let isClosed = false;
158158
let adapters = new Map<string, MongoAdapter>();
159-
let changeStream: any;
159+
let changeStream: ChangeStream;
160+
let resumeToken: ResumeToken;
160161

161162
const initChangeStream = () => {
162-
if (isClosed) {
163+
if (isClosed || (changeStream && !changeStream.closed)) {
163164
return;
164165
}
165-
if (changeStream) {
166-
changeStream.removeAllListeners("change");
167-
changeStream.removeAllListeners("close");
168-
}
169-
changeStream = mongoCollection.watch([
170-
{
171-
$match: {
172-
"fullDocument.uid": {
173-
$ne: opts.uid, // ignore events from self
166+
debug("opening change stream");
167+
changeStream = mongoCollection.watch(
168+
[
169+
{
170+
$match: {
171+
"fullDocument.uid": {
172+
$ne: opts.uid, // ignore events from self
173+
},
174174
},
175175
},
176-
},
177-
]);
176+
],
177+
{
178+
resumeAfter: resumeToken,
179+
}
180+
);
178181

179182
changeStream.on("change", (event: any) => {
180-
adapters.get(event.fullDocument?.nsp)?.onEvent(event);
183+
if (event.operationType === "insert") {
184+
resumeToken = changeStream.resumeToken;
185+
adapters.get(event.fullDocument?.nsp)?.onEvent(event);
186+
}
181187
});
182188

183189
changeStream.on("error", (err: Error) => {
184190
debug("change stream encountered an error: %s", err.message);
191+
if (
192+
err instanceof MongoServerError &&
193+
!err.hasErrorLabel("ResumableChangeStreamError")
194+
) {
195+
// the resume token was not found in the oplog
196+
resumeToken = null;
197+
}
185198
});
186199

187200
changeStream.on("close", () => {
@@ -210,6 +223,7 @@ export function createAdapter(
210223
if (adapters.size === 0) {
211224
changeStream.removeAllListeners("close");
212225
changeStream.close();
226+
// @ts-ignore
213227
changeStream = null;
214228
isClosed = true;
215229
}
@@ -246,7 +260,7 @@ export class MongoAdapter extends Adapter {
246260
*/
247261
constructor(
248262
nsp: any,
249-
mongoCollection: any,
263+
mongoCollection: Collection,
250264
opts: Partial<MongoAdapterOptions> = {}
251265
) {
252266
super(nsp);
@@ -456,11 +470,11 @@ export class MongoAdapter extends Adapter {
456470
}
457471

458472
private scheduleHeartbeat() {
459-
debug("schedule heartbeat in %d ms", this.heartbeatInterval);
460473
if (this.heartbeatTimer) {
461474
clearTimeout(this.heartbeatTimer);
462475
}
463476
this.heartbeatTimer = setTimeout(() => {
477+
debug("sending heartbeat");
464478
this.publish({
465479
type: EventType.HEARTBEAT,
466480
});

test/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,5 +441,24 @@ describe("@socket.io/mongodb-adapter", () => {
441441
await sleep(100);
442442
});
443443

444+
it("should resume the change stream upon reconnection", async () => {
445+
await mongoClient.close(true);
446+
447+
return new Promise(async (resolve) => {
448+
const partialDone = times(3, resolve);
449+
clientSockets[0].on("test1", partialDone);
450+
clientSockets[1].on("test2", partialDone);
451+
clientSockets[2].on("test3", partialDone);
452+
453+
await mongoClient.connect();
454+
servers[0].to(clientSockets[1].id).emit("test2");
455+
456+
await sleep(500);
457+
458+
servers[1].to(clientSockets[2].id).emit("test3");
459+
servers[2].to(clientSockets[0].id).emit("test1");
460+
});
461+
});
462+
444463
import("./connection-state-recovery");
445464
});

0 commit comments

Comments
 (0)