Skip to content

Commit 0d62e3b

Browse files
authored
Merge pull request #93 from taskiq-python/feat/add-maxlen-to-stream-brokers
feat: add maxlen to stream sentinel and cluster brokers
2 parents 513b77e + f4ed075 commit 0d62e3b

File tree

4 files changed

+129
-3
lines changed

4 files changed

+129
-3
lines changed

taskiq_redis/redis_broker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def __init__(
166166
mkstream: bool = True,
167167
xread_block: int = 2000,
168168
maxlen: Optional[int] = None,
169+
approximate: bool = True,
169170
idle_timeout: int = 600000, # 10 minutes
170171
unacknowledged_batch_size: int = 100,
171172
xread_count: Optional[int] = 100,
@@ -190,6 +191,8 @@ def __init__(
190191
Better to set it to a bigger value, to avoid unnecessary calls.
191192
:param maxlen: sets the maximum length of the stream
192193
trims (the old values of) the stream each time a new element is added
194+
:param approximate: decides whether to trim the stream immediately (False) or
195+
later on (True)
193196
:param xread_count: number of messages to fetch from the stream at once.
194197
:param additional_streams: additional streams to read from.
195198
Each key is a stream name, value is a consumer id.
@@ -210,6 +213,7 @@ def __init__(
210213
self.mkstream = mkstream
211214
self.block = xread_block
212215
self.maxlen = maxlen
216+
self.approximate = approximate
213217
self.additional_streams = additional_streams or {}
214218
self.idle_timeout = idle_timeout
215219
self.unacknowledged_batch_size = unacknowledged_batch_size
@@ -252,6 +256,7 @@ async def kick(self, message: BrokerMessage) -> None:
252256
self.queue_name,
253257
{b"data": message.message},
254258
maxlen=self.maxlen,
259+
approximate=self.approximate,
255260
)
256261

257262
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:

taskiq_redis/redis_cluster_broker.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ def __init__(
9292
consumer_id: str = "$",
9393
mkstream: bool = True,
9494
xread_block: int = 10000,
95+
maxlen: Optional[int] = None,
96+
approximate: bool = True,
9597
additional_streams: Optional[Dict[str, str]] = None,
9698
**connection_kwargs: Any,
9799
) -> None:
@@ -111,6 +113,10 @@ def __init__(
111113
:param mkstream: create stream if it does not exist.
112114
:param xread_block: block time in ms for xreadgroup.
113115
Better to set it to a bigger value, to avoid unnecessary calls.
116+
:param maxlen: sets the maximum length of the stream
117+
trims (the old values of) the stream each time a new element is added
118+
:param approximate: decides whether to trim the stream immediately (False) or
119+
later on (True)
114120
:param additional_streams: additional streams to read from.
115121
Each key is a stream name, value is a consumer id.
116122
"""
@@ -125,6 +131,8 @@ def __init__(
125131
self.consumer_id = consumer_id
126132
self.mkstream = mkstream
127133
self.block = xread_block
134+
self.maxlen = maxlen
135+
self.approximate = approximate
128136
self.additional_streams = additional_streams or {}
129137

130138
async def _declare_consumer_group(self) -> None:
@@ -154,7 +162,12 @@ async def kick(self, message: BrokerMessage) -> None:
154162
155163
:param message: message to append.
156164
"""
157-
await self.redis.xadd(self.queue_name, {b"data": message.message})
165+
await self.redis.xadd(
166+
self.queue_name,
167+
{b"data": message.message},
168+
maxlen=self.maxlen,
169+
approximate=self.approximate,
170+
)
158171

159172
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
160173
async def _ack() -> None:

taskiq_redis/redis_sentinel_broker.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ def __init__(
157157
consumer_id: str = "$",
158158
mkstream: bool = True,
159159
xread_block: int = 10000,
160+
maxlen: Optional[int] = None,
161+
approximate: bool = True,
160162
additional_streams: Optional[Dict[str, str]] = None,
161163
**connection_kwargs: Any,
162164
) -> None:
@@ -176,6 +178,10 @@ def __init__(
176178
:param mkstream: create stream if it does not exist.
177179
:param xread_block: block time in ms for xreadgroup.
178180
Better to set it to a bigger value, to avoid unnecessary calls.
181+
:param maxlen: sets the maximum length of the stream
182+
trims (the old values of) the stream each time a new element is added
183+
:param approximate: decides whether to trim the stream immediately (False) or
184+
later on (True)
179185
:param additional_streams: additional streams to read from.
180186
Each key is a stream name, value is a consumer id.
181187
"""
@@ -193,6 +199,8 @@ def __init__(
193199
self.consumer_id = consumer_id
194200
self.mkstream = mkstream
195201
self.block = xread_block
202+
self.maxlen = maxlen
203+
self.approximate = approximate
196204
self.additional_streams = additional_streams or {}
197205

198206
async def _declare_consumer_group(self) -> None:
@@ -223,7 +231,12 @@ async def kick(self, message: BrokerMessage) -> None:
223231
:param message: message to append.
224232
"""
225233
async with self._acquire_master_conn() as redis_conn:
226-
await redis_conn.xadd(self.queue_name, {b"data": message.message})
234+
await redis_conn.xadd(
235+
self.queue_name,
236+
{b"data": message.message},
237+
maxlen=self.maxlen,
238+
approximate=self.approximate,
239+
)
227240

228241
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
229242
async def _ack() -> None:

tests/test_broker.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import List, Tuple, Union
44

55
import pytest
6+
from redis.asyncio import Redis
67
from taskiq import AckableMessage, AsyncBroker, BrokerMessage
78

89
from taskiq_redis import (
@@ -316,7 +317,7 @@ async def test_streams_sentinel_broker(
316317
redis_sentinel_master_name: str,
317318
) -> None:
318319
"""
319-
Test that messages are published and read correctly by ListQueueSentinelBroker.
320+
Test that messages are published and read correctly by RedisStreamSentinelBroker.
320321
321322
We create two workers that listen and send a message to them.
322323
Expect only one worker to receive the same message we sent.
@@ -338,3 +339,97 @@ async def test_streams_sentinel_broker(
338339
await result.ack() # type: ignore
339340
worker_task.cancel()
340341
await broker.shutdown()
342+
343+
344+
@pytest.mark.anyio
345+
async def test_maxlen_in_stream_broker(
346+
redis_url: str,
347+
valid_broker_message: BrokerMessage,
348+
) -> None:
349+
"""
350+
Test that maxlen parameter works correctly in RedisStreamBroker.
351+
352+
We create RedisStreamBroker, fill in them with messages in the amount of
353+
> maxlen and check that only maxlen messages are in the stream.
354+
"""
355+
maxlen = 20
356+
357+
broker = RedisStreamBroker(
358+
url=redis_url,
359+
maxlen=maxlen,
360+
approximate=False,
361+
queue_name=uuid.uuid4().hex,
362+
consumer_group_name=uuid.uuid4().hex,
363+
)
364+
365+
await broker.startup()
366+
367+
for _ in range(maxlen * 2):
368+
await broker.kick(valid_broker_message)
369+
370+
async with Redis(connection_pool=broker.connection_pool) as redis:
371+
assert await redis.xlen(broker.queue_name) == maxlen
372+
await broker.shutdown()
373+
374+
375+
@pytest.mark.anyio
376+
async def test_maxlen_in_cluster_stream_broker(
377+
redis_cluster_url: str,
378+
valid_broker_message: BrokerMessage,
379+
) -> None:
380+
"""
381+
Test that maxlen parameter works correctly in RedisStreamClusterBroker.
382+
383+
We create RedisStreamClusterBroker, fill it with messages in the amount of
384+
> maxlen and check that only maxlen messages are in the stream.
385+
"""
386+
maxlen = 20
387+
388+
broker = RedisStreamClusterBroker(
389+
maxlen=maxlen,
390+
approximate=False,
391+
url=redis_cluster_url,
392+
queue_name=uuid.uuid4().hex,
393+
consumer_group_name=uuid.uuid4().hex,
394+
)
395+
396+
await broker.startup()
397+
398+
for _ in range(maxlen * 2):
399+
await broker.kick(valid_broker_message)
400+
401+
assert await broker.redis.xlen(broker.queue_name) == maxlen
402+
await broker.shutdown()
403+
404+
405+
@pytest.mark.anyio
406+
async def test_maxlen_in_sentinel_stream_broker(
407+
redis_sentinel_master_name: str,
408+
redis_sentinels: List[Tuple[str, int]],
409+
valid_broker_message: BrokerMessage,
410+
) -> None:
411+
"""
412+
Test that maxlen parameter works correctly in RedisStreamSentinelBroker.
413+
414+
We create RedisStreamSentinelBroker, fill it with messages in the amount of
415+
> maxlen and check that only maxlen messages are in the stream.
416+
"""
417+
maxlen = 20
418+
419+
broker = RedisStreamSentinelBroker(
420+
maxlen=maxlen,
421+
approximate=False,
422+
sentinels=redis_sentinels,
423+
queue_name=uuid.uuid4().hex,
424+
consumer_group_name=uuid.uuid4().hex,
425+
master_name=redis_sentinel_master_name,
426+
)
427+
428+
await broker.startup()
429+
430+
for _ in range(maxlen * 2):
431+
await broker.kick(valid_broker_message)
432+
433+
async with broker._acquire_master_conn() as redis_conn:
434+
assert await redis_conn.xlen(broker.queue_name) == maxlen
435+
await broker.shutdown()

0 commit comments

Comments
 (0)