From bd8b99458e2f1623fc9f7c7516b0871596f4b9ba Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 13:45:46 +0800 Subject: [PATCH 01/14] fix: disable worker --- src/api.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/api.js b/src/api.js index 693793c..e13bbb8 100644 --- a/src/api.js +++ b/src/api.js @@ -124,11 +124,11 @@ export class Api { addMessage: redis.defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: ` - if redis.call("EXISTS", KEYS[1]) == 0 then - redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) - elseif redis.call("XLEN", KEYS[1]) > 100 then - redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) - end + -- if redis.call("EXISTS", KEYS[1]) == 0 then + -- redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) + -- elseif redis.call("XLEN", KEYS[1]) > 100 then + -- redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) + -- end redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) `, /** From 68276efb44b73026e3d73b408fd93df229a45d96 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 13:46:00 +0800 Subject: [PATCH 02/14] feat: helper fn for redis stream --- src/api.js | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/api.js b/src/api.js index e13bbb8..d38dba0 100644 --- a/src/api.js +++ b/src/api.js @@ -265,6 +265,35 @@ export class Api { } } + /** + * @param {string} room + * @param {string} docid + */ + async getRedisLastId (room, docid) { + const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) + const docMessages = ms.get(room)?.get(docid) || null + return docMessages?.lastId.toString() || '0' + } + + /** + * @param {string} room + * @param {string} docid + * @param {boolean} [remove=false] + */ + async trimRoomStream (room, docid, remove = false) { + const roomName = computeRedisRoomStreamName(room, docid, this.prefix) + const redisLastId = await this.getRedisLastId(room, docid) + const lastId = number.parseInt(redisLastId.split('-')[0]) + if (remove) { + await this.redis.del(roomName) + } else { + await this.redis.multi() + .xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime) + .xDelIfEmpty(roomName) + .exec() + } + } + /** * @param {Object} opts * @param {number} [opts.blockTime] From 8c46f6e29c902c4b624fe05c5e6e5a8ab374ecaf Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 13:47:08 +0800 Subject: [PATCH 03/14] feat: do persist in socketio connection layer --- src/y-socket-io/y-socket-io.js | 189 ++++++++++++++++++++++++++------- 1 file changed, 150 insertions(+), 39 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 590facf..41fa391 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -4,10 +4,13 @@ import * as promise from 'lib0/promise' import * as encoding from 'lib0/encoding' import * as decoding from 'lib0/decoding' import { assert } from 'lib0/testing' -import { User } from './user.js' import * as api from '../api.js' import * as protocol from '../protocol.js' import { createSubscriber } from '../subscriber.js' +import { isDeepStrictEqual } from 'util' +import { User } from './user.js' + +const PERSIST_INTERVAL = 5000 /** * @typedef {import('socket.io').Namespace} Namespace @@ -89,6 +92,18 @@ export class YSocketIO { * @readonly */ namespaceMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + namespaceDocMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + socketUserCache = new Map() /** * YSocketIO constructor. @@ -123,12 +138,21 @@ export class YSocketIO { this.nsp = this.io.of(/^\/yjs\|.*$/) this.nsp.use(async (socket, next) => { - if (this.configuration.authenticate == null) return next() - const user = await this.configuration.authenticate(socket) - if (user) { - socket.user = new User(this.getNamespaceString(socket.nsp), user.userid) - return next() - } else return next(new Error('Unauthorized')) + if (this.configuration.authenticate === null) return next() + const userCache = this.socketUserCache.get(socket) + const namespace = this.getNamespaceString(socket.nsp) + if (!userCache || Date.now() - userCache.validatedAt > 60_000) { + this.socketUserCache.delete(socket) + const user = await this.configuration.authenticate(socket) + if (!user) return next(new Error('Unauthorized')) + this.socketUserCache.set(socket, { user, validatedAt: Date.now() }) + socket.user = new User(namespace, user.userid) + } else { + socket.user = new User(namespace, userCache.user.userid) + } + + if (socket.user) return next() + else return next(new Error('Unauthorized')) }) this.nsp.on('connection', async (socket) => { @@ -156,17 +180,23 @@ export class YSocketIO { this.initSyncListeners(socket) this.initAwarenessListeners(socket) this.initSocketListeners(socket) + ;(async () => { + assert(this.client) + assert(socket.user) + const doc = + this.namespaceDocMap.get(namespace) || + (await this.client.getDoc(namespace, 'index')) + this.namespaceDocMap.set(namespace, doc) - const doc = await this.client.getDoc(namespace, 'index') - - if ( - api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) - ) { - // our subscription is newer than the content that we received from the api - // need to renew subscription id and make sure that we catch the latest content. - this.subscriber.ensureSubId(stream, doc.redisLastId) - } - this.startSynchronization(socket, doc) + if ( + api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) + ) { + // our subscription is newer than the content that we received from the api + // need to renew subscription id and make sure that we catch the latest content. + this.subscriber?.ensureSubId(stream, doc.redisLastId) + } + this.startSynchronization(socket, doc) + })() }) return { client, subscriber } @@ -200,22 +230,31 @@ export class YSocketIO { syncStep2 ) => { assert(this.client) - const doc = await this.client.getDoc( - this.getNamespaceString(socket.nsp), - 'index' - ) + const namespace = this.getNamespaceString(socket.nsp) + const doc = + this.namespaceDocMap.get(namespace) || + (await this.client.getDoc(namespace, 'index')) + this.namespaceDocMap.set(namespace, doc) + assert(doc) syncStep2(Y.encodeStateAsUpdate(doc.ydoc, stateVector)) } ) + /** @type {unknown} */ + let prevMsg = null socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => { + if (isDeepStrictEqual(update, prevMsg)) return assert(this.client) + const namespace = this.getNamespaceString(socket.nsp) const message = Buffer.from(update.slice(0, update.byteLength)) - this.client.addMessage( - this.getNamespaceString(socket.nsp), - 'index', - Buffer.from(this.toRedis('sync-update', message)) - ).catch(console.error) + this.client + .addMessage( + namespace, + 'index', + Buffer.from(this.toRedis('sync-update', message)) + ) + .catch(console.error) + prevMsg = update }) } @@ -232,14 +271,19 @@ export class YSocketIO { * @readonly */ initAwarenessListeners = (socket) => { + /** @type {unknown} */ + const prevMsg = null socket.on('awareness-update', (/** @type {ArrayBuffer} */ update) => { + if (isDeepStrictEqual(update, prevMsg)) return assert(this.client) const message = Buffer.from(update.slice(0, update.byteLength)) - this.client.addMessage( - this.getNamespaceString(socket.nsp), - 'index', - Buffer.from(this.toRedis('awareness-update', new Uint8Array(message))) - ).catch(console.error) + this.client + .addMessage( + this.getNamespaceString(socket.nsp), + 'index', + Buffer.from(this.toRedis('awareness-update', new Uint8Array(message))) + ) + .catch(console.error) }) } @@ -253,14 +297,18 @@ export class YSocketIO { socket.on('disconnect', async () => { assert(this.subscriber) if (!socket.user) return - for (const ns of socket.user.subs) { - const stream = this.namespaceStreamMap.get(ns) + this.socketUserCache.delete(socket) + for (const stream of socket.user.subs) { + const ns = this.streamNamespaceMap.get(stream) + if (!ns) continue const nsp = this.namespaceMap.get(ns) if (nsp?.sockets.size === 0 && stream) { this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) this.namespaceStreamMap.delete(ns) this.streamNamespaceMap.delete(stream) this.namespaceMap.delete(ns) + this.namespaceDocMap.get(ns)?.ydoc.destroy() + this.namespaceDocMap.delete(ns) } } }) @@ -280,11 +328,13 @@ export class YSocketIO { (/** @type {Uint8Array} */ update) => { assert(this.client) const message = Buffer.from(update.slice(0, update.byteLength)) - this.client.addMessage( - this.getNamespaceString(socket.nsp), - 'index', - Buffer.from(this.toRedis('sync-step-2', message)) - ).catch(console.error) + this.client + .addMessage( + this.getNamespaceString(socket.nsp), + 'index', + Buffer.from(this.toRedis('sync-step-2', message)) + ) + .catch(console.error) } ) if (doc.awareness.states.size > 0) { @@ -303,7 +353,7 @@ export class YSocketIO { * @param {string} stream * @param {Array} messages */ - redisMessageSubscriber = (stream, messages) => { + redisMessageSubscriber = async (stream, messages) => { const namespace = this.streamNamespaceMap.get(stream) if (!namespace) return const nsp = this.namespaceMap.get(namespace) @@ -313,6 +363,8 @@ export class YSocketIO { this.namespaceStreamMap.delete(namespace) this.streamNamespaceMap.delete(stream) this.namespaceMap.delete(namespace) + this.namespaceDocMap.get(namespace)?.ydoc.destroy() + this.namespaceDocMap.delete(namespace) } /** @type {Uint8Array[]} */ @@ -334,6 +386,65 @@ export class YSocketIO { if (msg.length === 0) continue nsp.emit('awareness-update', msg) } + + let changed = false + const existDoc = this.namespaceDocMap.get(namespace) + if (existDoc) { + existDoc.ydoc.on('afterTransaction', (tr) => { + changed = tr.changed.size > 0 + }) + Y.transact(existDoc.ydoc, () => { + for (const msg of updates) Y.applyUpdate(existDoc.ydoc, msg) + for (const msg of awareness) { + AwarenessProtocol.applyAwarenessUpdate(existDoc.awareness, msg, null) + } + }) + } + + assert(this.client) + let doc = existDoc + if (!existDoc) { + const getDoc = await this.client.getDoc(namespace, 'index') + doc = getDoc + changed = getDoc.changed + } + assert(doc) + if (changed) this.debouncedPersist(namespace, doc.ydoc) + this.namespaceDocMap.get(namespace)?.ydoc.destroy() + this.namespaceDocMap.set(namespace, doc) + await this.client.trimRoomStream(namespace, 'index', nsp.sockets.size === 0) + } + + /** + * @type {Map} + */ + debouncedPersistMap = new Map() + /** + * @type {Map} + */ + debouncedPersistDocMap = new Map() + + /** + * @param {string} namespace + * @param {Y.Doc} doc + */ + async debouncedPersist (namespace, doc) { + this.debouncedPersistDocMap.set(namespace, doc) + if (this.debouncedPersistMap.has(namespace)) return + this.debouncedPersistMap.set( + namespace, + setTimeout( + async () => { + assert(this.client) + const doc = this.debouncedPersistDocMap.get(namespace) + if (!doc) return + await this.client.store.persistDoc(namespace, 'index', doc) + this.debouncedPersistDocMap.delete(namespace) + this.debouncedPersistMap.delete(namespace) + }, + PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL + ) + ) } /** From b68197e0c5eb71d1759ddf66c50f22992b5a0a53 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 14:57:41 +0800 Subject: [PATCH 04/14] chore: toobusy & env & logs --- package.json | 2 ++ pnpm-lock.yaml | 17 +++++++++++++++++ src/y-socket-io/y-socket-io.js | 25 +++++++++++++++++++++---- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 1dc0e30..a74a17e 100644 --- a/package.json +++ b/package.json @@ -65,6 +65,7 @@ "redis": "^4.6.12", "socket.io": "^4.7.5", "socket.io-client": "^4.8.0", + "toobusy-js": "^0.5.1", "y-protocols": "^1.0.6", "yjs": "^13.6.18" }, @@ -80,6 +81,7 @@ "@dotenvx/dotenvx": "^1.14.0", "@redis/client": "^1.6.0", "@types/node": "^20.11.5", + "@types/toobusy-js": "^0.5.4", "@types/ws": "^8.5.10", "concurrently": "^8.2.2", "standard": "^17.1.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 72e83e1..13f8261 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -20,6 +20,9 @@ importers: socket.io-client: specifier: ^4.8.0 version: 4.8.0 + toobusy-js: + specifier: ^0.5.1 + version: 0.5.1 y-protocols: specifier: ^1.0.6 version: 1.0.6(yjs@13.6.18) @@ -43,6 +46,9 @@ importers: '@types/node': specifier: ^20.11.5 version: 20.12.11 + '@types/toobusy-js': + specifier: ^0.5.4 + version: 0.5.4 '@types/ws': specifier: ^8.5.10 version: 8.5.10 @@ -424,6 +430,9 @@ packages: '@types/node@20.12.11': resolution: {integrity: sha512-vDg9PZ/zi+Nqp6boSOT7plNuthRugEKixDv5sFTIpkE89MmNtEArAShI4mxuX2+UrLEe9pxC1vm2cjm9YlWbJw==} + '@types/toobusy-js@0.5.4': + resolution: {integrity: sha512-hsKMbYiaL3ZWx7B3FYyN0rEJexw7I1HgKbNToX3ZZJv6373to954wlA7zrXR3/XoVwZnFwWqFguBs91sNzJGKQ==} + '@types/ws@8.5.10': resolution: {integrity: sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==} @@ -1808,6 +1817,10 @@ packages: resolution: {integrity: sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==} engines: {node: '>=8.0'} + toobusy-js@0.5.1: + resolution: {integrity: sha512-GiCux/c8G2TV0FTDgtxnXOxmSAndaI/9b1YxT14CqyeBDtTZAcJLx9KlXT3qECi8D0XCc78T4sN/7gWtjRyCaA==} + engines: {node: '>=0.9.1'} + tr46@1.0.1: resolution: {integrity: sha512-dTpowEjclQ7Kgx5SdBkqRzVhERQXov8/l9Ft9dVM9fmg0W0KQSVaXX9T4i6twCPNtYiZM53lpSSUAwJbFPOHxA==} @@ -2278,6 +2291,8 @@ snapshots: dependencies: undici-types: 5.26.5 + '@types/toobusy-js@0.5.4': {} + '@types/ws@8.5.10': dependencies: '@types/node': 20.12.11 @@ -3916,6 +3931,8 @@ snapshots: dependencies: is-number: 7.0.0 + toobusy-js@0.5.1: {} + tr46@1.0.1: dependencies: punycode: 2.3.1 diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 41fa391..f2520cd 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -6,11 +6,23 @@ import * as decoding from 'lib0/decoding' import { assert } from 'lib0/testing' import * as api from '../api.js' import * as protocol from '../protocol.js' +import * as number from 'lib0/number' +import * as env from 'lib0/environment' import { createSubscriber } from '../subscriber.js' import { isDeepStrictEqual } from 'util' import { User } from './user.js' +import { createModuleLogger } from 'lib0/logging' +import toobusy from 'toobusy-js' -const PERSIST_INTERVAL = 5000 +const logSocketIO = createModuleLogger('@y/socket-io/server') +const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') +const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') + +process.on('SIGINT', function() { + // calling .shutdown allows your process to exit normally + toobusy.shutdown(); + process.exit(); +}); /** * @typedef {import('socket.io').Namespace} Namespace @@ -141,7 +153,7 @@ export class YSocketIO { if (this.configuration.authenticate === null) return next() const userCache = this.socketUserCache.get(socket) const namespace = this.getNamespaceString(socket.nsp) - if (!userCache || Date.now() - userCache.validatedAt > 60_000) { + if (!userCache || Date.now() - userCache.validatedAt > REVALIDATE_TIMEOUT) { this.socketUserCache.delete(socket) const user = await this.configuration.authenticate(socket) if (!user) return next(new Error('Unauthorized')) @@ -158,9 +170,14 @@ export class YSocketIO { this.nsp.on('connection', async (socket) => { assert(this.client) assert(this.subscriber) + const namespace = this.getNamespaceString(socket.nsp) + if (toobusy()) { + logSocketIO(`warning server too busy, rejecting connection: ${namespace}`) + throw new Error('server too busy, please try again latter') + } if (!socket.user) throw new Error('user does not exist in socket') - const namespace = this.getNamespaceString(socket.nsp) + logSocketIO(`new connection in namespace: ${namespace}`) const stream = api.computeRedisRoomStreamName( namespace, 'index', @@ -409,7 +426,7 @@ export class YSocketIO { changed = getDoc.changed } assert(doc) - if (changed) this.debouncedPersist(namespace, doc.ydoc) + this.debouncedPersist(namespace, doc.ydoc) this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.set(namespace, doc) await this.client.trimRoomStream(namespace, 'index', nsp.sockets.size === 0) From e75d89472b0b0f33d19bac3f0a57832f6db48185 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 15:23:09 +0800 Subject: [PATCH 05/14] fix: trim after persist --- package.json | 1 + src/y-socket-io/y-socket-io.js | 18 +++++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index a74a17e..6ca60c7 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "test-db": "docker-compose -f ./docker-compose.test.yaml up", "dist": "tsup", "lint": "standard && tsc", + "lint:fix": "standard --fix && tsc", "test": "dotenvx run --env-file=.env -- node tests/index.js", "test-inspect": "dotenvx run --env-file=.env -- node --inspect-brk tests/index.js", "preversion": "npm run lint && npm run dist", diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index f2520cd..eae92d8 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -18,11 +18,11 @@ const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') -process.on('SIGINT', function() { +process.on('SIGINT', function () { // calling .shutdown allows your process to exit normally - toobusy.shutdown(); - process.exit(); -}); + toobusy.shutdown() + process.exit() +}) /** * @typedef {import('socket.io').Namespace} Namespace @@ -411,8 +411,12 @@ export class YSocketIO { changed = tr.changed.size > 0 }) Y.transact(existDoc.ydoc, () => { - for (const msg of updates) Y.applyUpdate(existDoc.ydoc, msg) + for (const msg of updates) { + if (msg.length === 0) continue + Y.applyUpdate(existDoc.ydoc, msg) + } for (const msg of awareness) { + if (msg.length === 0) continue AwarenessProtocol.applyAwarenessUpdate(existDoc.awareness, msg, null) } }) @@ -426,10 +430,9 @@ export class YSocketIO { changed = getDoc.changed } assert(doc) - this.debouncedPersist(namespace, doc.ydoc) + if (changed) this.debouncedPersist(namespace, doc.ydoc) this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.set(namespace, doc) - await this.client.trimRoomStream(namespace, 'index', nsp.sockets.size === 0) } /** @@ -456,6 +459,7 @@ export class YSocketIO { const doc = this.debouncedPersistDocMap.get(namespace) if (!doc) return await this.client.store.persistDoc(namespace, 'index', doc) + await this.client.trimRoomStream(namespace, 'index', true) this.debouncedPersistDocMap.delete(namespace) this.debouncedPersistMap.delete(namespace) }, From adfec25313b35db3614d26a041450cb0a190b14b Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 15:55:11 +0800 Subject: [PATCH 06/14] feat: add env for disabling worker --- src/api.js | 22 ++++++++++++++-------- src/y-socket-io/y-socket-io.js | 15 +++++++-------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/api.js b/src/api.js index d38dba0..633a4ea 100644 --- a/src/api.js +++ b/src/api.js @@ -19,6 +19,7 @@ let ydocUpdateCallback = env.getConf('ydoc-update-callback') if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') { ydocUpdateCallback += '/' } +const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' /** * @param {string} a @@ -117,20 +118,25 @@ export class Api { this.redisWorkerGroupName = this.prefix + ':worker' this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset` this._destroyed = false + + const addScript = WORKER_DISABLED + ? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])' + : ` + if redis.call("EXISTS", KEYS[1]) == 0 then + redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) + elseif redis.call("XLEN", KEYS[1]) > 100 then + redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) + end + redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) + ` + this.redis = redis.createClient({ url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { addMessage: redis.defineScript({ NUMBER_OF_KEYS: 1, - SCRIPT: ` - -- if redis.call("EXISTS", KEYS[1]) == 0 then - -- redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) - -- elseif redis.call("XLEN", KEYS[1]) > 100 then - -- redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) - -- end - redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) - `, + SCRIPT: addScript, /** * @param {string} key * @param {Buffer} message diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index eae92d8..8e41d0c 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -17,6 +17,7 @@ import toobusy from 'toobusy-js' const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') +const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' process.on('SIGINT', function () { // calling .shutdown allows your process to exit normally @@ -200,10 +201,8 @@ export class YSocketIO { ;(async () => { assert(this.client) assert(socket.user) - const doc = - this.namespaceDocMap.get(namespace) || - (await this.client.getDoc(namespace, 'index')) - this.namespaceDocMap.set(namespace, doc) + const doc = WORKER_DISABLED && this.namespaceDocMap.get(namespace) || (await this.client.getDoc(namespace, 'index')) + if (WORKER_DISABLED) this.namespaceDocMap.set(namespace, doc) if ( api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) @@ -248,10 +247,8 @@ export class YSocketIO { ) => { assert(this.client) const namespace = this.getNamespaceString(socket.nsp) - const doc = - this.namespaceDocMap.get(namespace) || - (await this.client.getDoc(namespace, 'index')) - this.namespaceDocMap.set(namespace, doc) + const doc = WORKER_DISABLED && this.namespaceDocMap.get(namespace) || (await this.client.getDoc(namespace, 'index')) + if (WORKER_DISABLED) this.namespaceDocMap.set(namespace, doc) assert(doc) syncStep2(Y.encodeStateAsUpdate(doc.ydoc, stateVector)) } @@ -404,6 +401,8 @@ export class YSocketIO { nsp.emit('awareness-update', msg) } + if (!WORKER_DISABLED) return + let changed = false const existDoc = this.namespaceDocMap.get(namespace) if (existDoc) { From a61d8ed61dbead5200d84a8639b49f948e04bf45 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 15:56:48 +0800 Subject: [PATCH 07/14] fix: lint fix --- src/y-socket-io/y-socket-io.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 8e41d0c..c72a173 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -201,7 +201,7 @@ export class YSocketIO { ;(async () => { assert(this.client) assert(socket.user) - const doc = WORKER_DISABLED && this.namespaceDocMap.get(namespace) || (await this.client.getDoc(namespace, 'index')) + const doc = (WORKER_DISABLED && this.namespaceDocMap.get(namespace)) || (await this.client.getDoc(namespace, 'index')) if (WORKER_DISABLED) this.namespaceDocMap.set(namespace, doc) if ( @@ -247,7 +247,7 @@ export class YSocketIO { ) => { assert(this.client) const namespace = this.getNamespaceString(socket.nsp) - const doc = WORKER_DISABLED && this.namespaceDocMap.get(namespace) || (await this.client.getDoc(namespace, 'index')) + const doc = (WORKER_DISABLED && this.namespaceDocMap.get(namespace)) || (await this.client.getDoc(namespace, 'index')) if (WORKER_DISABLED) this.namespaceDocMap.set(namespace, doc) assert(doc) syncStep2(Y.encodeStateAsUpdate(doc.ydoc, stateVector)) From 60ac340bd3f93d1c4b341cdf30b5b9337bd66685 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 18:32:31 +0800 Subject: [PATCH 08/14] fix: also call persist if last call too old --- src/y-socket-io/y-socket-io.js | 38 +++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index c72a173..904db57 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -16,6 +16,7 @@ import toobusy from 'toobusy-js' const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') +const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max-persist-interval') || '30000') const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' @@ -117,6 +118,24 @@ export class YSocketIO { * @readonly */ socketUserCache = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + debouncedPersistMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + debouncedPersistDocMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + namespacePersistentMap = new Map() /** * YSocketIO constructor. @@ -323,6 +342,7 @@ export class YSocketIO { this.namespaceMap.delete(ns) this.namespaceDocMap.get(ns)?.ydoc.destroy() this.namespaceDocMap.delete(ns) + this.namespacePersistentMap.delete(ns) } } }) @@ -379,6 +399,7 @@ export class YSocketIO { this.namespaceMap.delete(namespace) this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.delete(namespace) + this.namespacePersistentMap.delete(namespace) } /** @type {Uint8Array[]} */ @@ -429,20 +450,17 @@ export class YSocketIO { changed = getDoc.changed } assert(doc) - if (changed) this.debouncedPersist(namespace, doc.ydoc) + const lastPersistCalledAt = this.namespacePersistentMap.get(namespace) ?? 0 + const now = Date.now() + const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL + if (changed || shouldPersist) { + this.namespacePersistentMap.set(namespace, now) + this.debouncedPersist(namespace, doc.ydoc) + } this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.set(namespace, doc) } - /** - * @type {Map} - */ - debouncedPersistMap = new Map() - /** - * @type {Map} - */ - debouncedPersistDocMap = new Map() - /** * @param {string} namespace * @param {Y.Doc} doc From 1f87975ddae7173ee85c8fa7c5a06048cb583d8b Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 20:02:28 +0800 Subject: [PATCH 09/14] feat: allow worker thread persist --- src/api.js | 2 ++ src/index.js | 1 + src/persist-worker-thread.js | 43 ++++++++++++++++++++++++++++++++++ src/socketio.js | 5 ++-- src/y-socket-io/y-socket-io.js | 40 ++++++++++++++++++++++++++++--- 5 files changed, 86 insertions(+), 5 deletions(-) create mode 100644 src/persist-worker-thread.js diff --git a/src/api.js b/src/api.js index 633a4ea..64860fd 100644 --- a/src/api.js +++ b/src/api.js @@ -118,6 +118,8 @@ export class Api { this.redisWorkerGroupName = this.prefix + ':worker' this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset` this._destroyed = false + /** @type {import('worker_threads').Worker | null} */ + this.persistWorker = null const addScript = WORKER_DISABLED ? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])' diff --git a/src/index.js b/src/index.js index 29e80d4..8c183d9 100644 --- a/src/index.js +++ b/src/index.js @@ -3,4 +3,5 @@ export * from './server.js' export * from './storage.js' export * from './api.js' export * from './subscriber.js' +export * from './persist-worker-thread.js' export * from './y-socket-io/index.js' diff --git a/src/persist-worker-thread.js b/src/persist-worker-thread.js new file mode 100644 index 0000000..6acd57b --- /dev/null +++ b/src/persist-worker-thread.js @@ -0,0 +1,43 @@ +import * as Y from 'yjs' +import * as logging from 'lib0/logging' +import { Worker, isMainThread, parentPort, workerData } from 'worker_threads' +import path from 'path' + +export class PersistWorkerThread { + /** + * @private + * @readonly + */ + log = logging.createModuleLogger('@y/persist-worker-thread') + + /** + * @param {import('./storage.js').AbstractStorage} store + */ + constructor(store) { + if (isMainThread) { + this.log('persist worker cannot run on main thread') + return + } + this.store = store + parentPort?.on('message', this.persist) + } + + /** + * @param {{ room: string, docstate: SharedArrayBuffer }} props + */ + persist = async ({ room, docstate }) => { + const state = new Uint8Array(docstate) + const doc = new Y.Doc() + Y.applyUpdateV2(doc, state) + await this.store?.persistDoc(room, 'index', doc) + doc.destroy() + } +} + +/** + * @param {import('./storage.js').AbstractStorage} store + */ +export function createPersistWorkerThread(store) { + if (isMainThread) throw new Error('cannot create persist worker in main thread') + return new PersistWorkerThread(store) +} diff --git a/src/socketio.js b/src/socketio.js index e288f3a..0d26540 100644 --- a/src/socketio.js +++ b/src/socketio.js @@ -37,9 +37,10 @@ class YSocketIOServer { * @param {string} [conf.redisPrefix] * @param {string} [conf.redisUrl] * @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate + * @param {import('worker_threads').Worker=} [conf.persistWorker] */ -export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix }) => { +export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => { const app = new YSocketIO(io, { authenticate }) - const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix }) + const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker }) return new YSocketIOServer(app, client, subscriber) } diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 904db57..c99f3b1 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -136,6 +136,12 @@ export class YSocketIO { * @readonly */ namespacePersistentMap = new Map() + /** + * @type {Map void>} + * @private + * @readonly + */ + awaitingPersistMap = new Map() /** * YSocketIO constructor. @@ -156,16 +162,20 @@ export class YSocketIO { * * It also starts socket connection listeners. * @param {import('../storage.js').AbstractStorage} store - * @param {{ redisPrefix?: string, redisUrl?: string }=} opts + * @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }=} opts * @public */ - async initialize (store, { redisUrl, redisPrefix = 'y' } = {}) { + async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) { const [client, subscriber] = await promise.all([ api.createApiClient(store, { redisUrl, redisPrefix }), createSubscriber(store, { redisUrl, redisPrefix }) ]) this.client = client this.subscriber = subscriber + if (persistWorker) { + this.client.persistWorker = persistWorker + this.registerPersistWorkerResolve() + } this.nsp = this.io.of(/^\/yjs\|.*$/) @@ -475,7 +485,24 @@ export class YSocketIO { assert(this.client) const doc = this.debouncedPersistDocMap.get(namespace) if (!doc) return - await this.client.store.persistDoc(namespace, 'index', doc) + if (this.client.persistWorker) { + /** @type {Promise} */ + const promise = new Promise((res) => { + assert(this.client?.persistWorker) + this.awaitingPersistMap.set(namespace, res) + + const docState = Y.encodeStateAsUpdateV2(doc) + const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) + buf.set(docState) + this.client.persistWorker.postMessage({ + room: namespace, + docstate: buf + }) + }) + await promise + } else { + await this.client.store.persistDoc(namespace, 'index', doc) + } await this.client.trimRoomStream(namespace, 'index', true) this.debouncedPersistDocMap.delete(namespace) this.debouncedPersistMap.delete(namespace) @@ -569,4 +596,11 @@ export class YSocketIO { console.error(e) } } + + registerPersistWorkerResolve () { + if (!this.client?.persistWorker) return + this.client.persistWorker.on('message', ({ event, room }) => { + if (event === 'persisted') this.awaitingPersistMap.get(room)?.() + }) + } } From 03178fb1381017a53b97ed608c567feb119c7032 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 20:03:24 +0800 Subject: [PATCH 10/14] fix: lint error --- src/persist-worker-thread.js | 7 +++---- src/y-socket-io/y-socket-io.js | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/persist-worker-thread.js b/src/persist-worker-thread.js index 6acd57b..98f405c 100644 --- a/src/persist-worker-thread.js +++ b/src/persist-worker-thread.js @@ -1,7 +1,6 @@ import * as Y from 'yjs' import * as logging from 'lib0/logging' -import { Worker, isMainThread, parentPort, workerData } from 'worker_threads' -import path from 'path' +import { isMainThread, parentPort } from 'worker_threads' export class PersistWorkerThread { /** @@ -13,7 +12,7 @@ export class PersistWorkerThread { /** * @param {import('./storage.js').AbstractStorage} store */ - constructor(store) { + constructor (store) { if (isMainThread) { this.log('persist worker cannot run on main thread') return @@ -37,7 +36,7 @@ export class PersistWorkerThread { /** * @param {import('./storage.js').AbstractStorage} store */ -export function createPersistWorkerThread(store) { +export function createPersistWorkerThread (store) { if (isMainThread) throw new Error('cannot create persist worker in main thread') return new PersistWorkerThread(store) } diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index c99f3b1..3a3adc8 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -487,9 +487,9 @@ export class YSocketIO { if (!doc) return if (this.client.persistWorker) { /** @type {Promise} */ - const promise = new Promise((res) => { + const promise = new Promise((resolve) => { assert(this.client?.persistWorker) - this.awaitingPersistMap.set(namespace, res) + this.awaitingPersistMap.set(namespace, resolve) const docState = Y.encodeStateAsUpdateV2(doc) const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) From 4517cd5cb10e0a62528ca574647ea4180debde83 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 20:59:56 +0800 Subject: [PATCH 11/14] fix: worker emit back persist event --- src/persist-worker-thread.js | 2 ++ src/y-socket-io/y-socket-io.js | 28 ++++++++++++++++++---------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/persist-worker-thread.js b/src/persist-worker-thread.js index 98f405c..4e588be 100644 --- a/src/persist-worker-thread.js +++ b/src/persist-worker-thread.js @@ -25,11 +25,13 @@ export class PersistWorkerThread { * @param {{ room: string, docstate: SharedArrayBuffer }} props */ persist = async ({ room, docstate }) => { + this.log(`persisting ${room} in worker`) const state = new Uint8Array(docstate) const doc = new Y.Doc() Y.applyUpdateV2(doc, state) await this.store?.persistDoc(room, 'index', doc) doc.destroy() + parentPort?.postMessage({ event: 'persisted', room }) } } diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 3a3adc8..040dd0f 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -478,16 +478,17 @@ export class YSocketIO { async debouncedPersist (namespace, doc) { this.debouncedPersistDocMap.set(namespace, doc) if (this.debouncedPersistMap.has(namespace)) return - this.debouncedPersistMap.set( - namespace, - setTimeout( - async () => { + const timeout = setTimeout( + async () => { + try { assert(this.client) const doc = this.debouncedPersistDocMap.get(namespace) + logSocketIO(`trying to persist ${namespace}`) if (!doc) return + /** @type {Promise | null} */ + let workerPromise = null if (this.client.persistWorker) { - /** @type {Promise} */ - const promise = new Promise((resolve) => { + workerPromise = new Promise((resolve) => { assert(this.client?.persistWorker) this.awaitingPersistMap.set(namespace, resolve) @@ -499,17 +500,24 @@ export class YSocketIO { docstate: buf }) }) - await promise + if (workerPromise) { + await workerPromise + } } else { await this.client.store.persistDoc(namespace, 'index', doc) } await this.client.trimRoomStream(namespace, 'index', true) + } catch (e) { + console.error(e) + } finally { this.debouncedPersistDocMap.delete(namespace) this.debouncedPersistMap.delete(namespace) - }, - PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL - ) + } + }, + PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL ) + + this.debouncedPersistMap.set(namespace, timeout) } /** From 35223ce4e5ae7ab6ab16413e95ce87d74b0af4ed Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 21:35:54 +0800 Subject: [PATCH 12/14] fix: should wait a while before clearing resource --- src/y-socket-io/utils.js | 17 +++++ src/y-socket-io/y-socket-io.js | 121 ++++++++++++++++++++++----------- 2 files changed, 100 insertions(+), 38 deletions(-) create mode 100644 src/y-socket-io/utils.js diff --git a/src/y-socket-io/utils.js b/src/y-socket-io/utils.js new file mode 100644 index 0000000..42481de --- /dev/null +++ b/src/y-socket-io/utils.js @@ -0,0 +1,17 @@ +/** + * Basically Promise.withResolvers() + * @template T + * @see https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers + */ +export function promiseWithResolvers () { + /** @type {(value: T | PromiseLike) => void} */ + let res = () => {} + /** @type {(reason?: Error) => void} */ + let rej = () => {} + /** @type {Promise} */ + const promise = new Promise((resolve, reject) => { + res = resolve + rej = reject + }) + return { promise, resolve: res, reject: rej } +} diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 040dd0f..6c4d67d 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -13,12 +13,14 @@ import { isDeepStrictEqual } from 'util' import { User } from './user.js' import { createModuleLogger } from 'lib0/logging' import toobusy from 'toobusy-js' +import { promiseWithResolvers } from './utils.js' const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max-persist-interval') || '30000') const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' +const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000') process.on('SIGINT', function () { // calling .shutdown allows your process to exit normally @@ -137,11 +139,17 @@ export class YSocketIO { */ namespacePersistentMap = new Map() /** - * @type {Map void>} + * @type {Map, resolve: () => void }>} * @private * @readonly */ awaitingPersistMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + awaitingCleanupNamespace = new Map() /** * YSocketIO constructor. @@ -213,6 +221,12 @@ export class YSocketIO { 'index', redisPrefix ) + const prevAwaitCleanup = this.awaitingCleanupNamespace.get(namespace) + if (prevAwaitCleanup) { + clearTimeout(prevAwaitCleanup) + this.cleanupNamespace(namespace, stream) + } + if (!this.namespaceMap.has(namespace)) { this.namespaceMap.set(namespace, socket.nsp) } @@ -346,13 +360,9 @@ export class YSocketIO { if (!ns) continue const nsp = this.namespaceMap.get(ns) if (nsp?.sockets.size === 0 && stream) { - this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) - this.namespaceStreamMap.delete(ns) - this.streamNamespaceMap.delete(stream) - this.namespaceMap.delete(ns) - this.namespaceDocMap.get(ns)?.ydoc.destroy() - this.namespaceDocMap.delete(ns) - this.namespacePersistentMap.delete(ns) + this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) + const doc = this.namespaceDocMap.get(ns) + if (doc) this.debouncedPersist(ns, doc.ydoc, true) } } }) @@ -403,13 +413,7 @@ export class YSocketIO { const nsp = this.namespaceMap.get(namespace) if (!nsp) return if (nsp.sockets.size === 0 && this.subscriber) { - this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) - this.namespaceStreamMap.delete(namespace) - this.streamNamespaceMap.delete(stream) - this.namespaceMap.delete(namespace) - this.namespaceDocMap.get(namespace)?.ydoc.destroy() - this.namespaceDocMap.delete(namespace) - this.namespacePersistentMap.delete(namespace) + this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT) } /** @type {Uint8Array[]} */ @@ -463,9 +467,9 @@ export class YSocketIO { const lastPersistCalledAt = this.namespacePersistentMap.get(namespace) ?? 0 const now = Date.now() const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL - if (changed || shouldPersist) { + if (changed || shouldPersist || nsp.sockets.size === 0) { this.namespacePersistentMap.set(namespace, now) - this.debouncedPersist(namespace, doc.ydoc) + this.debouncedPersist(namespace, doc.ydoc, nsp.sockets.size === 0) } this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.set(namespace, doc) @@ -474,10 +478,17 @@ export class YSocketIO { /** * @param {string} namespace * @param {Y.Doc} doc + * @param {boolean=} immediate */ - async debouncedPersist (namespace, doc) { + debouncedPersist (namespace, doc, immediate = false) { this.debouncedPersistDocMap.set(namespace, doc) - if (this.debouncedPersistMap.has(namespace)) return + if (this.debouncedPersistMap.has(namespace)) { + if (!immediate) return + clearTimeout(this.debouncedPersistMap.get(namespace) || undefined) + } + const timeoutInterval = immediate + ? 0 + : PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL const timeout = setTimeout( async () => { try { @@ -485,28 +496,24 @@ export class YSocketIO { const doc = this.debouncedPersistDocMap.get(namespace) logSocketIO(`trying to persist ${namespace}`) if (!doc) return - /** @type {Promise | null} */ - let workerPromise = null if (this.client.persistWorker) { - workerPromise = new Promise((resolve) => { - assert(this.client?.persistWorker) - this.awaitingPersistMap.set(namespace, resolve) - - const docState = Y.encodeStateAsUpdateV2(doc) - const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) - buf.set(docState) - this.client.persistWorker.postMessage({ - room: namespace, - docstate: buf - }) + /** @type {ReturnType>} */ + const { promise, resolve } = promiseWithResolvers() + assert(this.client?.persistWorker) + this.awaitingPersistMap.set(namespace, { promise, resolve }) + + const docState = Y.encodeStateAsUpdateV2(doc) + const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) + buf.set(docState) + this.client.persistWorker.postMessage({ + room: namespace, + docstate: buf }) - if (workerPromise) { - await workerPromise - } + await promise } else { await this.client.store.persistDoc(namespace, 'index', doc) } - await this.client.trimRoomStream(namespace, 'index', true) + await this.client.trimRoomStream(namespace, 'index') } catch (e) { console.error(e) } finally { @@ -514,7 +521,7 @@ export class YSocketIO { this.debouncedPersistMap.delete(namespace) } }, - PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL + timeoutInterval ) this.debouncedPersistMap.set(namespace, timeout) @@ -608,7 +615,45 @@ export class YSocketIO { registerPersistWorkerResolve () { if (!this.client?.persistWorker) return this.client.persistWorker.on('message', ({ event, room }) => { - if (event === 'persisted') this.awaitingPersistMap.get(room)?.() + if (event === 'persisted') this.awaitingPersistMap.get(room)?.resolve() }) } + + /** + * @param {string} namespace + * @param {string} stream + * @param {number=} removeAfterWait + */ + cleanupNamespace (namespace, stream, removeAfterWait) { + if (!removeAfterWait) { + this.awaitingCleanupNamespace.delete(namespace) + return this.cleanupNamespaceImpl(namespace, stream) + } + if (this.awaitingCleanupNamespace.has(namespace)) return + + const timer = setTimeout(async () => { + const awaitingPersist = this.awaitingPersistMap.get(namespace) + if (awaitingPersist) await awaitingPersist.promise + this.cleanupNamespaceImpl(namespace, stream) + this.awaitingCleanupNamespace.delete(namespace) + logSocketIO(`no active connection, namespace: ${namespace} cleared`) + }, removeAfterWait) + this.awaitingCleanupNamespace.set(namespace, timer) + } + + /** + * @param {string} namespace + * @param {string} stream + * @private + */ + cleanupNamespaceImpl (namespace, stream) { + this.subscriber?.unsubscribe(stream, this.redisMessageSubscriber) + this.namespaceStreamMap.delete(namespace) + this.streamNamespaceMap.delete(stream) + this.namespaceMap.delete(namespace) + this.namespaceDocMap.get(namespace)?.ydoc.destroy() + this.namespaceDocMap.delete(namespace) + this.namespacePersistentMap.delete(namespace) + this.client?.trimRoomStream(namespace, 'index', true) + } } From b1777ddccf6473a64f42c0887def605b5cabfd7f Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 23 Jan 2025 12:11:22 +0800 Subject: [PATCH 13/14] fix: handle too busy reconnection --- src/y-socket-io/client.js | 2 ++ src/y-socket-io/y-socket-io.js | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index ebeacde..c1a1d11 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -351,6 +351,8 @@ export class SocketIOProvider extends Observable { * @readonly */ onSocketDisconnection = (event) => { + if (event === 'io server disconnect') this.socket.connect() + this.emit('connection-close', [event, this]) this.synced = false AwarenessProtocol.removeAwarenessStates( diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 6c4d67d..3a40940 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -211,7 +211,11 @@ export class YSocketIO { const namespace = this.getNamespaceString(socket.nsp) if (toobusy()) { logSocketIO(`warning server too busy, rejecting connection: ${namespace}`) - throw new Error('server too busy, please try again latter') + // wait a bit to prevent client reconnect too fast + await promise.wait(100) + socket.send('server too busy, please try again latter') + socket.disconnect(true) + return } if (!socket.user) throw new Error('user does not exist in socket') From b9d188258e893b1f94e8d5c5aff974ef36412b83 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 23 Jan 2025 12:11:33 +0800 Subject: [PATCH 14/14] feat: handle reload event --- src/y-socket-io/y-socket-io.js | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 3a40940..8a50f55 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -370,6 +370,15 @@ export class YSocketIO { } } }) + socket.onAnyOutgoing(async (ev) => { + if (ev !== 'reload') return + if (!WORKER_DISABLED) return + const namespace = this.getNamespaceString(socket.nsp) + logSocketIO(`reload event triggered, updating namespace doc in: ${namespace}`) + assert(this.client) + const doc = await this.client.getDoc(namespace, 'index') + this.namespaceDocMap.set(namespace, doc) + }) } /**