You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am using Amazon MQ with ActiveMQ engine and seeing an error message where message-id is null when auto ACKs are sent back to the broker. After restarting the program, I will see the client reading the same amount of successful/error messages as if nothing was consumed by the on_message handler
Minimal repro:
message_count=0error_count=0start=time.time()
asyncdefon_message(message_frame: stompman.MessageFrame) ->None:
globalmessage_countnow=time.time()
message_count+=1ifmessage_count%1000==0:
elapsed=now-startprint(f"Received {message_count} in {elapsed:.6f}s")
defhandle_error(error_frame) ->None:
globalerror_counterror_count+=1iferror_count%100==0:
print("Errors:", error_count)
defhandle_suppressed_exception(exception: Exception, message_frame: stompman.MessageFrame) ->None:
print(f"caught an exception, perhaps, producer is not friendly: {message_frame.body=!r}{exception=}") # noqa: T201asyncdefmain(workers):
print("[main] entered", flush=True)
destinations: list=awaitget_ingest_configurations()
print(f"[main] destinations={destinations}", flush=True)
broker_info: dict=awaitget_broker_information()
print(f"[main] broker_info={broker_info}", flush=True)
parsed_url: urllib.parse.ParseResult=urllib.parse.urlparse(broker_info["endpoint_stomp"])
amq_hosts: list= [(parsed_url.hostname, parsed_url.port)]
username, password=broker_info["user"], broker_info["password"]
print(f"username: {username}")
print(f"password: {password}")
globalerror_countasyncwithstompman.Client(
servers=[
stompman.ConnectionParameters(host=host, port=port, login=username, passcode=password)
forhost, portinamq_hosts
],
# Handlers:on_error_frame=lambdaerror_frame: handle_error(error_frame),
# on_heartbeat=lambda: print("Server sent a heartbeat"), # also can be async# SSL Context can be either `None` (default), `True`, or `ssl.SSLContext'ssl=True,
# Optional parameters with sensible defaults:heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
connect_retry_attempts=3,
connect_retry_interval=1,
connect_timeout=2,
connection_confirmation_timeout=2,
disconnect_confirmation_timeout=2,
read_timeout=2,
write_retry_attempts=3,
) asclient:
producers= [client.subscribe(f"/queue/{TOPIC_NAME}", on_message, on_suppressed_exception=print) foriinrange(2)]
awaitasyncio.gather(*producers)
print('subscribed')
@vrslev Thank you for the incredibly quick turn around, I'm a true fan of stompman now because of your support. I will test upgrading to 1.6.0 and re running today
Uh oh!
There was an error while loading. Please reload this page.
I am using Amazon MQ with ActiveMQ engine and seeing an error message where message-id is null when auto ACKs are sent back to the broker. After restarting the program, I will see the client reading the same amount of successful/error messages as if nothing was consumed by the
on_message
handlerMinimal repro:
Stacktrace:
The text was updated successfully, but these errors were encountered: