Skip to content

Commit 938674d

Browse files
fix: properly handle invalidate events
Reference: https://www.mongodb.com/docs/manual/changeStreams/#open-a-change-stream Related: #5
1 parent d42bf39 commit 938674d

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

lib/index.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,14 @@ export function createAdapter(
115115
) {
116116
opts.uid = opts.uid || randomId();
117117

118+
let isClosed = false;
118119
let adapters = new Map<string, MongoAdapter>();
119120
let changeStream: any;
120121

121122
const initChangeStream = () => {
123+
if (isClosed) {
124+
return;
125+
}
122126
if (changeStream) {
123127
changeStream.removeAllListeners("change");
124128
changeStream.removeAllListeners("close");
@@ -134,7 +138,11 @@ export function createAdapter(
134138
]);
135139

136140
changeStream.on("change", (event: any) => {
137-
adapters.get(event.fullDocument.nsp)?.onEvent(event);
141+
adapters.get(event.fullDocument?.nsp)?.onEvent(event);
142+
});
143+
144+
changeStream.on("error", (err: Error) => {
145+
debug("change stream encountered an error: %s", err.message);
138146
});
139147

140148
changeStream.on("close", () => {
@@ -147,6 +155,7 @@ export function createAdapter(
147155

148156
return function (nsp: any) {
149157
if (!changeStream) {
158+
isClosed = false;
150159
initChangeStream();
151160
}
152161

@@ -163,6 +172,7 @@ export function createAdapter(
163172
changeStream.removeAllListeners("close");
164173
changeStream.close();
165174
changeStream = null;
175+
isClosed = true;
166176
}
167177

168178
defaultClose.call(adapter);

test/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,4 +434,10 @@ describe("@socket.io/mongodb-adapter", () => {
434434
});
435435
});
436436
});
437+
438+
it("should not throw when receiving a drop event", async () => {
439+
await mongoClient.db("test").dropCollection("events");
440+
441+
await sleep(100);
442+
});
437443
});

0 commit comments

Comments
 (0)