Skip to content

Commit 842a8d7

Browse files
committed
Split heartbeat tasks and improve server heartbeat monitoring
1 parent 407dbc2 commit 842a8d7

File tree

3 files changed

+44
-19
lines changed

3 files changed

+44
-19
lines changed

packages/stompman/stompman/client.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import inspect
3+
import time
34
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
45
from contextlib import AsyncExitStack, asynccontextmanager
56
from dataclasses import dataclass, field
@@ -52,7 +53,8 @@ class Client:
5253
_active_subscriptions: ActiveSubscriptions = field(default_factory=dict, init=False)
5354
_active_transactions: set[Transaction] = field(default_factory=set, init=False)
5455
_exit_stack: AsyncExitStack = field(default_factory=AsyncExitStack, init=False)
55-
_heartbeat_task: asyncio.Task[None] = field(init=False)
56+
_send_heartbeat_task: asyncio.Task[None] = field(init=False)
57+
_check_server_heartbeat_task: asyncio.Task[None] = field(init=False)
5658
_listen_task: asyncio.Task[None] = field(init=False)
5759
_task_group: asyncio.TaskGroup = field(init=False)
5860
_on_heartbeat_is_async: bool = field(init=False)
@@ -68,7 +70,7 @@ def __post_init__(self) -> None:
6870
disconnect_confirmation_timeout=self.disconnect_confirmation_timeout,
6971
active_subscriptions=self._active_subscriptions,
7072
active_transactions=self._active_transactions,
71-
set_heartbeat_interval=self._restart_heartbeat_task,
73+
set_heartbeat_interval=self._restart_heartbeat_tasks,
7274
),
7375
connection_class=self.connection_class,
7476
connect_retry_attempts=self.connect_retry_attempts,
@@ -83,7 +85,8 @@ def __post_init__(self) -> None:
8385

8486
async def __aenter__(self) -> Self:
8587
self._task_group = await self._exit_stack.enter_async_context(asyncio.TaskGroup())
86-
self._heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
88+
self._send_heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
89+
self._check_server_heartbeat_task = self._task_group.create_task(asyncio.sleep(0))
8790
await self._exit_stack.enter_async_context(self._connection_manager)
8891
self._listen_task = self._task_group.create_task(self._listen_to_frames())
8992
return self
@@ -96,18 +99,42 @@ async def __aexit__(
9699
await asyncio.Future()
97100
finally:
98101
self._listen_task.cancel()
99-
self._heartbeat_task.cancel()
100-
await asyncio.wait([self._listen_task, self._heartbeat_task])
102+
self._send_heartbeat_task.cancel()
103+
self._check_server_heartbeat_task.cancel()
104+
await asyncio.wait([self._listen_task, self._send_heartbeat_task, self._check_server_heartbeat_task])
101105
await self._exit_stack.aclose()
102106

103-
def _restart_heartbeat_task(self, interval: float) -> None:
104-
self._heartbeat_task.cancel()
105-
self._heartbeat_task = self._task_group.create_task(self._send_heartbeats_forever(interval))
107+
def _restart_heartbeat_tasks(self, server_heartbeat: Heartbeat) -> None:
108+
self._send_heartbeat_task.cancel()
109+
self._check_server_heartbeat_task.cancel()
110+
self._send_heartbeat_task = self._task_group.create_task(
111+
self._send_heartbeats_forever(server_heartbeat.want_to_receive_interval_ms)
112+
)
113+
self._check_server_heartbeat_task = self._task_group.create_task(
114+
self._monitor_server_aliveness(server_heartbeat.will_send_interval_ms)
115+
)
106116

107-
async def _send_heartbeats_forever(self, interval: float) -> None:
117+
async def _send_heartbeats_forever(self, client_heartbeat_interval_ms: int) -> None:
108118
while True:
109119
await self._connection_manager.write_heartbeat_reconnecting()
110-
await asyncio.sleep(interval)
120+
await asyncio.sleep(client_heartbeat_interval_ms / 1000)
121+
122+
async def _monitor_server_aliveness(self, server_heartbeat_interval_ms: int) -> None:
123+
server_heartbeat_interval_seconds = server_heartbeat_interval_ms / 1000
124+
while True:
125+
await asyncio.sleep(server_heartbeat_interval_seconds * self.check_server_alive_interval_factor)
126+
last_read_time_ms = (
127+
self._connection_manager._active_connection_state
128+
and self._connection_manager._active_connection_state.connection.last_read_time_ms
129+
)
130+
if not last_read_time_ms:
131+
print("No last_Read_time_ms")
132+
continue
133+
if (t:=time.time() - last_read_time_ms) > server_heartbeat_interval_seconds:
134+
print("reset time", t)
135+
self._connection_manager._active_connection_state = None
136+
else:
137+
print("no reset")
111138

112139
async def _listen_to_frames(self) -> None:
113140
async with asyncio.TaskGroup() as task_group:

packages/stompman/stompman/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
@dataclass(kw_only=True)
1616
class AbstractConnection(Protocol):
17-
last_read_time: float | None = field(init=False, default=None)
17+
last_read_time_ms: float | None = field(init=False, default=None)
1818

1919
@classmethod
2020
async def connect(
@@ -103,7 +103,7 @@ async def read_frames(self) -> AsyncGenerator[AnyServerFrame, None]:
103103
raw_frames = await asyncio.wait_for(
104104
self._read_non_empty_bytes(self.read_max_chunk_size), timeout=self.read_timeout
105105
)
106-
self.last_read_time = time.time()
106+
self.last_read_time_ms = time.time()
107107

108108
for frame in cast("Iterator[AnyServerFrame]", parser.parse_frames_from_chunk(raw_frames)):
109109
yield frame

packages/stompman/stompman/connection_lifespan.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import asyncio
2-
from collections.abc import Callable
32
from contextlib import suppress
43
from dataclasses import dataclass
5-
from typing import Any, Protocol
4+
from typing import Any, Callable, Protocol
65
from uuid import uuid4
76

87
from stompman.config import ConnectionParameters, Heartbeat
@@ -27,6 +26,8 @@ async def enter(self) -> StompProtocolConnectionIssue | None: ...
2726
async def exit(self) -> None: ...
2827

2928

29+
30+
3031
@dataclass(kw_only=True, slots=True)
3132
class ConnectionLifespan(AbstractConnectionLifespan):
3233
connection: AbstractConnection
@@ -37,7 +38,7 @@ class ConnectionLifespan(AbstractConnectionLifespan):
3738
disconnect_confirmation_timeout: int
3839
active_subscriptions: ActiveSubscriptions
3940
active_transactions: ActiveTransactions
40-
set_heartbeat_interval: Callable[[float], Any]
41+
set_heartbeat_interval: Callable[[Heartbeat], Any]
4142

4243
async def _establish_connection(self) -> StompProtocolConnectionIssue | None:
4344
await self.connection.write_frame(
@@ -73,10 +74,7 @@ async def take_connected_frame_and_collect_other_frames() -> ConnectedFrame:
7374
given_version=connected_frame.headers["version"], supported_version=self.protocol_version
7475
)
7576

76-
server_heartbeat = Heartbeat.from_header(connected_frame.headers["heart-beat"])
77-
self.set_heartbeat_interval(
78-
max(self.client_heartbeat.will_send_interval_ms, server_heartbeat.want_to_receive_interval_ms) / 1000
79-
)
77+
self.set_heartbeat_interval(Heartbeat.from_header(connected_frame.headers["heart-beat"]))
8078
return None
8179

8280
async def enter(self) -> StompProtocolConnectionIssue | None:

0 commit comments

Comments
 (0)