Skip to content

Remove unneeded read_timeout #123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ async with stompman.Client(
connect_timeout=2,
connection_confirmation_timeout=2,
disconnect_confirmation_timeout=2,
read_timeout=2,
write_retry_attempts=3,
check_server_alive_interval_factor=3,
) as client:
Expand Down Expand Up @@ -144,7 +143,7 @@ Also, I want to pointed out that:

- Protocol parsing is inspired by [aiostomp](https://github.com/pedrokiefer/aiostomp/blob/3449dcb53f43e5956ccc7662bb5b7d76bc6ef36b/aiostomp/protocol.py) (meaning: consumed by me and refactored from).
- stompman is tested and used with [ActiveMQ Artemis](https://activemq.apache.org/components/artemis/) and [ActiveMQ Classic](https://activemq.apache.org/components/classic/).
- Caveat: a message sent by a Stomp client is converted into a JMS `TextMessage`/`BytesMessage` based on the `content-length` header (see the docs [here](https://activemq.apache.org/components/classic/documentation/stomp)). In order to send a `TextMessage`, `Client.send` needs to be invoked with `add_content_length` header set to `False`
- Caveat: a message sent by a Stomp client is converted into a JMS `TextMessage`/`BytesMessage` based on the `content-length` header (see the docs [here](https://activemq.apache.org/components/classic/documentation/stomp)). In order to send a `TextMessage`, `Client.send` needs to be invoked with `add_content_length` header set to `False`
- Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman escapes headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming).

### FastStream STOMP broker
Expand Down
5 changes: 5 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import platform
from typing import cast

import pytest
Expand All @@ -24,6 +25,10 @@ def first_server_connection_parameters() -> stompman.ConnectionParameters:
stompman.ConnectionParameters(host="127.0.0.1", port=9000, login="admin", passcode=":=123"),
stompman.ConnectionParameters(host="127.0.0.1", port=9001, login="admin", passcode=":=123"),
]
if platform.platform() == "Linux" # TODO: fix tests with ActiveMQ Classic on Mac # noqa: FIX002, TD002, TD003
else [
stompman.ConnectionParameters(host="127.0.0.1", port=9000, login="admin", passcode=":=123"),
],
)
def connection_parameters(request: pytest.FixtureRequest) -> stompman.ConnectionParameters:
return cast("stompman.ConnectionParameters", request.param)
2 changes: 0 additions & 2 deletions packages/stompman/stompman/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class Client:
connect_retry_attempts: int = 3
connect_retry_interval: int = 1
connect_timeout: int = 2
read_timeout: int = 2
read_max_chunk_size: int = 1024 * 1024
write_retry_attempts: int = 3
connection_confirmation_timeout: int = 2
Expand Down Expand Up @@ -69,7 +68,6 @@ def __post_init__(self) -> None:
connect_retry_attempts=self.connect_retry_attempts,
connect_retry_interval=self.connect_retry_interval,
connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout,
read_max_chunk_size=self.read_max_chunk_size,
write_retry_attempts=self.write_retry_attempts,
check_server_alive_interval_factor=self.check_server_alive_interval_factor,
Expand Down
10 changes: 2 additions & 8 deletions packages/stompman/stompman/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ async def connect(
port: int,
timeout: int,
read_max_chunk_size: int,
read_timeout: int,
ssl: Literal[True] | SSLContext | None,
) -> Self | None: ...
async def close(self) -> None: ...
Expand All @@ -46,7 +45,6 @@ class Connection(AbstractConnection):
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
read_max_chunk_size: int
read_timeout: int
ssl: Literal[True] | SSLContext | None

@classmethod
Expand All @@ -57,7 +55,6 @@ async def connect(
port: int,
timeout: int,
read_max_chunk_size: int,
read_timeout: int,
ssl: Literal[True] | SSLContext | None,
) -> Self | None:
try:
Expand All @@ -69,7 +66,6 @@ async def connect(
reader=reader,
writer=writer,
read_max_chunk_size=read_max_chunk_size,
read_timeout=read_timeout,
ssl=ssl,
)

Expand Down Expand Up @@ -99,10 +95,8 @@ async def read_frames(self) -> AsyncGenerator[AnyServerFrame, None]:
parser = FrameParser()

while True:
with _reraise_connection_lost(ConnectionError, TimeoutError):
raw_frames = await asyncio.wait_for(
self._read_non_empty_bytes(self.read_max_chunk_size), timeout=self.read_timeout
)
with _reraise_connection_lost(ConnectionError):
raw_frames = await self._read_non_empty_bytes(self.read_max_chunk_size)
self.last_read_time = time.time()

for frame in cast("Iterator[AnyServerFrame]", parser.parse_frames_from_chunk(raw_frames)):
Expand Down
2 changes: 0 additions & 2 deletions packages/stompman/stompman/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class ConnectionManager:
connect_retry_interval: int
connect_timeout: int
ssl: Literal[True] | SSLContext | None
read_timeout: int
read_max_chunk_size: int
write_retry_attempts: int
check_server_alive_interval_factor: int
Expand Down Expand Up @@ -113,7 +112,6 @@ async def _create_connection_to_one_server(
port=server.port,
timeout=self.connect_timeout,
read_max_chunk_size=self.read_max_chunk_size,
read_timeout=self.read_timeout,
ssl=self.ssl,
):
return (connection, server)
Expand Down
2 changes: 0 additions & 2 deletions packages/stompman/test_stompman/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ async def connect(
port: int,
timeout: int,
read_max_chunk_size: int,
read_timeout: int,
ssl: Literal[True] | SSLContext | None,
) -> Self | None:
return cls()
Expand Down Expand Up @@ -76,7 +75,6 @@ class EnrichedConnectionManager(ConnectionManager):
connect_retry_attempts: int = 3
connect_retry_interval: int = 1
connect_timeout: int = 3
read_timeout: int = 4
read_max_chunk_size: int = 5
write_retry_attempts: int = 3
ssl: Literal[True] | SSLContext | None = None
Expand Down
14 changes: 1 addition & 13 deletions packages/stompman/test_stompman/test_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import socket
from collections.abc import Awaitable
from functools import partial
from typing import Any
from unittest import mock

Expand All @@ -21,9 +20,7 @@


async def make_connection() -> Connection | None:
return await Connection.connect(
host="localhost", port=12345, timeout=2, read_max_chunk_size=1024 * 1024, read_timeout=2, ssl=None
)
return await Connection.connect(host="localhost", port=12345, timeout=2, read_max_chunk_size=1024 * 1024, ssl=None)


async def make_mocked_connection(monkeypatch: pytest.MonkeyPatch, reader: object, writer: object) -> Connection:
Expand Down Expand Up @@ -151,15 +148,6 @@ async def test_connection_connect_connection_error(monkeypatch: pytest.MonkeyPat
assert not await make_connection()


async def test_read_frames_timeout_error(monkeypatch: pytest.MonkeyPatch) -> None:
connection = await make_mocked_connection(
monkeypatch, mock.AsyncMock(read=partial(asyncio.sleep, 5)), mock.AsyncMock()
)
mock_wait_for(monkeypatch)
with pytest.raises(ConnectionLostError):
[frame async for frame in connection.read_frames()]


async def test_read_frames_connection_error(monkeypatch: pytest.MonkeyPatch) -> None:
connection = await make_mocked_connection(
monkeypatch, mock.AsyncMock(read=mock.AsyncMock(side_effect=BrokenPipeError)), mock.AsyncMock()
Expand Down
4 changes: 0 additions & 4 deletions packages/stompman/test_stompman/test_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ async def connect(
port: int,
timeout: int,
read_max_chunk_size: int,
read_timeout: int,
ssl: Literal[True] | SSLContext | None,
) -> Self | None:
assert (host, port) == (manager.servers[0].host, manager.servers[0].port)
Expand All @@ -55,7 +54,6 @@ async def connect(
port=port,
timeout=timeout,
read_max_chunk_size=read_max_chunk_size,
read_timeout=read_timeout,
ssl=ssl,
)
if attempts == ok_on_attempt
Expand Down Expand Up @@ -88,7 +86,6 @@ async def connect(
port: int,
timeout: int,
read_max_chunk_size: int,
read_timeout: int,
ssl: Literal[True] | SSLContext | None,
) -> Self | None:
return (
Expand All @@ -97,7 +94,6 @@ async def connect(
port=port,
timeout=timeout,
read_max_chunk_size=read_max_chunk_size,
read_timeout=read_timeout,
ssl=ssl,
)
if port == successful_server.port
Expand Down
4 changes: 1 addition & 3 deletions packages/stompman/test_stompman/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@

@asynccontextmanager
async def create_client(connection_parameters: stompman.ConnectionParameters) -> AsyncGenerator[stompman.Client, None]:
async with stompman.Client(
servers=[connection_parameters], read_timeout=10, connection_confirmation_timeout=10
) as client:
async with stompman.Client(servers=[connection_parameters], connection_confirmation_timeout=10) as client:
yield client


Expand Down