Skip to content

Commit de0064e

Browse files
committed
Support scenario when broker.connect() is called manually (for example, in lifespan hook)
Attempts to fix flaky internal tests
1 parent db6035a commit de0064e

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

packages/faststream-stomp/faststream_stomp/broker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(
8282
name="stomp", default_context={"channel": ""}, message_id_ln=self.__max_msg_id_ln
8383
),
8484
)
85+
self._attempted_to_connect = False
8586

8687
async def start(self) -> None:
8788
await super().start()
@@ -91,6 +92,9 @@ async def start(self) -> None:
9192
await handler.start()
9293

9394
async def _connect(self, client: stompman.Client) -> stompman.Client: # type: ignore[override]
95+
if self._attempted_to_connect:
96+
return client
97+
self._attempted_to_connect = True
9498
self._producer = StompProducer(client)
9599
return await client.__aenter__()
96100

packages/faststream-stomp/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ requires = ["hatchling", "hatch-vcs"]
2424
build-backend = "hatchling.build"
2525

2626
[dependency-groups]
27-
dev = ["faststream[otel,prometheus]~=0.5"]
27+
dev = ["faststream[otel,prometheus]~=0.5", "asgi-lifespan"]
2828

2929
[tool.hatch.version]
3030
source = "vcs"

packages/faststream-stomp/test_faststream_stomp/test_integration.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import faststream_stomp
99
import pytest
1010
import stompman
11+
from asgi_lifespan import LifespanManager
1112
from faststream import BaseMiddleware, Context, FastStream
13+
from faststream.asgi import AsgiFastStream
1214
from faststream.broker.message import gen_cor_id
1315
from faststream.broker.middlewares.logging import CriticalLogMiddleware
1416
from faststream.exceptions import AckMessage, NackMessage, RejectMessage
@@ -217,3 +219,9 @@ def some_handler(message_frame: Annotated[stompman.MessageFrame, Context("messag
217219
mock.call(logging.ERROR, "MyError: ", extra=extra, exc_info=MyError()),
218220
mock.call(logging.INFO, "Processed", extra=extra),
219221
]
222+
223+
224+
async def test_broker_connect_twice(broker: faststream_stomp.StompBroker) -> None:
225+
app = AsgiFastStream(broker, on_startup=[broker.connect])
226+
async with LifespanManager(app):
227+
pass

0 commit comments

Comments
 (0)