Commit 982b965

Add support for quorum queues and max_attempts_at_message
1 parent 8fdbc43 commit 982b965

File tree

5 files changed, +259 -23 lines changed

.flake8

+8
@@ -68,6 +68,14 @@ ignore =
     WPS229,
     ; Found function with too much cognitive complexity
     WPS231,
+    ; Found walrus operator
+    WPS332
+    ; Found multiline conditions
+    WPS337
+    ; Found multi-line function type annotation
+    WPS320
+    ; Found `in` used with a non-set container
+    WPS510
 
 per-file-ignores =
     ; all tests

README.md

+31 -6

@@ -116,6 +116,32 @@ async def main():
 
 ```
 
+## Queue Types and Message Reliability
+
+AioPikaBroker supports both classic and quorum queues. Quorum queues are a more modern queue type in RabbitMQ that provides better reliability and data safety guarantees.
+
+```python
+from taskiq_aio_pika import AioPikaBroker, QueueType
+
+broker = AioPikaBroker(
+    queue_type=QueueType.QUORUM,  # Use quorum queues for better reliability
+    max_attempts_at_message=3,  # Limit redelivery attempts
+)
+```
+
+### Message Redelivery Control
+
+When message processing fails because of a consumer crash (e.g. an OOM condition resulting in a SIGKILL), a network issue, or another infrastructure problem before the consumer has had a chance to acknowledge the message positively or negatively (and schedule a retry via taskiq's retry middleware), RabbitMQ requeues the message at the front of the queue and redelivers it. With quorum queues, you can control how many times such a message will be redelivered:
+
+- Set `max_attempts_at_message` to limit delivery attempts.
+- Set `max_attempts_at_message=None` for unlimited attempts.
+- This operates at the message delivery level, not the application retry level. For application-level retries of exceptions that can be caught (e.g., temporary API failures), use taskiq's retry middleware instead.
+- After the maximum number of attempts, the message is logged and discarded.
+- `max_attempts_at_message` requires using quorum queues (`queue_type=QueueType.QUORUM`).
+
+This is particularly useful for preventing endless redelivery loops for messages that consistently crash the consumer ([poison messages](https://www.rabbitmq.com/docs/quorum-queues#poison-message-handling)), which can cause the queue to back up.
+
+
 ## Configuration
 
 AioPikaBroker parameters:
@@ -125,13 +151,12 @@ AioPikaBroker parameters:
 * `exchange_name` - name of exchange that used to send messages.
 * `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
 * `queue_name` - queue that used to get incoming messages.
+* `queue_type` - type of RabbitMQ queue to use: `classic` or `quorum`. Defaults to `classic`.
 * `routing_key` - that used to bind that queue to the exchange.
 * `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
 * `max_priority` - maximum priority for messages.
-* `delay_queue_name` - custom delay queue name.
-    This queue is used to deliver messages with delays.
-* `dead_letter_queue_name` - custom dead letter queue name.
-    This queue is used to receive negatively acknowleged messages from the main queue.
+* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
+* `dead_letter_queue_name` - custom dead letter queue name. This queue is used to receive negatively acknowledged messages from the main queue.
 * `qos` - number of messages that worker can prefetch.
-* `declare_queues` - whether you want to declare queues even on
-    client side. May be useful for message persistance.
+* `declare_queues` - whether you want to declare queues even on the client side. May be useful for message persistence.
+* `max_attempts_at_message` - maximum number of attempts at processing the same message. Requires the queue type to be set to `QueueType.QUORUM`. Defaults to `20` for quorum queues and to `None` for classic queues. This is not the same as task retries; pass `None` for unlimited attempts.

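The README section above explains the new options but stops short of an end-to-end example. Below is a minimal sketch (illustrative only, not part of this commit) of how `queue_type` and `max_attempts_at_message` fit into a typical taskiq setup; the task name and the `@broker.task` / `.kiq()` wiring follow taskiq's usual API and are assumptions here, not code from the diff.

```python
import asyncio

from taskiq_aio_pika import AioPikaBroker, QueueType

# Quorum queue with a bounded number of processing attempts per message.
# Passing max_attempts_at_message together with a classic queue raises ValueError.
broker = AioPikaBroker(
    url="amqp://guest:guest@localhost:5672",
    queue_type=QueueType.QUORUM,
    max_attempts_at_message=3,  # None would mean unlimited redelivery
)


@broker.task
async def flaky_task() -> None:
    """If the worker dies mid-task, RabbitMQ redelivers the message;
    after 3 attempts it is logged and discarded instead of looping forever."""


async def main() -> None:
    await broker.startup()
    await flaky_task.kiq()  # enqueue; a separate `taskiq worker` process consumes it
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```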
taskiq_aio_pika/broker.py

+98 -13

@@ -1,17 +1,32 @@
 import asyncio
+import copy
 from datetime import timedelta
+from enum import Enum
 from logging import getLogger
-from typing import Any, AsyncGenerator, Callable, Dict, Optional, TypeVar
+from typing import Any, AsyncGenerator, Callable, Dict, Literal, Optional, TypeVar
 
 from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust
 from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection
-from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage
+from taskiq import (
+    AckableMessage,
+    AckableMessageWithDeliveryCount,
+    AsyncBroker,
+    AsyncResultBackend,
+    BrokerMessage,
+)
 
 _T = TypeVar("_T")  # noqa: WPS111
 
 logger = getLogger("taskiq.aio_pika_broker")
 
 
+class QueueType(Enum):
+    """Type of RabbitMQ queue."""
+
+    CLASSIC = "classic"
+    QUORUM = "quorum"
+
+
 def parse_val(
     parse_func: Callable[[str], _T],
     target: Optional[str] = None,
@@ -35,7 +50,7 @@ def parse_val(
 class AioPikaBroker(AsyncBroker):
     """Broker that works with RabbitMQ."""
 
-    def __init__(  # noqa: WPS211
+    def __init__(  # noqa: C901, WPS211
         self,
         url: Optional[str] = None,
         result_backend: Optional[AsyncResultBackend[_T]] = None,
@@ -44,6 +59,7 @@ def __init__(  # noqa: WPS211
         loop: Optional[asyncio.AbstractEventLoop] = None,
         exchange_name: str = "taskiq",
         queue_name: str = "taskiq",
+        queue_type: QueueType = QueueType.CLASSIC,
         dead_letter_queue_name: Optional[str] = None,
         delay_queue_name: Optional[str] = None,
         declare_exchange: bool = True,
@@ -54,6 +70,7 @@ def __init__(  # noqa: WPS211
         delayed_message_exchange_plugin: bool = False,
         declare_exchange_kwargs: Optional[Dict[Any, Any]] = None,
         declare_queues_kwargs: Optional[Dict[Any, Any]] = None,
+        max_attempts_at_message: Optional[int] | Literal["default"] = "default",
         **connection_kwargs: Any,
     ) -> None:
         """
@@ -62,12 +79,13 @@ def __init__(  # noqa: WPS211
         :param url: url to rabbitmq. If None,
             the default "amqp://guest:guest@localhost:5672" is used.
         :param result_backend: custom result backend.
-
         :param task_id_generator: custom task_id genertaor.
         :param qos: number of messages that worker can prefetch.
         :param loop: specific even loop.
         :param exchange_name: name of exchange that used to send messages.
         :param queue_name: queue that used to get incoming messages.
+        :param queue_type: type of RabbitMQ queue to use: `classic` or `quorum`.
+            defaults to `classic`.
         :param dead_letter_queue_name: custom name for dead-letter queue.
             by default it set to {queue_name}.dead_letter.
         :param delay_queue_name: custom name for queue that used to
@@ -86,6 +104,11 @@ def __init__(  # noqa: WPS211
         :param declare_queues_kwargs: additional from AbstractChannel.declare_queue
         :param connection_kwargs: additional keyword arguments,
             for connect_robust method of aio-pika.
+        :param max_attempts_at_message: maximum number of attempts at processing
+            the same message. requires the queue type to be set to `QueueType.QUORUM`.
+            defaults to `20` for quorum queues and to `None` for classic queues.
+            is not the same as task retries. pass `None` for unlimited attempts.
+        :raises ValueError: if inappropriate arguments were passed.
         """
         super().__init__(result_backend, task_id_generator)
 
@@ -104,6 +127,52 @@ def __init__(  # noqa: WPS211
         self._max_priority = max_priority
         self._delayed_message_exchange_plugin = delayed_message_exchange_plugin
 
+        if self._declare_queues_kwargs.get("arguments", {}).get(
+            "x-queue-type",
+        ) or self._declare_queues_kwargs.get("arguments", {}).get("x-delivery-limit"):
+            raise ValueError(
+                "Use the `queue_type` and `max_attempts_at_message` parameters of "
+                "`AioPikaBroker.__init__` instead of `x-queue-type` and "
+                "`x-delivery-limit`",
+            )
+        if queue_type == QueueType.QUORUM:
+            self._declare_queues_kwargs.setdefault("arguments", {})[
+                "x-queue-type"
+            ] = "quorum"
+            self._declare_queues_kwargs["durable"] = True
+        else:
+            self._declare_queues_kwargs.setdefault("arguments", {})[
+                "x-queue-type"
+            ] = "classic"
+
+        if queue_type != QueueType.QUORUM and max_attempts_at_message not in (
+            "default",
+            None,
+        ):
+            raise ValueError(
+                "`max_attempts_at_message` requires `queue_type` to be set to "
+                "`QueueType.QUORUM`.",
+            )
+
+        if max_attempts_at_message == "default":
+            if queue_type == QueueType.QUORUM:
+                self.max_attempts_at_message = 20
+            else:
+                self.max_attempts_at_message = None
+        else:
+            self.max_attempts_at_message = max_attempts_at_message
+
+        if queue_type == QueueType.QUORUM:
+            if self.max_attempts_at_message is None:
+                # no limit
+                self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = "-1"
+            else:
+                # the final attempt will be handled in `taskiq.Receiver`
+                # to generate visible logs
+                self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = (
+                    self.max_attempts_at_message + 1
+                )
+
         self._dead_letter_queue_name = f"{queue_name}.dead_letter"
         if dead_letter_queue_name:
             self._dead_letter_queue_name = dead_letter_queue_name
@@ -183,9 +252,15 @@ async def declare_queues(
         :param channel: channel to used for declaration.
         :return: main queue instance.
         """
+        declare_queues_kwargs_ex_arguments = copy.copy(self._declare_queues_kwargs)
+        declare_queue_arguments = declare_queues_kwargs_ex_arguments.pop(
+            "arguments",
+            {},
+        )
         await channel.declare_queue(
             self._dead_letter_queue_name,
-            **self._declare_queues_kwargs,
+            **declare_queues_kwargs_ex_arguments,
+            arguments=declare_queue_arguments,
         )
         args: "Dict[str, Any]" = {
             "x-dead-letter-exchange": "",
@@ -195,8 +270,8 @@ async def declare_queues(
             args["x-max-priority"] = self._max_priority
         queue = await channel.declare_queue(
             self._queue_name,
-            arguments=args,
-            **self._declare_queues_kwargs,
+            arguments=args | declare_queue_arguments,
+            **declare_queues_kwargs_ex_arguments,
         )
         if self._delayed_message_exchange_plugin:
             await queue.bind(
@@ -209,8 +284,9 @@ async def declare_queues(
             arguments={
                 "x-dead-letter-exchange": "",
                 "x-dead-letter-routing-key": self._queue_name,
-            },
-            **self._declare_queues_kwargs,
+            }
+            | declare_queue_arguments,
+            **declare_queues_kwargs_ex_arguments,
         )
 
         await queue.bind(
@@ -291,7 +367,16 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
         queue = await self.declare_queues(self.read_channel)
         async with queue.iterator() as iterator:
             async for message in iterator:
-                yield AckableMessage(
-                    data=message.body,
-                    ack=message.ack,
-                )
+                if (
+                    delivery_count := message.headers.get("x-delivery-count")
+                ) is not None:
+                    yield AckableMessageWithDeliveryCount(
+                        data=message.body,
+                        ack=message.ack,
+                        delivery_count=delivery_count,
+                    )
+                else:
+                    yield AckableMessage(
+                        data=message.body,
+                        ack=message.ack,
+                    )

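To make the constructor changes above easier to follow, here is a standalone paraphrase of how `queue_type` and `max_attempts_at_message` translate into RabbitMQ queue arguments. `build_queue_arguments` is a hypothetical helper written only for this illustration; the resulting `x-queue-type` and `x-delivery-limit` values mirror the diff, but the actual broker applies them through its `declare_queues_kwargs`.

```python
from typing import Any, Dict, Literal, Optional, Union


def build_queue_arguments(
    quorum: bool,
    max_attempts_at_message: Union[Optional[int], Literal["default"]] = "default",
) -> Dict[str, Any]:
    """Sketch of the x-queue-type / x-delivery-limit mapping used in the diff."""
    if not quorum:
        # max_attempts_at_message is only supported for quorum queues.
        if max_attempts_at_message not in ("default", None):
            raise ValueError("max_attempts_at_message requires quorum queues")
        return {"x-queue-type": "classic"}

    # Quorum queues default to 20 attempts; None means unlimited.
    attempts = 20 if max_attempts_at_message == "default" else max_attempts_at_message
    arguments: Dict[str, Any] = {"x-queue-type": "quorum"}
    if attempts is None:
        arguments["x-delivery-limit"] = "-1"  # no limit
    else:
        # One extra delivery so the final attempt can be logged by taskiq's Receiver.
        arguments["x-delivery-limit"] = attempts + 1
    return arguments


assert build_queue_arguments(quorum=False) == {"x-queue-type": "classic"}
assert build_queue_arguments(quorum=True, max_attempts_at_message=3) == {
    "x-queue-type": "quorum",
    "x-delivery-limit": 4,
}
assert build_queue_arguments(quorum=True, max_attempts_at_message=None) == {
    "x-queue-type": "quorum",
    "x-delivery-limit": "-1",
}
```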
tests/conftest.py

+11
@@ -1,4 +1,5 @@
 import os
+from contextlib import suppress
 from typing import AsyncGenerator
 from uuid import uuid4
 
@@ -229,3 +230,13 @@ async def broker_with_delayed_message_plugin(
         if_empty=False,
         if_unused=False,
     )
+
+
+@pytest.fixture(autouse=True, scope="function")
+async def cleanup_rabbitmq(test_channel: Channel) -> AsyncGenerator[None, None]:
+    yield
+
+    for queue_name in ["taskiq", "taskiq.dead_letter", "taskiq.delay"]:
+        with suppress(Exception):
+            queue = await test_channel.get_queue(queue_name, ensure=False)
+            await queue.delete(if_unused=False, if_empty=False)
