Skip to content

Unexpected ACK received for message-id [null] #89

Closed
@pradoz

Description

@pradoz

I am using Amazon MQ with the ActiveMQ engine and am seeing an error message in which message-id is null when auto ACKs are sent back to the broker. After restarting the program, the client reads the same number of successful/error messages again, as if nothing had been consumed by the on_message handler.

Minimal repro:

# Running total of messages handled by on_message (incremented once per call).
message_count = 0
# Running total of error frames handled by handle_error (incremented once per call).
error_count = 0
# time.time() captured when the module is loaded; on_message reports elapsed
# time relative to this value.
start = time.time()

async def on_message(message_frame: stompman.MessageFrame) -> None:
    """Count one received message and report progress every 1000 messages.

    The frame itself is not inspected; the handler only bumps the
    module-level ``message_count``. On every 1000th call it prints the
    running total together with the seconds elapsed since ``start``.
    """
    global message_count
    received_at = time.time()
    message_count += 1
    # Stay quiet except on every 1000th message.
    if message_count % 1000 != 0:
        return
    elapsed = received_at - start
    print(f"Received {message_count} in {elapsed:.6f}s")


def handle_error(error_frame) -> None:
    """Count one error frame and print the running total every 100 errors.

    ``error_frame`` is accepted to satisfy the callback signature but its
    content is not inspected.
    """
    global error_count
    error_count += 1
    # A non-zero remainder means this is not a 100th error: nothing to print.
    if error_count % 100:
        return
    print("Errors:", error_count)

def handle_suppressed_exception(exception: Exception, message_frame: stompman.MessageFrame) -> None:
    """Print a one-line diagnostic with the frame body and the exception."""
    report = f"caught an exception, perhaps, producer is not friendly: {message_frame.body=!r} {exception=}"
    print(report)  # noqa: T201

async def main(workers):
    """Connect to the broker over STOMP and subscribe twice to the ingest queue.

    Fetches the ingest destinations and the broker connection details, opens
    a ``stompman.Client`` against the broker's STOMP endpoint, and registers
    two subscriptions on ``/queue/{TOPIC_NAME}`` that dispatch to
    ``on_message``. Error frames are counted by ``handle_error`` and
    exceptions suppressed by the subscription are reported through
    ``handle_suppressed_exception``.

    Args:
        workers: Currently unused; kept so existing callers keep working.
    """
    print("[main] entered", flush=True)
    destinations: list = await get_ingest_configurations()
    print(f"[main] destinations={destinations}", flush=True)
    broker_info: dict = await get_broker_information()
    # Never echo the broker password: this output tends to end up in logs and
    # bug reports, so mask it before dumping the connection details.
    redacted_info = {key: "***" if key == "password" else value for key, value in broker_info.items()}
    print(f"[main] broker_info={redacted_info}", flush=True)
    parsed_url: urllib.parse.ParseResult = urllib.parse.urlparse(broker_info["endpoint_stomp"])
    amq_hosts: list = [(parsed_url.hostname, parsed_url.port)]
    username, password = broker_info["user"], broker_info["password"]
    print(f"username: {username}")
    print("password: ***")
    async with stompman.Client(
        servers=[
            stompman.ConnectionParameters(host=host, port=port, login=username, passcode=password)
            for host, port in amq_hosts
        ],

        # Handlers:
        on_error_frame=handle_error,
        # on_heartbeat=lambda: print("Server sent a heartbeat"),  # also can be async

        # SSL Context can be either `None` (default), `True`, or `ssl.SSLContext`
        ssl=True,

        # Optional parameters with sensible defaults:
        heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
        connect_retry_attempts=3,
        connect_retry_interval=1,
        connect_timeout=2,
        connection_confirmation_timeout=2,
        disconnect_confirmation_timeout=2,
        read_timeout=2,
        write_retry_attempts=3,
    ) as client:
        # Two concurrent subscriptions to the same queue; each subscribe() call
        # is awaited together via gather().
        subscriptions = [
            client.subscribe(
                f"/queue/{TOPIC_NAME}",
                on_message,
                on_suppressed_exception=handle_suppressed_exception,
            )
            for _ in range(2)
        ]
        await asyncio.gather(*subscriptions)

        print('subscribed')

Stacktrace:

org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [null]
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(ProtocolConverter.java:477)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:239)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:172)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:73)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.base/java.lang.Thread.run(Thread.java:829)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions