Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f5c703b
Add logging for failed ack and nack operations and introduce subscrip…
vrslev Aug 8, 2025
9a09417
Add asyncio sleep calls to improve test timing stability
vrslev Aug 8, 2025
633734b
Verify log entry is created when unsubscribing before ack or nack
vrslev Aug 8, 2025
d01be6e
Fix log message formatting by splitting long string for clarity in na…
vrslev Aug 8, 2025
cc149b6
Add logging for server heartbeat timeout detection and simplify NoopL…
vrslev Aug 8, 2025
80d87ee
Refactor ConnectionLost to ConnectionLostOnLifespanEnter for clarity
vrslev Aug 8, 2025
c9d9938
Use reconnecting connection state method in manager initialization an…
vrslev Aug 8, 2025
2495fa3
Refactor connection state handling to use a dedicated creation method
vrslev Aug 8, 2025
bca03e0
Refactor connection state initialization to handle reconnection logic…
vrslev Aug 8, 2025
f3ff6cc
Add connection_parameters field to ConnectionLifespan dataclass
vrslev Aug 8, 2025
dadcd90
Import Final directly from typing to simplify annotation
vrslev Aug 8, 2025
219d4e2
Expose the logger in the public API and add connection state logging …
vrslev Aug 8, 2025
d2fe8be
Rename method `_get_active_connection_state_reconnecting` to `_get_ac…
vrslev Aug 8, 2025
2c13be9
Implement test for handling message acknowledgment without ack header…
vrslev Aug 8, 2025
a357be8
Refactor logging module to logger.py and update imports
vrslev Aug 8, 2025
d0f8b66
Fix active connection closure by removing redundant close call
vrslev Aug 8, 2025
ed44c3f
Add trailing comma to logger in __all__ list for consistency
vrslev Aug 8, 2025
fd43925
Remove logging when server heartbeat times out and disconnect occurs
vrslev Aug 8, 2025
32895b6
Add log warnings for connection loss, reconnection, and invalid ack/n…
vrslev Aug 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ stompman takes care of cleaning up resources automatically. When you leave the c

- When connection is lost, stompman will attempt to handle it automatically. `stompman.FailedAllConnectAttemptsError` will be raised if all connection attempts fail. `stompman.FailedAllWriteAttemptsError` will be raised if connection succeeds but sending a frame or heartbeat lead to losing connection.
- To implement health checks, use `stompman.Client.is_alive()` — it will return `True` if everything is OK and `False` if server is not responding.
- `stompman` will write log warnings when connection is lost, after successful reconnection or invalid state during ack/nack.

### ...and caveats

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def _() -> None:
await broker.start()
await broker.publish(faker.pystr(), destination)
await event.wait()
await asyncio.sleep(0)
await asyncio.sleep(0)


@pytest.mark.parametrize("method_name", ["ack", "nack", "reject"])
Expand Down Expand Up @@ -211,6 +213,8 @@ def some_handler(message_frame: Annotated[stompman.MessageFrame, Context("messag
await broker.start()
await broker.publish(faker.pystr(), destination)
await event.wait()
await asyncio.sleep(0)
await asyncio.sleep(0)

assert message_id
extra = {"destination": destination, "message_id": message_id}
Expand Down
2 changes: 2 additions & 0 deletions packages/stompman/stompman/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
SubscribeFrame,
UnsubscribeFrame,
)
from stompman.logger import LOGGER as logger # noqa: N811
from stompman.serde import FrameParser, dump_frame
from stompman.subscription import AckableMessageFrame, AutoAckSubscription, ManualAckSubscription
from stompman.transaction import Transaction
Expand Down Expand Up @@ -70,4 +71,5 @@
"UnsubscribeFrame",
"UnsupportedProtocolVersion",
"dump_frame",
"logger",
]
2 changes: 2 additions & 0 deletions packages/stompman/stompman/connection_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class EstablishedConnectionResult:


class AbstractConnectionLifespan(Protocol):
connection_parameters: ConnectionParameters

async def enter(self) -> EstablishedConnectionResult | StompProtocolConnectionIssue: ...
async def exit(self) -> None: ...

Expand Down
22 changes: 17 additions & 5 deletions packages/stompman/stompman/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
from stompman.errors import (
AllServersUnavailable,
AnyConnectionIssue,
ConnectionLost,
ConnectionLostError,
ConnectionLostOnLifespanEnter,
FailedAllConnectAttemptsError,
FailedAllWriteAttemptsError,
)
from stompman.frames import AnyClientFrame, AnyServerFrame
from stompman.logger import LOGGER

if TYPE_CHECKING:
from stompman.connection_lifespan import AbstractConnectionLifespan, ConnectionLifespanFactory
Expand Down Expand Up @@ -60,7 +61,7 @@ async def __aenter__(self) -> Self:
await self._task_group.__aenter__()
self._send_heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
self._check_server_heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
self._active_connection_state = await self._get_active_connection_state()
self._active_connection_state = await self._get_active_connection_state(is_initial_call=True)
return self

async def __aexit__(
Expand Down Expand Up @@ -102,7 +103,7 @@ async def _check_server_heartbeat_forever(self, receive_heartbeat_interval_ms: i
if not self._active_connection_state:
continue
if not self._active_connection_state.is_alive(self.check_server_alive_interval_factor):
self._active_connection_state = None
self._clear_active_connection_state()

async def _create_connection_to_one_server(
self, server: ConnectionParameters
Expand Down Expand Up @@ -140,7 +141,7 @@ async def _connect_to_any_server(self) -> ActiveConnectionState | AnyConnectionI
try:
connection_result = await lifespan.enter()
except ConnectionLostError:
return ConnectionLost()
return ConnectionLostOnLifespanEnter()

return (
ActiveConnectionState(
Expand All @@ -150,7 +151,7 @@ async def _connect_to_any_server(self) -> ActiveConnectionState | AnyConnectionI
else connection_result
)

async def _get_active_connection_state(self) -> ActiveConnectionState:
async def _get_active_connection_state(self, *, is_initial_call: bool = False) -> ActiveConnectionState:
if self._active_connection_state:
return self._active_connection_state

Expand All @@ -165,6 +166,11 @@ async def _get_active_connection_state(self) -> ActiveConnectionState:

if isinstance(connection_result, ActiveConnectionState):
self._active_connection_state = connection_result
if not is_initial_call:
LOGGER.warning(
"reconnected after failure connection failure. connection_parameters: %s",
connection_result.lifespan.connection_parameters,
)
return connection_result

connection_issues.append(connection_result)
Expand All @@ -173,6 +179,12 @@ async def _get_active_connection_state(self) -> ActiveConnectionState:
raise FailedAllConnectAttemptsError(retry_attempts=self.connect_retry_attempts, issues=connection_issues)

def _clear_active_connection_state(self) -> None:
if not self._active_connection_state:
return
LOGGER.warning(
"connection lost. connection_parameters: %s",
self._active_connection_state.lifespan.connection_parameters,
)
self._active_connection_state = None

async def write_heartbeat_reconnecting(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions packages/stompman/stompman/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class UnsupportedProtocolVersion:


@dataclass(frozen=True, kw_only=True, slots=True)
class ConnectionLost: ...
class ConnectionLostOnLifespanEnter: ...


@dataclass(frozen=True, kw_only=True, slots=True)
Expand All @@ -38,7 +38,7 @@ class AllServersUnavailable:


StompProtocolConnectionIssue = ConnectionConfirmationTimeout | UnsupportedProtocolVersion
AnyConnectionIssue = StompProtocolConnectionIssue | ConnectionLost | AllServersUnavailable
AnyConnectionIssue = StompProtocolConnectionIssue | ConnectionLostOnLifespanEnter | AllServersUnavailable


@dataclass(kw_only=True)
Expand Down
5 changes: 5 additions & 0 deletions packages/stompman/stompman/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import logging
from typing import Final

__all__ = ["LOGGER"]
LOGGER: Final = logging.getLogger("stompman")
49 changes: 42 additions & 7 deletions packages/stompman/stompman/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
SubscribeFrame,
UnsubscribeFrame,
)
from stompman.logger import LOGGER


@dataclass(kw_only=True, slots=True, frozen=True)
Expand All @@ -30,8 +31,12 @@ def get_by_id(self, subscription_id: str) -> "AutoAckSubscription | ManualAckSub
def get_all(self) -> list["AutoAckSubscription | ManualAckSubscription"]:
return list(self.subscriptions.values())

def get_ids(self) -> list[str]:
return list(self.subscriptions.keys())

def delete_by_id(self, subscription_id: str) -> None:
del self.subscriptions[subscription_id]
if subscription_id in self.subscriptions:
del self.subscriptions[subscription_id]
if not self.subscriptions:
self.event.set()

Expand Down Expand Up @@ -68,16 +73,46 @@ async def unsubscribe(self) -> None:
await self._connection_manager.maybe_write_frame(UnsubscribeFrame(headers={"id": self.id}))

async def _nack(self, frame: MessageFrame) -> None:
if self._active_subscriptions.contains_by_id(self.id) and (ack_id := frame.headers.get("ack")):
await self._connection_manager.maybe_write_frame(
NackFrame(headers={"id": ack_id, "subscription": frame.headers["subscription"]})
if not self._active_subscriptions.contains_by_id(self.id):
LOGGER.warning(
"failed to nack message frame: subscription is not active. "
"message_id: %s, subscription_id: %s, active_subscriptions: %s",
frame.headers["message-id"],
self.id,
self._active_subscriptions.get_ids(),
)
return
if not (ack_id := frame.headers.get("ack")):
LOGGER.warning(
'failed to nack message frame: it has no "ack" header. "'
"message_id: %s, subscription_id: %s, frame_header_names: %s",
frame.headers["message-id"],
self.id,
frame.headers.keys(),
)
return
await self._connection_manager.maybe_write_frame(NackFrame(headers={"id": ack_id, "subscription": self.id}))

async def _ack(self, frame: MessageFrame) -> None:
if self._active_subscriptions.contains_by_id(self.id) and (ack_id := frame.headers.get("ack")):
await self._connection_manager.maybe_write_frame(
AckFrame(headers={"id": ack_id, "subscription": frame.headers["subscription"]})
if not self._active_subscriptions.contains_by_id(self.id):
LOGGER.warning(
"failed to ack message frame: subscription is not active. "
"message_id: %s, subscription_id: %s, active_subscriptions: %s",
frame.headers["message-id"],
self.id,
self._active_subscriptions.get_ids(),
)
return
if not (ack_id := frame.headers.get("ack")):
LOGGER.warning(
'failed to ack message frame: it has no "ack" header. "'
"message_id: %s, subscription_id: %s, frame_header_names: %s",
frame.headers["message-id"],
self.id,
frame.headers.keys(),
)
return
await self._connection_manager.maybe_write_frame(AckFrame(headers={"id": ack_id, "subscription": self.id}))


@dataclass(kw_only=True, slots=True)
Expand Down
1 change: 1 addition & 0 deletions packages/stompman/test_stompman/test_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ async def test_get_active_connection_state_ok_concurrent() -> None:
async def test_connection_manager_context_connection_lost() -> None:
async with EnrichedConnectionManager(connection_class=BaseMockConnection) as manager:
manager._clear_active_connection_state()
manager._clear_active_connection_state()


async def test_connection_manager_context_lifespan_aexit_raises_connection_lost() -> None:
Expand Down
42 changes: 41 additions & 1 deletion packages/stompman/test_stompman/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,11 @@ async def test_client_listen_routing_ok(monkeypatch: pytest.MonkeyPatch, faker:
@pytest.mark.parametrize("side_effect", [None, SomeError])
@pytest.mark.parametrize("ack", ["client", "client-individual"])
async def test_client_listen_unsubscribe_before_ack_or_nack(
monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, ack: AckMode, side_effect: object
monkeypatch: pytest.MonkeyPatch,
faker: faker.Faker,
ack: AckMode,
side_effect: object,
caplog: pytest.LogCaptureFixture,
) -> None:
subscription_id, destination = faker.pystr(), faker.pystr()
monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id))
Expand All @@ -213,6 +217,42 @@ async def test_client_listen_unsubscribe_before_ack_or_nack(
message_frame,
UnsubscribeFrame(headers={"id": subscription_id}),
)
assert len(caplog.messages) == 1


@pytest.mark.parametrize("side_effect", [None, SomeError])
@pytest.mark.parametrize("ack", ["client", "client-individual"])
async def test_client_listen_ack_with_no_ack_header(
monkeypatch: pytest.MonkeyPatch,
faker: faker.Faker,
ack: AckMode,
side_effect: object,
caplog: pytest.LogCaptureFixture,
) -> None:
subscription_id, destination = faker.pystr(), faker.pystr()
monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id))

message_frame = build_dataclass(MessageFrame, headers={"subscription": subscription_id})
message_frame.headers.pop("ack")

connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame]))
message_handler = mock.AsyncMock(side_effect=side_effect)

async with EnrichedClient(connection_class=connection_class) as client:
subscription = await client.subscribe(
destination, message_handler, on_suppressed_exception=noop_error_handler, ack=ack
)
await asyncio.sleep(0)
await asyncio.sleep(0)
await subscription.unsubscribe()

message_handler.assert_called_once_with(message_frame)
assert collected_frames == enrich_expected_frames(
SubscribeFrame(headers={"ack": ack, "destination": destination, "id": subscription_id}),
message_frame,
UnsubscribeFrame(headers={"id": subscription_id}),
)
assert len(caplog.messages) == 1


@pytest.mark.parametrize("ok", [True, False])
Expand Down