Skip to content

feat: add maxlen to stream sentinel and cluster brokers #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions taskiq_redis/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def __init__(
mkstream: bool = True,
xread_block: int = 2000,
maxlen: Optional[int] = None,
approximate: bool = True,
idle_timeout: int = 600000, # 10 minutes
unacknowledged_batch_size: int = 100,
xread_count: Optional[int] = 100,
Expand All @@ -190,6 +191,8 @@ def __init__(
Better to set it to a bigger value, to avoid unnecessary calls.
:param maxlen: sets the maximum length of the stream
trims (the old values of) the stream each time a new element is added
:param approximate: decides whether to trim the stream immediately (False) or
later on (True)
:param xread_count: number of messages to fetch from the stream at once.
:param additional_streams: additional streams to read from.
Each key is a stream name, value is a consumer id.
Expand All @@ -210,6 +213,7 @@ def __init__(
self.mkstream = mkstream
self.block = xread_block
self.maxlen = maxlen
self.approximate = approximate
self.additional_streams = additional_streams or {}
self.idle_timeout = idle_timeout
self.unacknowledged_batch_size = unacknowledged_batch_size
Expand Down Expand Up @@ -252,6 +256,7 @@ async def kick(self, message: BrokerMessage) -> None:
self.queue_name,
{b"data": message.message},
maxlen=self.maxlen,
approximate=self.approximate,
)

def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
Expand Down
15 changes: 14 additions & 1 deletion taskiq_redis/redis_cluster_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def __init__(
consumer_id: str = "$",
mkstream: bool = True,
xread_block: int = 10000,
maxlen: Optional[int] = None,
approximate: bool = True,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
) -> None:
Expand All @@ -111,6 +113,10 @@ def __init__(
:param mkstream: create stream if it does not exist.
:param xread_block: block time in ms for xreadgroup.
Better to set it to a bigger value, to avoid unnecessary calls.
:param maxlen: sets the maximum length of the stream
trims (the old values of) the stream each time a new element is added
:param approximate: decides whether to trim the stream immediately (False) or
later on (True)
:param additional_streams: additional streams to read from.
Each key is a stream name, value is a consumer id.
"""
Expand All @@ -125,6 +131,8 @@ def __init__(
self.consumer_id = consumer_id
self.mkstream = mkstream
self.block = xread_block
self.maxlen = maxlen
self.approximate = approximate
self.additional_streams = additional_streams or {}

async def _declare_consumer_group(self) -> None:
Expand Down Expand Up @@ -154,7 +162,12 @@ async def kick(self, message: BrokerMessage) -> None:

:param message: message to append.
"""
await self.redis.xadd(self.queue_name, {b"data": message.message})
await self.redis.xadd(
self.queue_name,
{b"data": message.message},
maxlen=self.maxlen,
approximate=self.approximate,
)

def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
async def _ack() -> None:
Expand Down
15 changes: 14 additions & 1 deletion taskiq_redis/redis_sentinel_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ def __init__(
consumer_id: str = "$",
mkstream: bool = True,
xread_block: int = 10000,
maxlen: Optional[int] = None,
approximate: bool = True,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
) -> None:
Expand All @@ -176,6 +178,10 @@ def __init__(
:param mkstream: create stream if it does not exist.
:param xread_block: block time in ms for xreadgroup.
Better to set it to a bigger value, to avoid unnecessary calls.
:param maxlen: sets the maximum length of the stream
trims (the old values of) the stream each time a new element is added
:param approximate: decides whether to trim the stream immediately (False) or
later on (True)
:param additional_streams: additional streams to read from.
Each key is a stream name, value is a consumer id.
"""
Expand All @@ -193,6 +199,8 @@ def __init__(
self.consumer_id = consumer_id
self.mkstream = mkstream
self.block = xread_block
self.maxlen = maxlen
self.approximate = approximate
self.additional_streams = additional_streams or {}

async def _declare_consumer_group(self) -> None:
Expand Down Expand Up @@ -223,7 +231,12 @@ async def kick(self, message: BrokerMessage) -> None:
:param message: message to append.
"""
async with self._acquire_master_conn() as redis_conn:
await redis_conn.xadd(self.queue_name, {b"data": message.message})
await redis_conn.xadd(
self.queue_name,
{b"data": message.message},
maxlen=self.maxlen,
approximate=self.approximate,
)

def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
async def _ack() -> None:
Expand Down
97 changes: 96 additions & 1 deletion tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List, Tuple, Union

import pytest
from redis.asyncio import Redis
from taskiq import AckableMessage, AsyncBroker, BrokerMessage

from taskiq_redis import (
Expand Down Expand Up @@ -316,7 +317,7 @@ async def test_streams_sentinel_broker(
redis_sentinel_master_name: str,
) -> None:
"""
Test that messages are published and read correctly by ListQueueSentinelBroker.
Test that messages are published and read correctly by RedisStreamSentinelBroker.

We create two workers that listen and send a message to them.
Expect only one worker to receive the same message we sent.
Expand All @@ -338,3 +339,97 @@ async def test_streams_sentinel_broker(
await result.ack() # type: ignore
worker_task.cancel()
await broker.shutdown()


@pytest.mark.anyio
async def test_maxlen_in_stream_broker(
    redis_url: str,
    valid_broker_message: BrokerMessage,
) -> None:
    """
    Test that the maxlen parameter works correctly in RedisStreamBroker.

    Kick twice as many messages as maxlen allows and verify that the
    underlying Redis stream has been trimmed down to exactly maxlen entries.
    """
    maxlen = 20

    broker = RedisStreamBroker(
        url=redis_url,
        maxlen=maxlen,
        # Exact (non-approximate) trimming so the length check is deterministic.
        approximate=False,
        queue_name=uuid.uuid4().hex,
        consumer_group_name=uuid.uuid4().hex,
    )

    await broker.startup()

    # Publish more messages than the stream is allowed to hold.
    for _ in range(2 * maxlen):
        await broker.kick(valid_broker_message)

    # Inspect the stream directly through a raw client on the broker's pool.
    async with Redis(connection_pool=broker.connection_pool) as redis:
        assert await redis.xlen(broker.queue_name) == maxlen
    await broker.shutdown()


@pytest.mark.anyio
async def test_maxlen_in_cluster_stream_broker(
    redis_cluster_url: str,
    valid_broker_message: BrokerMessage,
) -> None:
    """
    Test that the maxlen parameter works correctly in RedisStreamClusterBroker.

    Kick twice as many messages as maxlen allows and verify that the
    underlying Redis stream has been trimmed down to exactly maxlen entries.
    """
    maxlen = 20

    broker = RedisStreamClusterBroker(
        maxlen=maxlen,
        # Exact (non-approximate) trimming so the length check is deterministic.
        approximate=False,
        url=redis_cluster_url,
        queue_name=uuid.uuid4().hex,
        consumer_group_name=uuid.uuid4().hex,
    )

    await broker.startup()

    # Publish more messages than the stream is allowed to hold.
    for _ in range(2 * maxlen):
        await broker.kick(valid_broker_message)

    # The cluster broker exposes its client directly.
    assert await broker.redis.xlen(broker.queue_name) == maxlen
    await broker.shutdown()


@pytest.mark.anyio
async def test_maxlen_in_sentinel_stream_broker(
    redis_sentinel_master_name: str,
    redis_sentinels: List[Tuple[str, int]],
    valid_broker_message: BrokerMessage,
) -> None:
    """
    Test that the maxlen parameter works correctly in RedisStreamSentinelBroker.

    Kick twice as many messages as maxlen allows and verify that the
    underlying Redis stream has been trimmed down to exactly maxlen entries.
    """
    maxlen = 20

    broker = RedisStreamSentinelBroker(
        maxlen=maxlen,
        # Exact (non-approximate) trimming so the length check is deterministic.
        approximate=False,
        sentinels=redis_sentinels,
        queue_name=uuid.uuid4().hex,
        consumer_group_name=uuid.uuid4().hex,
        master_name=redis_sentinel_master_name,
    )

    await broker.startup()

    # Publish more messages than the stream is allowed to hold.
    for _ in range(2 * maxlen):
        await broker.kick(valid_broker_message)

    # NOTE(review): reaches into the broker's private master-connection helper
    # to inspect the stream; there is no public accessor for this.
    async with broker._acquire_master_conn() as redis_conn:
        assert await redis_conn.xlen(broker.queue_name) == maxlen
    await broker.shutdown()