From c39f90d206c82232de9efc1ba202c616f854bfb2 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Sat, 25 Jan 2025 15:53:57 +0800 Subject: [PATCH 1/3] feat: use ttl to remove inactive room --- src/api.js | 22 +++++++++++----------- src/y-socket-io/y-socket-io.js | 1 - 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/api.js b/src/api.js index e712ff6..4fb5d17 100644 --- a/src/api.js +++ b/src/api.js @@ -20,6 +20,7 @@ if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') { ydocUpdateCallback += '/' } const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' +const ROOM_STREAM_TTL = number.parseInt(env.getConf('y-room-stream-ttl') || '300') /** * @param {string} a @@ -125,7 +126,10 @@ export class Api { this.persistWorker = null const addScript = WORKER_DISABLED - ? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])' + ? ` + redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) + redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL}) + ` : ` if redis.call("EXISTS", KEYS[1]) == 0 then redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) @@ -133,6 +137,7 @@ export class Api { redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) end redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) + redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL}) ` this.redis = redis.createClient({ @@ -294,20 +299,15 @@ export class Api { /** * @param {string} room * @param {string} docid - * @param {boolean} [remove=false] */ - async trimRoomStream (room, docid, remove = false) { + async trimRoomStream (room, docid) { const roomName = computeRedisRoomStreamName(room, docid, this.prefix) const redisLastId = await this.getRedisLastId(room, docid) const lastId = number.parseInt(redisLastId.split('-')[0]) - if (remove) { - await this.redis.del(roomName) - } else { - await this.redis.multi() - .xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime) - .xDelIfEmpty(roomName) - .exec() - } + await this.redis.multi() + .xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime) + .xDelIfEmpty(roomName) + .exec() } /** diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 0df9393..597d5f0 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -687,6 +687,5 @@ export class YSocketIO { this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.delete(namespace) this.namespacePersistentMap.delete(namespace) - this.client?.trimRoomStream(namespace, 'index', true) } } From c2dc0e28b8dca9ded0735d699e5a0c2a964d12e4 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Sat, 25 Jan 2025 16:02:52 +0800 Subject: [PATCH 2/3] fix: don't delete stream if empty --- src/api.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/api.js b/src/api.js index 4fb5d17..4509d95 100644 --- a/src/api.js +++ b/src/api.js @@ -304,10 +304,7 @@ export class Api { const roomName = computeRedisRoomStreamName(room, docid, this.prefix) const redisLastId = await this.getRedisLastId(room, docid) const lastId = number.parseInt(redisLastId.split('-')[0]) - await this.redis.multi() - .xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime) - .xDelIfEmpty(roomName) - .exec() + await this.redis.xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime) } /** From ac1f0041123ef5b461210d48ccbb749d2424f43b Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Sat, 25 Jan 2025 16:03:09 +0800 Subject: [PATCH 3/3] fix: no need to check for namespace --- src/y-socket-io/y-socket-io.js | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 597d5f0..12f6bfc 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -537,16 +537,7 @@ export class YSocketIO { await this.client.store.persistDoc(namespace, 'index', doc) } - /** - * there's a possibility where the namespace is deleted after the - * persist promise resolved, so we have to check if the room still - * exist. - * @see cleanupNamespace - * @see cleanupNamespaceImpl - */ - if (this.namespaceMap.has(namespace)) { - await this.client.trimRoomStream(namespace, 'index') - } + await this.client.trimRoomStream(namespace, 'index') } catch (e) { console.error(e) } finally {