Closed
Description
I am using Amazon MQ with ActiveMQ engine and seeing an error message where message-id is null when auto ACKs are sent back to the broker. After restarting the program, I will see the client reading the same amount of successful/error messages as if nothing was consumed by the on_message
handler
Minimal repro:
message_count = 0
error_count = 0
start = time.time()
async def on_message(message_frame: stompman.MessageFrame) -> None:
global message_count
now = time.time()
message_count += 1
if message_count % 1000 == 0:
elapsed = now - start
print(f"Received {message_count} in {elapsed:.6f}s")
def handle_error(error_frame) -> None:
global error_count
error_count += 1
if error_count % 100 == 0:
print("Errors:", error_count)
def handle_suppressed_exception(exception: Exception, message_frame: stompman.MessageFrame) -> None:
print(f"caught an exception, perhaps, producer is not friendly: {message_frame.body=!r} {exception=}") # noqa: T201
async def main(workers):
print("[main] entered", flush=True)
destinations: list = await get_ingest_configurations()
print(f"[main] destinations={destinations}", flush=True)
broker_info: dict = await get_broker_information()
print(f"[main] broker_info={broker_info}", flush=True)
parsed_url: urllib.parse.ParseResult = urllib.parse.urlparse(broker_info["endpoint_stomp"])
amq_hosts: list = [(parsed_url.hostname, parsed_url.port)]
username, password = broker_info["user"], broker_info["password"]
print(f"username: {username}")
print(f"password: {password}")
global error_count
async with stompman.Client(
servers=[
stompman.ConnectionParameters(host=host, port=port, login=username, passcode=password)
for host, port in amq_hosts
],
# Handlers:
on_error_frame=lambda error_frame: handle_error(error_frame),
# on_heartbeat=lambda: print("Server sent a heartbeat"), # also can be async
# SSL Context can be either `None` (default), `True`, or `ssl.SSLContext'
ssl=True,
# Optional parameters with sensible defaults:
heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
connect_retry_attempts=3,
connect_retry_interval=1,
connect_timeout=2,
connection_confirmation_timeout=2,
disconnect_confirmation_timeout=2,
read_timeout=2,
write_retry_attempts=3,
) as client:
producers = [client.subscribe(f"/queue/{TOPIC_NAME}", on_message, on_suppressed_exception=print) for i in range(2)]
await asyncio.gather(*producers)
print('subscribed')
Stacktrace:
org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [null]
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(ProtocolConverter.java:477)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:239)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:172)
at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:73)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.base/java.lang.Thread.run(Thread.java:829)
Metadata
Metadata
Assignees
Labels
No labels