From 0d34a45e5db4dcebea28e64c1efb7232bc302420 Mon Sep 17 00:00:00 2001
From: arturka
Date: Fri, 6 Jun 2025 12:23:39 +0200
Subject: [PATCH 1/2] feat: add maxlen to stream sentinel and cluster brokers

---
 taskiq_redis/redis_cluster_broker.py  | 10 +++++++++-
 taskiq_redis/redis_sentinel_broker.py | 10 +++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py
index 6818275..4ba7107 100644
--- a/taskiq_redis/redis_cluster_broker.py
+++ b/taskiq_redis/redis_cluster_broker.py
@@ -92,6 +92,7 @@ def __init__(
         consumer_id: str = "$",
         mkstream: bool = True,
         xread_block: int = 10000,
+        maxlen: Optional[int] = None,
         additional_streams: Optional[Dict[str, str]] = None,
         **connection_kwargs: Any,
     ) -> None:
@@ -111,6 +112,8 @@ def __init__(
         :param mkstream: create stream if it does not exist.
         :param xread_block: block time in ms for xreadgroup.
             Better to set it to a bigger value, to avoid unnecessary calls.
+        :param maxlen: sets the maximum length of the stream
+            trims (the old values of) the stream each time a new element is added
         :param additional_streams: additional streams to read from.
             Each key is a stream name, value is a consumer id.
         """
@@ -125,6 +128,7 @@ def __init__(
         self.consumer_id = consumer_id
         self.mkstream = mkstream
         self.block = xread_block
+        self.maxlen = maxlen
         self.additional_streams = additional_streams or {}

     async def _declare_consumer_group(self) -> None:
@@ -154,7 +158,11 @@ async def kick(self, message: BrokerMessage) -> None:

         :param message: message to append.
         """
-        await self.redis.xadd(self.queue_name, {b"data": message.message})
+        await self.redis.xadd(
+            self.queue_name,
+            {b"data": message.message},
+            maxlen=self.maxlen,
+        )

     def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
         async def _ack() -> None:
diff --git a/taskiq_redis/redis_sentinel_broker.py b/taskiq_redis/redis_sentinel_broker.py
index 198251c..378b394 100644
--- a/taskiq_redis/redis_sentinel_broker.py
+++ b/taskiq_redis/redis_sentinel_broker.py
@@ -157,6 +157,7 @@ def __init__(
         consumer_id: str = "$",
         mkstream: bool = True,
         xread_block: int = 10000,
+        maxlen: Optional[int] = None,
         additional_streams: Optional[Dict[str, str]] = None,
         **connection_kwargs: Any,
     ) -> None:
@@ -176,6 +177,8 @@ def __init__(
         :param mkstream: create stream if it does not exist.
         :param xread_block: block time in ms for xreadgroup.
             Better to set it to a bigger value, to avoid unnecessary calls.
+        :param maxlen: sets the maximum length of the stream
+            trims (the old values of) the stream each time a new element is added
         :param additional_streams: additional streams to read from.
             Each key is a stream name, value is a consumer id.
         """
@@ -193,6 +196,7 @@ def __init__(
         self.consumer_id = consumer_id
         self.mkstream = mkstream
         self.block = xread_block
+        self.maxlen = maxlen
         self.additional_streams = additional_streams or {}

     async def _declare_consumer_group(self) -> None:
@@ -223,7 +227,11 @@ async def kick(self, message: BrokerMessage) -> None:
         :param message: message to append.
""" async with self._acquire_master_conn() as redis_conn: - await redis_conn.xadd(self.queue_name, {b"data": message.message}) + await redis_conn.xadd( + self.queue_name, + {b"data": message.message}, + maxlen=self.maxlen, + ) def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: async def _ack() -> None: From f4ed0757210d56d7c81a86de7293e082ec0f1498 Mon Sep 17 00:00:00 2001 From: arturka Date: Fri, 6 Jun 2025 13:53:43 +0200 Subject: [PATCH 2/2] feat: add approximate param to stream brokers, add tests for maxlen --- taskiq_redis/redis_broker.py | 5 ++ taskiq_redis/redis_cluster_broker.py | 5 ++ taskiq_redis/redis_sentinel_broker.py | 5 ++ tests/test_broker.py | 97 ++++++++++++++++++++++++++- 4 files changed, 111 insertions(+), 1 deletion(-) diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 4be8337..f16ee52 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -166,6 +166,7 @@ def __init__( mkstream: bool = True, xread_block: int = 2000, maxlen: Optional[int] = None, + approximate: bool = True, idle_timeout: int = 600000, # 10 minutes unacknowledged_batch_size: int = 100, xread_count: Optional[int] = 100, @@ -190,6 +191,8 @@ def __init__( Better to set it to a bigger value, to avoid unnecessary calls. :param maxlen: sets the maximum length of the stream trims (the old values of) the stream each time a new element is added + :param approximate: decides wether to trim the stream immediately (False) or + later on (True) :param xread_count: number of messages to fetch from the stream at once. :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. @@ -210,6 +213,7 @@ def __init__( self.mkstream = mkstream self.block = xread_block self.maxlen = maxlen + self.approximate = approximate self.additional_streams = additional_streams or {} self.idle_timeout = idle_timeout self.unacknowledged_batch_size = unacknowledged_batch_size @@ -252,6 +256,7 @@ async def kick(self, message: BrokerMessage) -> None: self.queue_name, {b"data": message.message}, maxlen=self.maxlen, + approximate=self.approximate, ) def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py index 4ba7107..abe1892 100644 --- a/taskiq_redis/redis_cluster_broker.py +++ b/taskiq_redis/redis_cluster_broker.py @@ -93,6 +93,7 @@ def __init__( mkstream: bool = True, xread_block: int = 10000, maxlen: Optional[int] = None, + approximate: bool = True, additional_streams: Optional[Dict[str, str]] = None, **connection_kwargs: Any, ) -> None: @@ -114,6 +115,8 @@ def __init__( Better to set it to a bigger value, to avoid unnecessary calls. :param maxlen: sets the maximum length of the stream trims (the old values of) the stream each time a new element is added + :param approximate: decides wether to trim the stream immediately (False) or + later on (True) :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. 
""" @@ -129,6 +132,7 @@ def __init__( self.mkstream = mkstream self.block = xread_block self.maxlen = maxlen + self.approximate = approximate self.additional_streams = additional_streams or {} async def _declare_consumer_group(self) -> None: @@ -162,6 +166,7 @@ async def kick(self, message: BrokerMessage) -> None: self.queue_name, {b"data": message.message}, maxlen=self.maxlen, + approximate=self.approximate, ) def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: diff --git a/taskiq_redis/redis_sentinel_broker.py b/taskiq_redis/redis_sentinel_broker.py index 378b394..45a788e 100644 --- a/taskiq_redis/redis_sentinel_broker.py +++ b/taskiq_redis/redis_sentinel_broker.py @@ -158,6 +158,7 @@ def __init__( mkstream: bool = True, xread_block: int = 10000, maxlen: Optional[int] = None, + approximate: bool = True, additional_streams: Optional[Dict[str, str]] = None, **connection_kwargs: Any, ) -> None: @@ -179,6 +180,8 @@ def __init__( Better to set it to a bigger value, to avoid unnecessary calls. :param maxlen: sets the maximum length of the stream trims (the old values of) the stream each time a new element is added + :param approximate: decides wether to trim the stream immediately (False) or + later on (True) :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. """ @@ -197,6 +200,7 @@ def __init__( self.mkstream = mkstream self.block = xread_block self.maxlen = maxlen + self.approximate = approximate self.additional_streams = additional_streams or {} async def _declare_consumer_group(self) -> None: @@ -231,6 +235,7 @@ async def kick(self, message: BrokerMessage) -> None: self.queue_name, {b"data": message.message}, maxlen=self.maxlen, + approximate=self.approximate, ) def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]: diff --git a/tests/test_broker.py b/tests/test_broker.py index b62b80d..89e5aac 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -3,6 +3,7 @@ from typing import List, Tuple, Union import pytest +from redis.asyncio import Redis from taskiq import AckableMessage, AsyncBroker, BrokerMessage from taskiq_redis import ( @@ -316,7 +317,7 @@ async def test_streams_sentinel_broker( redis_sentinel_master_name: str, ) -> None: """ - Test that messages are published and read correctly by ListQueueSentinelBroker. + Test that messages are published and read correctly by RedisStreamSentinelBroker. We create two workers that listen and send a message to them. Expect only one worker to receive the same message we sent. @@ -338,3 +339,97 @@ async def test_streams_sentinel_broker( await result.ack() # type: ignore worker_task.cancel() await broker.shutdown() + + +@pytest.mark.anyio +async def test_maxlen_in_stream_broker( + redis_url: str, + valid_broker_message: BrokerMessage, +) -> None: + """ + Test that maxlen parameter works correctly in RedisStreamBroker. + + We create RedisStreamBroker, fill in them with messages in the amount of + > maxlen and check that only maxlen messages are in the stream. 
+ """ + maxlen = 20 + + broker = RedisStreamBroker( + url=redis_url, + maxlen=maxlen, + approximate=False, + queue_name=uuid.uuid4().hex, + consumer_group_name=uuid.uuid4().hex, + ) + + await broker.startup() + + for _ in range(maxlen * 2): + await broker.kick(valid_broker_message) + + async with Redis(connection_pool=broker.connection_pool) as redis: + assert await redis.xlen(broker.queue_name) == maxlen + await broker.shutdown() + + +@pytest.mark.anyio +async def test_maxlen_in_cluster_stream_broker( + redis_cluster_url: str, + valid_broker_message: BrokerMessage, +) -> None: + """ + Test that maxlen parameter works correctly in RedisStreamClusterBroker. + + We create RedisStreamClusterBroker, fill it with messages in the amount of + > maxlen and check that only maxlen messages are in the stream. + """ + maxlen = 20 + + broker = RedisStreamClusterBroker( + maxlen=maxlen, + approximate=False, + url=redis_cluster_url, + queue_name=uuid.uuid4().hex, + consumer_group_name=uuid.uuid4().hex, + ) + + await broker.startup() + + for _ in range(maxlen * 2): + await broker.kick(valid_broker_message) + + assert await broker.redis.xlen(broker.queue_name) == maxlen + await broker.shutdown() + + +@pytest.mark.anyio +async def test_maxlen_in_sentinel_stream_broker( + redis_sentinel_master_name: str, + redis_sentinels: List[Tuple[str, int]], + valid_broker_message: BrokerMessage, +) -> None: + """ + Test that maxlen parameter works correctly in RedisStreamSentinelBroker. + + We create RedisStreamSentinelBroker, fill it with messages in the amount of + > maxlen and check that only maxlen messages are in the stream. + """ + maxlen = 20 + + broker = RedisStreamSentinelBroker( + maxlen=maxlen, + approximate=False, + sentinels=redis_sentinels, + queue_name=uuid.uuid4().hex, + consumer_group_name=uuid.uuid4().hex, + master_name=redis_sentinel_master_name, + ) + + await broker.startup() + + for _ in range(maxlen * 2): + await broker.kick(valid_broker_message) + + async with broker._acquire_master_conn() as redis_conn: + assert await redis_conn.xlen(broker.queue_name) == maxlen + await broker.shutdown()