Skip to content

Unexpected ACK received for message-id [null] #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
pradoz opened this issue Nov 5, 2024 · 3 comments · Fixed by #92
Closed

Unexpected ACK received for message-id [null] #89

pradoz opened this issue Nov 5, 2024 · 3 comments · Fixed by #92

Comments

@pradoz
Copy link

pradoz commented Nov 5, 2024

I am using Amazon MQ with ActiveMQ engine and seeing an error message where message-id is null when auto ACKs are sent back to the broker. After restarting the program, I will see the client reading the same amount of successful/error messages as if nothing was consumed by the on_message handler

Minimal repro:

message_count = 0
error_count = 0
start = time.time()

async def on_message(message_frame: stompman.MessageFrame) -> None:
    global message_count
    now = time.time()
    message_count += 1
    if message_count % 1000 == 0:
        elapsed = now - start
        print(f"Received {message_count} in {elapsed:.6f}s")


def handle_error(error_frame) -> None:
    global error_count
    error_count += 1
    if error_count % 100 == 0:
        print("Errors:", error_count)

def handle_suppressed_exception(exception: Exception, message_frame: stompman.MessageFrame) -> None:
    print(f"caught an exception, perhaps, producer is not friendly: {message_frame.body=!r} {exception=}")  # noqa: T201

async def main(workers):
    print("[main] entered", flush=True)
    destinations: list = await get_ingest_configurations()
    print(f"[main] destinations={destinations}", flush=True)
    broker_info: dict = await get_broker_information()
    print(f"[main] broker_info={broker_info}", flush=True)
    parsed_url: urllib.parse.ParseResult = urllib.parse.urlparse(broker_info["endpoint_stomp"])
    amq_hosts: list = [(parsed_url.hostname, parsed_url.port)]
    username, password = broker_info["user"], broker_info["password"]
    print(f"username: {username}")
    print(f"password: {password}")
    global error_count
    async with stompman.Client(
        servers=[
            stompman.ConnectionParameters(host=host, port=port, login=username, passcode=password)
            for host, port in amq_hosts
        ],

        # Handlers:
        on_error_frame=lambda error_frame: handle_error(error_frame),
        # on_heartbeat=lambda: print("Server sent a heartbeat"),  # also can be async

        # SSL Context can be either `None` (default), `True`, or `ssl.SSLContext'
        ssl=True,

        # Optional parameters with sensible defaults:
        heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
        connect_retry_attempts=3,
        connect_retry_interval=1,
        connect_timeout=2,
        connection_confirmation_timeout=2,
        disconnect_confirmation_timeout=2,
        read_timeout=2,
        write_retry_attempts=3,
    ) as client:
        producers = [client.subscribe(f"/queue/{TOPIC_NAME}", on_message, on_suppressed_exception=print) for i in range(2)]
        await asyncio.gather(*producers)

        print('subscribed')

Stacktrace:

org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [null]
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(ProtocolConverter.java:477)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:239)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:172)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:73)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.base/java.lang.Thread.run(Thread.java:829)
@vrslev
Copy link
Collaborator

vrslev commented Nov 5, 2024

Hi! I encountered the same issue when adding an integration test update to run on ActiveMQ Classic in addition to ActiveMQ Artemis. Fixed in 1.6.0.

@pradoz
Copy link
Author

pradoz commented Nov 5, 2024

@vrslev Thank you for the incredibly quick turn around, I'm a true fan of stompman now because of your support. I will test upgrading to 1.6.0 and re running today

@vrslev
Copy link
Collaborator

vrslev commented Nov 5, 2024

That’s very nice to hear, thanks ❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants