From 69abc42e1dab32d5489149123267cd8c339a52c1 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 18 Apr 2025 05:57:00 +0200 Subject: [PATCH] Fixed unacknowledged messages. --- taskiq_redis/redis_broker.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 93bec5e..10baff9 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -164,8 +164,10 @@ def __init__( consumer_name: Optional[str] = None, consumer_id: str = "$", mkstream: bool = True, - xread_block: int = 10000, + xread_block: int = 2000, maxlen: Optional[int] = None, + idle_timeout: int = 600000, # 10 minutes + unacknowledged_batch_size: int = 100, additional_streams: Optional[Dict[str, str]] = None, **connection_kwargs: Any, ) -> None: @@ -189,6 +191,8 @@ def __init__( trims (the old values of) the stream each time a new element is added :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. + :param idle_timeout: time in ms a message may stay pending before it is claimed for redelivery. + :param unacknowledged_batch_size: number of unacknowledged messages to fetch. 
""" super().__init__( url, @@ -205,6 +209,8 @@ def __init__( self.block = xread_block self.maxlen = maxlen self.additional_streams = additional_streams or {} + self.idle_timeout = idle_timeout + self.unacknowledged_batch_size = unacknowledged_batch_size async def _declare_consumer_group(self) -> None: """ @@ -260,6 +266,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: """Listen to incoming messages.""" async with Redis(connection_pool=self.connection_pool) as redis_conn: while True: + logger.debug("Starting fetching new messages") fetched = await redis_conn.xreadgroup( self.consumer_group_name, self.consumer_name, @@ -277,3 +284,29 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: data=msg[b"data"], ack=self._ack_generator(msg_id), ) + logger.debug("Starting fetching unacknowledged messages") + for stream in [self.queue_name, *self.additional_streams.keys()]: + lock = redis_conn.lock( + f"autoclaim:{self.consumer_group_name}:{stream}", + ) + if await lock.locked(): + continue + async with lock: + pending = await redis_conn.xautoclaim( + name=stream, + groupname=self.consumer_group_name, + consumername=self.consumer_name, + min_idle_time=self.idle_timeout, + count=self.unacknowledged_batch_size, + ) + logger.debug( + "Found %d pending messages in stream %s", + len(pending), + stream, + ) + for msg_id, msg in pending[1]: + logger.debug("Received message: %s", msg) + yield AckableMessage( + data=msg[b"data"], + ack=self._ack_generator(msg_id), + )