diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 75c0755..93bec5e 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -165,6 +165,7 @@ def __init__( consumer_id: str = "$", mkstream: bool = True, xread_block: int = 10000, + maxlen: Optional[int] = None, additional_streams: Optional[Dict[str, str]] = None, **connection_kwargs: Any, ) -> None: @@ -184,6 +185,8 @@ def __init__( :param mkstream: create stream if it does not exist. :param xread_block: block time in ms for xreadgroup. Better to set it to a bigger value, to avoid unnecessary calls. + :param maxlen: sets the maximum length of the stream + trims (the old values of) the stream each time a new element is added :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. """ @@ -200,6 +203,7 @@ def __init__( self.consumer_id = consumer_id self.mkstream = mkstream self.block = xread_block + self.maxlen = maxlen self.additional_streams = additional_streams or {} async def _declare_consumer_group(self) -> None: @@ -235,7 +239,11 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ async with Redis(connection_pool=self.connection_pool) as redis_conn: - await redis_conn.xadd(self.queue_name, {b"data": message.message}) + await redis_conn.xadd( + self.queue_name, + {b"data": message.message}, + maxlen=self.maxlen, + ) def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: async def _ack() -> None: