Closed
Description
Mostly, aiomqtt does its work asynchronously and nicely.
However, when I recently subscribed to a sensor MQTT topic in order to forward its messages to a Unix domain socket, the forwarding blocked and the logger emitted many "Message queue is full. Discarding message." warnings.
Admittedly, these messages arrive at about 90 Hz, with QoS=0.
Still, I have a suggestion:
Could the aiomqtt queue have a ring buffer option for high-frequency incoming messages?
For reference:
- The related code in aiomqtt's client.py:
def _on_message(
    self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None:
    # Convert the paho.mqtt message into our own Message type
    m = Message._from_paho_message(message)  # noqa: SLF001
    # Put the message in the message queue
    try:
        self._queue.put_nowait(m)
    except asyncio.QueueFull:
        self._logger.warning("Message queue is full. Discarding message.")
- My attempt at a ring buffer (it is not async yet; this is the part I would like your suggestions on, thanks):
from collections import deque
...
import asyncio
import json
import aiomqtt

BUFFER_SIZE = 10
mqtt_to_unix_buffer = deque(maxlen=BUFFER_SIZE)
...
async def forward_mqtt_to_unix(mqtt_client, unix_reader, unix_writer):
    global mqtt_to_unix_count
    async with mqtt_client.filtered_messages(COMMAND_TOPIC) as messages:
        async for message in messages:
            data = json.loads(message.payload)
            mqtt_to_unix_buffer.append(data)
            mqtt_to_unix_count += 1
            try:
                # Assuming unix_writer is an asyncio.StreamWriter: write() is a
                # plain method, so only drain() is awaited.
                unix_writer.write(json.dumps(data).encode())
                await unix_writer.drain()
            except Exception as e:
                print(f"Error sending to Unix Socket: {e}")
                break
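
To make the request more concrete, here is a rough sketch of the kind of buffer I have in mind. It is not part of aiomqtt. `RingBuffer`, its `dropped` counter, and `demo` are hypothetical names made up for illustration, and the sketch assumes a single consumer task. When the buffer is full, the oldest item is evicted rather than the newest one being discarded, and `get()` can be awaited.

```python
import asyncio
from collections import deque
from typing import Any, Deque, List


class RingBuffer:
    """Bounded buffer that drops the OLDEST item when full (hypothetical helper)."""

    def __init__(self, maxlen: int) -> None:
        self._items: Deque[Any] = deque(maxlen=maxlen)
        self._not_empty = asyncio.Event()
        self.dropped = 0  # number of items evicted because the buffer was full

    def put_nowait(self, item: Any) -> None:
        # Never blocks and never raises: a full deque evicts its oldest entry.
        if len(self._items) == self._items.maxlen:
            self.dropped += 1
        self._items.append(item)
        self._not_empty.set()

    async def get(self) -> Any:
        # Single-consumer assumption: wait until a producer has added an item.
        while not self._items:
            self._not_empty.clear()
            await self._not_empty.wait()
        return self._items.popleft()


async def demo() -> List[int]:
    buf = RingBuffer(maxlen=3)
    for i in range(5):  # five puts into three slots: 0 and 1 are evicted
        buf.put_nowait(i)
    return [await buf.get() for _ in range(3)]
```

With something like this, the MQTT message loop would only call `put_nowait()`, and a separate task would `await buf.get()` and write to the Unix socket, so a slow socket loses the stalest sensor readings instead of stalling the subscriber.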