Skip to content

Can aiomqtt queue has a ring buffer option, for high frequense in-comming messages? #299

@diamond2nv

Description

@diamond2nv

Mostly, aiomqtt help done the work asyncly and nicely.
However, recently when trying to subscribe a sensor-mqtt-topic for sending these messages to a unix domain socket,I got a blocking outcome, with lots "mqtt queue logger.warning".

Of cause,these messages were about 90 Hz in-coming, with QoS=0。

But, maybe a suggestion is there :

Can aiomqtt queue has a ring buffer option, for high frequency in-coming messages?

Next is:

  1. relate aiomqq client.py code:
def _on_message(
        self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
    ) -> None:
        # Convert the paho.mqtt message into our own Message type
        m = Message._from_paho_message(message)  # noqa: SLF001
        # Put the message in the message queue
        try:
            self._queue.put_nowait(m)
        except asyncio.QueueFull:
            self._logger.warning("Message queue is full. Discarding message.")
  1. try to have a ring buffer (Now not async....this is the problem we want to have your suggestion....thanks)
from collections import deque
...
import asyncio
import aiomqtt
BUFFER_SIZE = 10  
mqtt_to_unix_buffer = deque(maxlen=BUFFER_SIZE) 
...

async def forward_mqtt_to_unix(mqtt_client, unix_reader, unix_writer):  
    global mqtt_to_unix_count  
    async with mqtt_client.filtered_messages(COMMAND_TOPIC) as messages:  
        async for message in messages:  
            data = json.loads(message.payload)  
            mqtt_to_unix_buffer.append(data)  
            mqtt_to_unix_count += 1 
            try:  
                await unix_writer.write(json.dumps(data).encode())  
                await unix_writer.drain()  
            except Exception as e:  
                print(f"Error sending to Unix Socket: {e}")  
                break  

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions