Skip to content

Commit e342bde

Browse files
authored
fix: don't wait if reclaimed tasks > 0 (#14)
2 parents 0732006 + 83d2126 commit e342bde

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

src/api.js

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import * as math from 'lib0/math'
1111
import * as protocol from './protocol.js'
1212
import * as env from 'lib0/environment'
1313
import * as logging from 'lib0/logging'
14-
import * as time from 'lib0/time'
1514

1615
const logWorker = logging.createModuleLogger('@y/redis/api/worker')
1716
// const logApi = logging.createModuleLogger('@y/redis/api')
@@ -280,6 +279,7 @@ export class Api {
280279
})
281280
}
282281
tasks.length > 0 && logWorker('Accepted tasks ', { tasks })
282+
let reclaimCounts = 0
283283
await promise.all(tasks.map(async task => {
284284
const streamlen = await this.redis.xLen(task.stream)
285285
if (streamlen === 0) {
@@ -289,6 +289,7 @@ export class Api {
289289
.exec()
290290
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
291291
} else {
292+
reclaimCounts++
292293
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
293294
const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid)
294295
const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0]))
@@ -321,7 +322,7 @@ export class Api {
321322
ydoc.destroy()
322323
}
323324
}))
324-
return tasks
325+
return { tasks, reclaimCounts }
325326
}
326327

327328
async destroy () {
@@ -353,14 +354,17 @@ export class Worker {
353354
this.client = client
354355
logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime })
355356
;(async () => {
356-
const startRedisTime = await client.redis.time()
357-
const timeDiff = startRedisTime.getTime() - time.getUnixTime()
357+
let prev = performance.now()
358358
while (!client._destroyed) {
359359
try {
360-
const tasks = await client.consumeWorkerQueue(opts)
361-
if (tasks.length === 0 || (client.redisMinMessageLifetime > time.getUnixTime() + timeDiff - number.parseInt(tasks[0].id.split('-')[0]))) {
360+
const { reclaimCounts } = await client.consumeWorkerQueue(opts)
361+
const now = performance.now()
362+
if (reclaimCounts === 0) {
362363
await promise.wait(client.redisWorkerTimeout)
364+
} else if (now - prev < client.redisWorkerTimeout) {
365+
await promise.wait(client.redisWorkerTimeout - (now - prev))
363366
}
367+
prev = now
364368
} catch (e) {
365369
console.error(e)
366370
}

0 commit comments

Comments
 (0)