Skip to content

Commit 6014b67

Browse files
authored
Fix acks not on ActiveMQ Artemis: use ack MESSAGE frame header, not message-id (#92)
1 parent 5d996b5 commit 6014b67

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

stompman/subscription.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,24 @@ async def _run_handler(self, *, frame: MessageFrame) -> None:
5050
try:
5151
await self.handler(frame)
5252
except self.suppressed_exception_classes as exception:
53-
if self._should_handle_ack_nack and self.id in self._active_subscriptions:
53+
if (
54+
self._should_handle_ack_nack
55+
and self.id in self._active_subscriptions
56+
and (ack_id := frame.headers["ack"])
57+
):
5458
await self._connection_manager.maybe_write_frame(
55-
NackFrame(
56-
headers={"id": frame.headers["message-id"], "subscription": frame.headers["subscription"]}
57-
)
59+
NackFrame(headers={"id": ack_id, "subscription": frame.headers["subscription"]})
5860
)
5961
self.on_suppressed_exception(exception, frame)
6062
else:
61-
if self._should_handle_ack_nack and self.id in self._active_subscriptions:
63+
if (
64+
self._should_handle_ack_nack
65+
and self.id in self._active_subscriptions
66+
and (ack_id := frame.headers["ack"])
67+
):
6268
await self._connection_manager.maybe_write_frame(
6369
AckFrame(
64-
headers={"id": frame.headers["message-id"], "subscription": frame.headers["subscription"]},
70+
headers={"id": ack_id, "subscription": frame.headers["subscription"]},
6571
)
6672
)
6773

tests/test_subscription.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,11 @@ async def test_client_listen_unsubscribe_before_ack_or_nack(
223223
async def test_client_listen_ack_nack_sent(
224224
monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, ack: AckMode, *, ok: bool
225225
) -> None:
226-
subscription_id, destination, message_id = faker.pystr(), faker.pystr(), faker.pystr()
226+
subscription_id, destination, ack_id = faker.pystr(), faker.pystr(), faker.pystr()
227227
monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id))
228228

229229
message_frame = build_dataclass(
230-
MessageFrame, headers={"destination": destination, "message-id": message_id, "subscription": subscription_id}
230+
MessageFrame, headers={"destination": destination, "ack": ack_id, "subscription": subscription_id}
231231
)
232232
connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame]))
233233
message_handler = mock.AsyncMock(side_effect=None if ok else SomeError)
@@ -244,9 +244,9 @@ async def test_client_listen_ack_nack_sent(
244244
assert collected_frames == enrich_expected_frames(
245245
SubscribeFrame(headers={"id": subscription_id, "destination": destination, "ack": ack}),
246246
message_frame,
247-
AckFrame(headers={"id": message_id, "subscription": subscription_id})
247+
AckFrame(headers={"id": ack_id, "subscription": subscription_id})
248248
if ok
249-
else NackFrame(headers={"id": message_id, "subscription": subscription_id}),
249+
else NackFrame(headers={"id": ack_id, "subscription": subscription_id}),
250250
UnsubscribeFrame(headers={"id": subscription_id}),
251251
)
252252

0 commit comments

Comments
 (0)