一个稳定可靠的 RabbitMQ 连接器,提供简单易用的 API 接口。
- 自动重连机制,确保连接稳定性
- 简洁的装饰器 API,使消息监听更加直观
- 支持环境变量配置
- 线程安全设计
- 完善的错误处理和日志记录
- 支持消息确认机制
- 修复连接泄漏问题,确保生产环境稳定性
- 支持上下文管理器,自动资源清理
pip install use-rabbitmq
或者使用 Poetry:
poetry add use-rabbitmq
from use_rabbitmq import RabbitMQStore
# 推荐:使用上下文管理器,自动清理资源
with RabbitMQStore(
host="localhost",
port=5672,
username="guest",
password="guest"
) as rmq:
# 发送消息
rmq.send(queue_name="test_queue", message="Hello, RabbitMQ!")
# 获取消息数量
count = rmq.get_message_counts("test_queue")
print(f"队列中有 {count} 条消息")
# 连接会自动关闭,避免连接泄漏
from use_rabbitmq import RabbitMQStore
# 创建 RabbitMQ 连接
rmq = RabbitMQStore(
host="localhost",
port=5672,
username="guest",
password="guest"
)
try:
# 发送消息
rmq.send(queue_name="test_queue", message="Hello, RabbitMQ!")
# 使用装饰器监听队列
@rmq.listener(queue_name="test_queue")
def process_message(message):
print(f"收到消息: {message.body}")
message.ack() # 确认消息已处理
finally:
# 重要:手动关闭连接,避免连接泄漏
rmq.shutdown()
from use_rabbitmq import useRabbitMQ, useRabbitListener
# 使用别名创建连接
mq = useRabbitMQ(
host="localhost",
port=5672,
username="admin",
password="admin"
)
# 方式 1: 使用实例的 listener 方法
@mq.listener(queue_name="test_queue")
def do_something(message):
print(message.body)
message.ack()
# 方式 2: 使用 useRabbitListener 装饰器
@useRabbitListener(mq, queue_name="another_queue")
def another_handler(message):
print(message.body)
message.ack()
可以通过环境变量配置连接参数:
# 环境变量优先级:
# RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD
from use_rabbitmq import RabbitMQStore
# 将使用环境变量中的配置
rmq = RabbitMQStore()
# 声明队列
rmq.declare_queue("my_queue", durable=True)
# 获取队列消息数量
count = rmq.get_message_counts("my_queue")
print(f"队列中有 {count} 条消息")
# 清空队列
rmq.flush_queue("my_queue")
# 停止监听
rmq.stop_listener("my_queue")
# 关闭连接
rmq.shutdown()
# 发送带属性的消息
properties = {
'content_type': 'application/json',
'headers': {'source': 'my_app'}
}
rmq.send(
queue_name="my_queue",
message='{"data": "test"}',
properties=properties
)
# 配置预取数量和其他参数
@rmq.listener(queue_name="my_queue", prefetch=10, no_ack=False)
def process_batch(message):
# 处理消息
print(message.body)
message.ack()
在生产环境中,强烈建议使用以下方式之一来确保连接被正确关闭:
with RabbitMQStore() as rmq:
# 你的代码
pass
# 连接自动关闭
rmq = RabbitMQStore()
try:
# 你的代码
pass
finally:
rmq.shutdown() # 确保连接被关闭
本版本修复了以下连接泄漏问题:
- ✅ 线程安全的连接管理
- ✅ 异常情况下的资源清理
- ✅ 正确的连接和通道关闭顺序
- ✅ 消费者停止机制优化
- ✅ 析构函数异常处理
该库内置了自动重试和重连机制:
- 连接错误自动重试
- 发送消息失败自动重试
- 消费者断线自动重连
- 资源清理异常处理
欢迎提交 Issue 和 Pull Request!