Skip to content

Commit b349d6d

Browse files
authored
Shutdown FastStream app when STOMP broker becomes unavailable (but was available when app started) (#117)
1 parent b6e4e07 commit b349d6d

File tree

1 file changed

+13
-1
lines changed
  • packages/faststream-stomp/faststream_stomp

1 file changed

+13
-1
lines changed

packages/faststream-stomp/faststream_stomp/broker.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
import types
34
from collections.abc import Callable, Iterable, Mapping, Sequence
@@ -30,6 +31,15 @@ def get_schema(self) -> dict[str, dict[str, str]]: # noqa: PLR6301
3031
return {"user-password": {"type": "userPassword"}}
3132

3233

34+
def _handle_listen_task_done(listen_task: asyncio.Task[None]) -> None:
35+
# Not sure how to test this. See https://github.com/community-of-python/stompman/pull/117#issuecomment-2983584449.
36+
task_exception = listen_task.exception()
37+
if isinstance(task_exception, ExceptionGroup) and isinstance(
38+
task_exception.exceptions[0], stompman.FailedAllConnectAttemptsError
39+
):
40+
raise SystemExit(1)
41+
42+
3343
class StompBroker(StompRegistrator, BrokerUsecase[stompman.MessageFrame, stompman.Client]):
3444
_subscribers: Mapping[int, StompSubscriber]
3545
_publishers: Mapping[int, StompPublisher]
@@ -96,7 +106,9 @@ async def _connect(self, client: stompman.Client) -> stompman.Client: # type: i
96106
return client
97107
self._attempted_to_connect = True
98108
self._producer = StompProducer(client)
99-
return await client.__aenter__()
109+
await client.__aenter__()
110+
client._listen_task.add_done_callback(_handle_listen_task_done) # noqa: SLF001
111+
return client
100112

101113
async def _close(
102114
self,

0 commit comments

Comments
 (0)