From da77eacca849dc7029970d5322a84396d383689b Mon Sep 17 00:00:00 2001 From: giyokun Date: Thu, 28 Aug 2025 17:31:07 +0900 Subject: [PATCH 01/16] Improve binary message handling replaced the parse frame method to deal with binary messages and properly account for content length. --- packages/stompman/stompman/serde.py | 68 ++++++++++++++++++----------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index fbfaa43b..08a475bd 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -141,53 +141,69 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte return frame_type(headers=headers_, body=body) if frame_type in FRAMES_WITH_BODY else frame_type(headers=headers_) # type: ignore[call-arg] -def parse_lines_into_frame(lines: deque[bytearray]) -> AnyClientFrame | AnyServerFrame: - command = bytes(lines.popleft()) - headers = {} - - while line := lines.popleft(): - header = parse_header(line) - if header and header[0] not in headers: - headers[header[0]] = header[1] - body = bytes(lines.popleft()) if lines else b"" - return make_frame_from_parts(command=command, headers=headers, body=body) - - @dataclass(kw_only=True, slots=True) class FrameParser: - _lines: deque[bytearray] = field(default_factory=deque, init=False) _current_line: bytearray = field(default_factory=bytearray, init=False) - _previous_byte: bytes = field(default=b"", init=False) _headers_processed: bool = field(default=False, init=False) + _content_len: int = field(default=0, init=0) + _body_len: int = field(default=0, init=0) + _headers: dict = field(default_factory=dict) + _command: bytes = field(default=False, init=False) + def _reset(self) -> None: self._headers_processed = False - self._lines.clear() self._current_line = bytearray() + self._body_len = 0 + self._content_len = 0 + self._headers = {} + self._command = "" def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: for byte in iter_bytes(chunk): if byte == NULL: if self._headers_processed: - self._lines.append(self._current_line) - yield parse_lines_into_frame(self._lines) - self._reset() + # receiving body. If no content-len then stop at the first NULL byte + # otherelse continue reading until reachign content length + if self._content_len == 0 or self._body_len == self._content_len: + yield make_frame_from_parts(command = self._command, headers = self._headers, body = self._current_line) + self._reset() + else: + # update the buffer and update the bytecount + self._current_line += byte + self._body_len += 1 + else: + # if receiving a null while processing header reset + self._reset() elif not self._headers_processed and byte == NEWLINE: - if self._current_line or self._lines: - if self._previous_byte == CARRIAGE: - self._current_line.pop() + # processing headers here + # when receiving a NEWLINE + if self._current_line or self._command: + # if we have received a command or just received a new line + if self._current_line and self._current_line[-1] == CARRIAGE: + self._current_line.pop() # remove the extraneous final byte self._headers_processed = not self._current_line # extra empty line after headers - if not self._lines and bytes(self._current_line) not in COMMANDS_TO_FRAMES: - self._reset() - else: - self._lines.append(self._current_line) + if self._current_line: # only continue if we have something + if not self._command: # command still empty, command comes first + self._command = bytes(self._current_line) + if self._command not in COMMANDS_TO_FRAMES: + self._reset() + else: # otherelse we need to parse headers + header = parse_header(self._current_line) + if header and header[0] not in self._headers: + self._headers[header[0]] = header[1] + if header[0] == "content-length": + self._content_len = int(header[1]) + # empty after processing self._current_line = bytearray() else: yield HeartbeatFrame() else: self._current_line += byte + # update the byte count if necessary + if self._headers_processed and self._content_len: + self._body_len += 1 - self._previous_byte = byte From 750be9c147171918674029c1828fcbfaa799631a Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:03:44 +0300 Subject: [PATCH 02/16] Add comprehensive agent context documentation for STOMPMAN project --- AGENTS.md | 195 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..342e8a13 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,195 @@ +# STOMPMAN Project Context for AI Agents + +## Project Overview + +**stompman** is a modern, asynchronous Python client for the STOMP (Simple Text Oriented Messaging Protocol) that provides a typed, comprehensible API. It's designed to be: + +- Asynchronous (using Python's asyncio) +- Actively maintained (not abandoned) +- Type-safe with modern Python typing +- Compatible with STOMP 1.2 specification +- Tested with ActiveMQ Artemis and ActiveMQ Classic + +The project follows a monorepo structure with two main packages: +1. `stompman` - The core STOMP client library +2. `faststream-stomp` - A STOMP broker implementation for FastStream + +## Architecture and Key Components + +### Core Modules + +- `client.py` - Main `Client` class that manages connections, subscriptions, and transactions +- `frames.py` - Data classes representing STOMP protocol frames (Connect, Connected, Send, Subscribe, etc.) +- `serde.py` - Serialization/deserialization logic for STOMP frames +- `connection.py` - Low-level connection handling with abstract interface +- `connection_manager.py` - Manages connection lifecycle, retries, and failover +- `connection_lifespan.py` - Handles the connection lifecycle states +- `subscription.py` - Subscription management with auto/manual ACK modes +- `transaction.py` - Transaction support for atomic message operations +- `config.py` - Configuration data classes (ConnectionParameters, Heartbeat) +- `errors.py` - Custom exception hierarchy +- `logger.py` - Centralized logging + +### Key Features + +1. **Automatic Reconnection**: Handles connection failures with configurable retry logic +2. **Load Balancing**: Supports multiple server connections with automatic failover +3. **Heartbeat Management**: Automatic heartbeat sending/receiving to detect connection liveness +4. **Subscription Management**: Both auto-ACK and manual-ACK subscription modes +5. **Transaction Support**: Context manager-based transaction handling +6. **Type Safety**: Fully typed API with comprehensive type hints + +## Development Workflow + +### Prerequisites + +- Python 3.11+ +- uv (package manager) +- Docker (for integration tests) + +### Setup Commands + +```bash +# Install dependencies +just install + +# Alternative manual setup +uv lock --upgrade +uv sync --all-extras --all-packages --frozen +``` + +### Development Commands + +```bash +# Run linters +just lint +# Equivalent to: +# uv run ruff check . +# uv run ruff format . + +# Check types +just check-types +# Equivalent to: +# uv run mypy . + +# Run fast tests (unit tests only) +just test-fast + +# Run all tests (including integration tests with Docker) +just test + +# Run specific tests +just test-fast packages/stompman/test_stompman/test_frame_serde.py +``` + +### Testing + +The project uses pytest with comprehensive test coverage: + +- Unit tests for individual components +- Integration tests with real STOMP servers (ActiveMQ Artemis/Classic) +- Property-based testing with Hypothesis +- Test fixtures for connection parameters and asyncio backends + +Integration tests require Docker containers to be running: +```bash +just test +``` + +### Code Quality + +- Strict mypy type checking enabled +- Ruff linting with comprehensive ruleset +- 100% test coverage enforced +- Modern Python features (3.11+) required + +## Usage Patterns + +### Basic Client Usage + +```python +async with stompman.Client( + servers=[ + stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"), + ], + heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000), +) as client: + # Send messages + await client.send(b"hi there!", destination="DLQ", headers={"persistent": "true"}) + + # Subscribe to messages + await client.subscribe("DLQ", handle_message_from_dlq, on_suppressed_exception=print) +``` + +### Transaction Usage + +```python +async with client.begin() as transaction: + for _ in range(10): + await transaction.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"}) + await asyncio.sleep(0.1) +``` + +### Manual ACK Subscription + +```python +async def handle_message_from_dlq(message_frame: stompman.AckableMessageFrame) -> None: + print(message_frame.body) + await message_frame.ack() + +await client.subscribe_with_manual_ack("DLQ", handle_message_from_dlq, ack="client") +``` + +## Project Conventions + +### Code Style + +- Line length: 120 characters +- Strict typing with `from __future__ import annotations` +- Dataclasses with `slots=True` for performance +- Comprehensive docstrings for public APIs +- Modern Python syntax (structural pattern matching, etc.) + +### Testing Conventions + +- Test files named `test_*.py` +- Use of pytest parametrization for comprehensive test cases +- Property-based testing for serialization/deserialization +- Mocking only when necessary, preferring integration tests +- Test coverage reporting enabled + +### Git Workflow + +- Feature branches from main +- Squash merges for clean history +- Semantic commit messages +- Pull requests with CI checks + +## Common Development Tasks + +### Adding New Frame Types + +1. Add frame definition in `frames.py` +2. Update `COMMANDS_TO_FRAMES` and `FRAMES_TO_COMMANDS` mappings in `serde.py` +3. Add serialization/deserialization tests in `test_frame_serde.py` +4. Update exports in `__init__.py` + +### Modifying Connection Logic + +1. Changes usually belong in `connection.py` or `connection_manager.py` +2. Update corresponding tests in `test_connection.py` or `test_connection_manager.py` +3. Integration tests may need updates in `test_integration.py` + +### Adding Configuration Options + +1. Add fields to appropriate dataclass in `config.py` +2. Pass through to relevant components in `client.py` +3. Update README.md documentation +4. Add tests for new configuration behavior + +## Debugging Tips + +- Use `stompman.logger` for debugging connection issues +- Enable verbose logging with `logging.getLogger('stompman').setLevel(logging.DEBUG)` +- Check integration tests for real-world usage examples +- Use the examples/ directory for quick prototyping From db099617660c0e0b910180bb673054e38aa81a36 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:04:52 +0300 Subject: [PATCH 03/16] Revert "Improve binary message handling" This reverts commit da77eacca849dc7029970d5322a84396d383689b. --- packages/stompman/stompman/serde.py | 68 +++++++++++------------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 08a475bd..fbfaa43b 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -141,69 +141,53 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte return frame_type(headers=headers_, body=body) if frame_type in FRAMES_WITH_BODY else frame_type(headers=headers_) # type: ignore[call-arg] +def parse_lines_into_frame(lines: deque[bytearray]) -> AnyClientFrame | AnyServerFrame: + command = bytes(lines.popleft()) + headers = {} + + while line := lines.popleft(): + header = parse_header(line) + if header and header[0] not in headers: + headers[header[0]] = header[1] + body = bytes(lines.popleft()) if lines else b"" + return make_frame_from_parts(command=command, headers=headers, body=body) + + @dataclass(kw_only=True, slots=True) class FrameParser: + _lines: deque[bytearray] = field(default_factory=deque, init=False) _current_line: bytearray = field(default_factory=bytearray, init=False) + _previous_byte: bytes = field(default=b"", init=False) _headers_processed: bool = field(default=False, init=False) - _content_len: int = field(default=0, init=0) - _body_len: int = field(default=0, init=0) - _headers: dict = field(default_factory=dict) - _command: bytes = field(default=False, init=False) - def _reset(self) -> None: self._headers_processed = False + self._lines.clear() self._current_line = bytearray() - self._body_len = 0 - self._content_len = 0 - self._headers = {} - self._command = "" def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: for byte in iter_bytes(chunk): if byte == NULL: if self._headers_processed: - # receiving body. If no content-len then stop at the first NULL byte - # otherelse continue reading until reachign content length - if self._content_len == 0 or self._body_len == self._content_len: - yield make_frame_from_parts(command = self._command, headers = self._headers, body = self._current_line) - self._reset() - else: - # update the buffer and update the bytecount - self._current_line += byte - self._body_len += 1 - else: - # if receiving a null while processing header reset - self._reset() + self._lines.append(self._current_line) + yield parse_lines_into_frame(self._lines) + self._reset() elif not self._headers_processed and byte == NEWLINE: - # processing headers here - # when receiving a NEWLINE - if self._current_line or self._command: - # if we have received a command or just received a new line - if self._current_line and self._current_line[-1] == CARRIAGE: - self._current_line.pop() # remove the extraneous final byte + if self._current_line or self._lines: + if self._previous_byte == CARRIAGE: + self._current_line.pop() self._headers_processed = not self._current_line # extra empty line after headers - if self._current_line: # only continue if we have something - if not self._command: # command still empty, command comes first - self._command = bytes(self._current_line) - if self._command not in COMMANDS_TO_FRAMES: - self._reset() - else: # otherelse we need to parse headers - header = parse_header(self._current_line) - if header and header[0] not in self._headers: - self._headers[header[0]] = header[1] - if header[0] == "content-length": - self._content_len = int(header[1]) - # empty after processing + if not self._lines and bytes(self._current_line) not in COMMANDS_TO_FRAMES: + self._reset() + else: + self._lines.append(self._current_line) self._current_line = bytearray() else: yield HeartbeatFrame() else: self._current_line += byte - # update the byte count if necessary - if self._headers_processed and self._content_len: - self._body_len += 1 + self._previous_byte = byte From 84adeba34a1f6563673ab6b66b08d5e4a717c7c7 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:23:02 +0300 Subject: [PATCH 04/16] Moved frame parsing logic inline within FrameParser to improve code locality and reduce function call overhead. --- packages/stompman/stompman/serde.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index fbfaa43b..07f308c5 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -141,18 +141,6 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte return frame_type(headers=headers_, body=body) if frame_type in FRAMES_WITH_BODY else frame_type(headers=headers_) # type: ignore[call-arg] -def parse_lines_into_frame(lines: deque[bytearray]) -> AnyClientFrame | AnyServerFrame: - command = bytes(lines.popleft()) - headers = {} - - while line := lines.popleft(): - header = parse_header(line) - if header and header[0] not in headers: - headers[header[0]] = header[1] - body = bytes(lines.popleft()) if lines else b"" - return make_frame_from_parts(command=command, headers=headers, body=body) - - @dataclass(kw_only=True, slots=True) class FrameParser: _lines: deque[bytearray] = field(default_factory=deque, init=False) @@ -170,7 +158,15 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any if byte == NULL: if self._headers_processed: self._lines.append(self._current_line) - yield parse_lines_into_frame(self._lines) + command = bytes(self._lines.popleft()) + headers = {} + + while line := self._lines.popleft(): + header = parse_header(line) + if header and header[0] not in headers: + headers[header[0]] = header[1] + body = bytes(self._lines.popleft()) if self._lines else b"" + yield make_frame_from_parts(command=command, headers=headers, body=body) self._reset() elif not self._headers_processed and byte == NEWLINE: From cf3331f39b803730a45d8f4afacc16c6cd170820 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:49:11 +0300 Subject: [PATCH 05/16] Refactor FrameParser to handle STOMP frame parsing more robustly with improved buffer management and state tracking. --- packages/stompman/stompman/serde.py | 51 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 07f308c5..5fa40728 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -144,46 +144,53 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte @dataclass(kw_only=True, slots=True) class FrameParser: _lines: deque[bytearray] = field(default_factory=deque, init=False) - _current_line: bytearray = field(default_factory=bytearray, init=False) + _current_buf: bytearray = field(default_factory=bytearray, init=False) # TODO: rename _previous_byte: bytes = field(default=b"", init=False) _headers_processed: bool = field(default=False, init=False) + _command: bytes | None = field(default=None, init=False) + _headers: dict[str, str] = field(default_factory=dict, init=False) def _reset(self) -> None: self._headers_processed = False self._lines.clear() - self._current_line = bytearray() + self._current_buf = bytearray() + self._command = None + self._headers = {} - def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: - for byte in iter_bytes(chunk): + def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: # noqa: C901, PLR0912 + for byte in iter_bytes(chunk): # noqa: PLR1702 if byte == NULL: - if self._headers_processed: - self._lines.append(self._current_line) - command = bytes(self._lines.popleft()) - headers = {} - - while line := self._lines.popleft(): - header = parse_header(line) - if header and header[0] not in headers: - headers[header[0]] = header[1] - body = bytes(self._lines.popleft()) if self._lines else b"" - yield make_frame_from_parts(command=command, headers=headers, body=body) + if self._command and self._headers_processed: + if content_length := self._headers.get("content-length"): + if len(self._current_buf) != int(content_length): + self._current_buf += byte + continue + + yield make_frame_from_parts( + command=self._command, headers=self._headers, body=bytes(self._current_buf) + ) self._reset() elif not self._headers_processed and byte == NEWLINE: - if self._current_line or self._lines: + if self._current_buf or self._command or self._headers_processed: if self._previous_byte == CARRIAGE: - self._current_line.pop() - self._headers_processed = not self._current_line # extra empty line after headers + self._current_buf.pop() + self._headers_processed = not self._current_buf # extra empty line after headers - if not self._lines and bytes(self._current_line) not in COMMANDS_TO_FRAMES: + if not self._command and bytes(self._current_buf) not in COMMANDS_TO_FRAMES: self._reset() else: - self._lines.append(self._current_line) - self._current_line = bytearray() + if not self._command: + self._command = bytes(self._current_buf) + else: + header = parse_header(self._current_buf) + if header and header[0] not in self._headers: + self._headers[header[0]] = header[1] + self._current_buf = bytearray() else: yield HeartbeatFrame() else: - self._current_line += byte + self._current_buf += byte self._previous_byte = byte From 6e078f8b794b0c4ac827c23b2057256fbf66edcf Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:50:53 +0300 Subject: [PATCH 06/16] Refactor FrameParser logic to properly handle command validation and header processing during frame parsing. --- packages/stompman/stompman/serde.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 5fa40728..0f311e7c 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -172,20 +172,22 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any self._reset() elif not self._headers_processed and byte == NEWLINE: - if self._current_buf or self._command or self._headers_processed: + if self._current_buf or self._command: if self._previous_byte == CARRIAGE: self._current_buf.pop() self._headers_processed = not self._current_buf # extra empty line after headers - if not self._command and bytes(self._current_buf) not in COMMANDS_TO_FRAMES: - self._reset() - else: - if not self._command: - self._command = bytes(self._current_buf) + if not self._command: + current_buf_bytes = bytes(self._current_buf) + if current_buf_bytes not in COMMANDS_TO_FRAMES: + self._reset() else: - header = parse_header(self._current_buf) - if header and header[0] not in self._headers: - self._headers[header[0]] = header[1] + self._command = current_buf_bytes + self._current_buf = bytearray() + else: + header = parse_header(self._current_buf) + if header and header[0] not in self._headers: + self._headers[header[0]] = header[1] self._current_buf = bytearray() else: yield HeartbeatFrame() From 66409d6bf9eb0a0efe8801cb3d7a79def6440de3 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:52:44 +0300 Subject: [PATCH 07/16] Handle heartbeat frames properly in STOMP frame parser by yielding them at correct processing points. --- packages/stompman/stompman/serde.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 0f311e7c..e062b3ed 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -172,7 +172,9 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any self._reset() elif not self._headers_processed and byte == NEWLINE: - if self._current_buf or self._command: + if not self._current_buf and not self._command: + yield HeartbeatFrame() + else: if self._previous_byte == CARRIAGE: self._current_buf.pop() self._headers_processed = not self._current_buf # extra empty line after headers @@ -189,8 +191,6 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any if header and header[0] not in self._headers: self._headers[header[0]] = header[1] self._current_buf = bytearray() - else: - yield HeartbeatFrame() else: self._current_buf += byte From f528063650940cd70dc98af86e9fab20a994d60c Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:53:36 +0300 Subject: [PATCH 08/16] Add content length tracking to frame parser state --- packages/stompman/stompman/serde.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index e062b3ed..0735a5af 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -149,6 +149,7 @@ class FrameParser: _headers_processed: bool = field(default=False, init=False) _command: bytes | None = field(default=None, init=False) _headers: dict[str, str] = field(default_factory=dict, init=False) + _content_length: int | None = field(default=None, init=False) def _reset(self) -> None: self._headers_processed = False @@ -156,6 +157,7 @@ def _reset(self) -> None: self._current_buf = bytearray() self._command = None self._headers = {} + self._content_length = None def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: # noqa: C901, PLR0912 for byte in iter_bytes(chunk): # noqa: PLR1702 From e718774721d774a60ae43b3707f756957afafe2f Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 12:55:54 +0300 Subject: [PATCH 09/16] Refactor frame parsing logic to use cached content-length header for body validation --- packages/stompman/stompman/serde.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 0735a5af..fc6f9ed2 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -163,11 +163,9 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any for byte in iter_bytes(chunk): # noqa: PLR1702 if byte == NULL: if self._command and self._headers_processed: - if content_length := self._headers.get("content-length"): - if len(self._current_buf) != int(content_length): - self._current_buf += byte - continue - + if self._content_length is not None and len(self._current_buf) != self._content_length: + self._current_buf += byte + continue yield make_frame_from_parts( command=self._command, headers=self._headers, body=bytes(self._current_buf) ) @@ -192,6 +190,9 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any header = parse_header(self._current_buf) if header and header[0] not in self._headers: self._headers[header[0]] = header[1] + if header[0].lower() == "content-length": + with suppress(ValueError): + self._content_length = int(header[1]) self._current_buf = bytearray() else: From a511e52108be5a89969eb25c47608d8a694d46ce Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 13:12:46 +0300 Subject: [PATCH 10/16] Fix content-length validation order and enhance frame parsing robustness with comprehensive NULL byte and content-length tests. --- packages/stompman/stompman/serde.py | 7 ++++++- .../test_stompman/test_frame_serde.py | 20 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index fc6f9ed2..26e1f01e 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -163,7 +163,7 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any for byte in iter_bytes(chunk): # noqa: PLR1702 if byte == NULL: if self._command and self._headers_processed: - if self._content_length is not None and len(self._current_buf) != self._content_length: + if self._content_length is not None and self._content_length != len(self._current_buf): self._current_buf += byte continue yield make_frame_from_parts( @@ -195,6 +195,11 @@ def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | Any self._content_length = int(header[1]) self._current_buf = bytearray() + elif not self._headers_processed: + self._current_buf += byte + elif self._content_length is not None: + if self._content_length != len(self._current_buf): + self._current_buf += byte else: self._current_buf += byte diff --git a/packages/stompman/test_stompman/test_frame_serde.py b/packages/stompman/test_stompman/test_frame_serde.py index b9a8652f..392d9934 100644 --- a/packages/stompman/test_stompman/test_frame_serde.py +++ b/packages/stompman/test_stompman/test_frame_serde.py @@ -222,6 +222,26 @@ def test_dump_frame(frame: AnyClientFrame, dumped_frame: bytes) -> None: ConnectedFrame(headers={"header": "1.2"}), ], ), + # Correct content-length with body containing NULL byte + ( + b"MESSAGE\ncontent-length:5\n\nBod\x00y\x00", + [MessageFrame(headers={"content-length": "5"}, body=b"Bod\x00y")], + ), + # Content-length shorter than actual body (should only read up to content-length) + ( + b"MESSAGE\ncontent-length:4\n\nBody\x00 with extra\x00\n", + [MessageFrame(headers={"content-length": "4"}, body=b"Body"), HeartbeatFrame()], + ), + # Content-length longer than actual body (should wait for more data) + ( + b"MESSAGE\ncontent-length:10\n\nShort", + [], + ), + # Content-length longer than actual body, then more data comes with NULL terminator + ( + b"MESSAGE\ncontent-length:10\n\nShortMOREDATA\x00", + [MessageFrame(headers={"content-length": "10"}, body=b"ShortMORED")], + ), ], ) def test_load_frames(raw_frames: bytes, loaded_frames: list[AnyServerFrame]) -> None: From e18d90935df7961044212d9a91e499ef2e4d7af1 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 13:12:54 +0300 Subject: [PATCH 11/16] Removed TODO comment from FrameParser class attribute declaration. --- packages/stompman/stompman/serde.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 26e1f01e..2e289b1a 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -144,7 +144,7 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte @dataclass(kw_only=True, slots=True) class FrameParser: _lines: deque[bytearray] = field(default_factory=deque, init=False) - _current_buf: bytearray = field(default_factory=bytearray, init=False) # TODO: rename + _current_buf: bytearray = field(default_factory=bytearray, init=False) _previous_byte: bytes = field(default=b"", init=False) _headers_processed: bool = field(default=False, init=False) _command: bytes | None = field(default=None, init=False) From dfbebd26eab68230e181b9f5a260a2589a04cfc6 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 13:13:05 +0300 Subject: [PATCH 12/16] Remove unused deque import and cleanup FrameParser lines handling --- packages/stompman/stompman/serde.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 2e289b1a..00b40796 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -1,5 +1,4 @@ import struct -from collections import deque from collections.abc import Iterator from contextlib import suppress from dataclasses import dataclass, field @@ -143,7 +142,6 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte @dataclass(kw_only=True, slots=True) class FrameParser: - _lines: deque[bytearray] = field(default_factory=deque, init=False) _current_buf: bytearray = field(default_factory=bytearray, init=False) _previous_byte: bytes = field(default=b"", init=False) _headers_processed: bool = field(default=False, init=False) @@ -153,7 +151,6 @@ class FrameParser: def _reset(self) -> None: self._headers_processed = False - self._lines.clear() self._current_buf = bytearray() self._command = None self._headers = {} From 9bb6c45d577f053f94d40e33d08950f0a45b5c01 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 13:13:45 +0300 Subject: [PATCH 13/16] Updated FrameParser to allow _previous_byte field to be None instead of empty bytes for better null handling. --- packages/stompman/stompman/serde.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 00b40796..56965692 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -143,7 +143,7 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte @dataclass(kw_only=True, slots=True) class FrameParser: _current_buf: bytearray = field(default_factory=bytearray, init=False) - _previous_byte: bytes = field(default=b"", init=False) + _previous_byte: bytes | None = field(default=None, init=False) _headers_processed: bool = field(default=False, init=False) _command: bytes | None = field(default=None, init=False) _headers: dict[str, str] = field(default_factory=dict, init=False) From 91c385957b267dce46d7097b4ac9dbe1c61dcf09 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 13:15:30 +0300 Subject: [PATCH 14/16] Refactor FrameParser to explicitly define fields and initialize them in __init__ instead of using dataclass field declarations. --- packages/stompman/stompman/serde.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index 56965692..b2f9bf0b 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -1,7 +1,7 @@ import struct from collections.abc import Iterator from contextlib import suppress -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import Any, Final, cast from stompman.frames import ( @@ -140,18 +140,22 @@ def make_frame_from_parts(*, command: bytes, headers: dict[str, str], body: byte return frame_type(headers=headers_, body=body) if frame_type in FRAMES_WITH_BODY else frame_type(headers=headers_) # type: ignore[call-arg] -@dataclass(kw_only=True, slots=True) +@dataclass(kw_only=True, slots=True, init=False) class FrameParser: - _current_buf: bytearray = field(default_factory=bytearray, init=False) - _previous_byte: bytes | None = field(default=None, init=False) - _headers_processed: bool = field(default=False, init=False) - _command: bytes | None = field(default=None, init=False) - _headers: dict[str, str] = field(default_factory=dict, init=False) - _content_length: int | None = field(default=None, init=False) + _current_buf: bytearray + _previous_byte: bytes | None + _headers_processed: bool + _command: bytes | None + _headers: dict[str, str] + _content_length: int | None + + def __init__(self) -> None: + self._previous_byte = None + self._reset() def _reset(self) -> None: - self._headers_processed = False self._current_buf = bytearray() + self._headers_processed = False self._command = None self._headers = {} self._content_length = None From 44c76bcb992d27729d2e1f08a174a91de1f32b57 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 13:18:33 +0300 Subject: [PATCH 15/16] Remove AGENTS.md file detailing project context and architecture for AI agents. --- AGENTS.md | 195 ------------------------------------------------------ 1 file changed, 195 deletions(-) delete mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md deleted file mode 100644 index 342e8a13..00000000 --- a/AGENTS.md +++ /dev/null @@ -1,195 +0,0 @@ -# STOMPMAN Project Context for AI Agents - -## Project Overview - -**stompman** is a modern, asynchronous Python client for the STOMP (Simple Text Oriented Messaging Protocol) that provides a typed, comprehensible API. It's designed to be: - -- Asynchronous (using Python's asyncio) -- Actively maintained (not abandoned) -- Type-safe with modern Python typing -- Compatible with STOMP 1.2 specification -- Tested with ActiveMQ Artemis and ActiveMQ Classic - -The project follows a monorepo structure with two main packages: -1. `stompman` - The core STOMP client library -2. `faststream-stomp` - A STOMP broker implementation for FastStream - -## Architecture and Key Components - -### Core Modules - -- `client.py` - Main `Client` class that manages connections, subscriptions, and transactions -- `frames.py` - Data classes representing STOMP protocol frames (Connect, Connected, Send, Subscribe, etc.) -- `serde.py` - Serialization/deserialization logic for STOMP frames -- `connection.py` - Low-level connection handling with abstract interface -- `connection_manager.py` - Manages connection lifecycle, retries, and failover -- `connection_lifespan.py` - Handles the connection lifecycle states -- `subscription.py` - Subscription management with auto/manual ACK modes -- `transaction.py` - Transaction support for atomic message operations -- `config.py` - Configuration data classes (ConnectionParameters, Heartbeat) -- `errors.py` - Custom exception hierarchy -- `logger.py` - Centralized logging - -### Key Features - -1. **Automatic Reconnection**: Handles connection failures with configurable retry logic -2. **Load Balancing**: Supports multiple server connections with automatic failover -3. **Heartbeat Management**: Automatic heartbeat sending/receiving to detect connection liveness -4. **Subscription Management**: Both auto-ACK and manual-ACK subscription modes -5. **Transaction Support**: Context manager-based transaction handling -6. **Type Safety**: Fully typed API with comprehensive type hints - -## Development Workflow - -### Prerequisites - -- Python 3.11+ -- uv (package manager) -- Docker (for integration tests) - -### Setup Commands - -```bash -# Install dependencies -just install - -# Alternative manual setup -uv lock --upgrade -uv sync --all-extras --all-packages --frozen -``` - -### Development Commands - -```bash -# Run linters -just lint -# Equivalent to: -# uv run ruff check . -# uv run ruff format . - -# Check types -just check-types -# Equivalent to: -# uv run mypy . - -# Run fast tests (unit tests only) -just test-fast - -# Run all tests (including integration tests with Docker) -just test - -# Run specific tests -just test-fast packages/stompman/test_stompman/test_frame_serde.py -``` - -### Testing - -The project uses pytest with comprehensive test coverage: - -- Unit tests for individual components -- Integration tests with real STOMP servers (ActiveMQ Artemis/Classic) -- Property-based testing with Hypothesis -- Test fixtures for connection parameters and asyncio backends - -Integration tests require Docker containers to be running: -```bash -just test -``` - -### Code Quality - -- Strict mypy type checking enabled -- Ruff linting with comprehensive ruleset -- 100% test coverage enforced -- Modern Python features (3.11+) required - -## Usage Patterns - -### Basic Client Usage - -```python -async with stompman.Client( - servers=[ - stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"), - ], - heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000), -) as client: - # Send messages - await client.send(b"hi there!", destination="DLQ", headers={"persistent": "true"}) - - # Subscribe to messages - await client.subscribe("DLQ", handle_message_from_dlq, on_suppressed_exception=print) -``` - -### Transaction Usage - -```python -async with client.begin() as transaction: - for _ in range(10): - await transaction.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"}) - await asyncio.sleep(0.1) -``` - -### Manual ACK Subscription - -```python -async def handle_message_from_dlq(message_frame: stompman.AckableMessageFrame) -> None: - print(message_frame.body) - await message_frame.ack() - -await client.subscribe_with_manual_ack("DLQ", handle_message_from_dlq, ack="client") -``` - -## Project Conventions - -### Code Style - -- Line length: 120 characters -- Strict typing with `from __future__ import annotations` -- Dataclasses with `slots=True` for performance -- Comprehensive docstrings for public APIs -- Modern Python syntax (structural pattern matching, etc.) - -### Testing Conventions - -- Test files named `test_*.py` -- Use of pytest parametrization for comprehensive test cases -- Property-based testing for serialization/deserialization -- Mocking only when necessary, preferring integration tests -- Test coverage reporting enabled - -### Git Workflow - -- Feature branches from main -- Squash merges for clean history -- Semantic commit messages -- Pull requests with CI checks - -## Common Development Tasks - -### Adding New Frame Types - -1. Add frame definition in `frames.py` -2. Update `COMMANDS_TO_FRAMES` and `FRAMES_TO_COMMANDS` mappings in `serde.py` -3. Add serialization/deserialization tests in `test_frame_serde.py` -4. Update exports in `__init__.py` - -### Modifying Connection Logic - -1. Changes usually belong in `connection.py` or `connection_manager.py` -2. Update corresponding tests in `test_connection.py` or `test_connection_manager.py` -3. Integration tests may need updates in `test_integration.py` - -### Adding Configuration Options - -1. Add fields to appropriate dataclass in `config.py` -2. Pass through to relevant components in `client.py` -3. Update README.md documentation -4. Add tests for new configuration behavior - -## Debugging Tips - -- Use `stompman.logger` for debugging connection issues -- Enable verbose logging with `logging.getLogger('stompman').setLevel(logging.DEBUG)` -- Check integration tests for real-world usage examples -- Use the examples/ directory for quick prototyping From 3abda9a24364b8610aac3e3ba3465758864f23ae Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 29 Aug 2025 14:12:15 +0300 Subject: [PATCH 16/16] Refactor FrameParser to improve frame parsing logic and readability. --- packages/stompman/stompman/serde.py | 95 +++++++++++++++++------------ 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/packages/stompman/stompman/serde.py b/packages/stompman/stompman/serde.py index b2f9bf0b..4491a05b 100644 --- a/packages/stompman/stompman/serde.py +++ b/packages/stompman/stompman/serde.py @@ -160,47 +160,62 @@ def _reset(self) -> None: self._headers = {} self._content_length = None - def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: # noqa: C901, PLR0912 - for byte in iter_bytes(chunk): # noqa: PLR1702 + def _handle_null_byte(self) -> Iterator[AnyClientFrame | AnyServerFrame]: + if not self._command or not self._headers_processed: + self._reset() + return + if self._content_length is not None and self._content_length != len(self._current_buf): + self._current_buf += NULL + return + yield make_frame_from_parts(command=self._command, headers=self._headers, body=bytes(self._current_buf)) + self._reset() + + def _handle_newline_byte(self) -> Iterator[HeartbeatFrame]: + if not self._current_buf and not self._command: + yield HeartbeatFrame() + return + if self._previous_byte == CARRIAGE: + self._current_buf.pop() + self._headers_processed = not self._current_buf # extra empty line after headers + + if self._command: + self._process_header() + else: + self._process_command() + + def _process_command(self) -> None: + current_buf_bytes = bytes(self._current_buf) + if current_buf_bytes not in COMMANDS_TO_FRAMES: + self._reset() + else: + self._command = current_buf_bytes + self._current_buf = bytearray() + + def _process_header(self) -> None: + header = parse_header(self._current_buf) + if not header: + self._current_buf = bytearray() + return + header_key, header_value = header + if header_key not in self._headers: + self._headers[header_key] = header_value + if header_key.lower() == "content-length": + with suppress(ValueError): + self._content_length = int(header_value) + self._current_buf = bytearray() + + def _handle_body_byte(self, byte: bytes) -> None: + if self._content_length is None or self._content_length != len(self._current_buf): + self._current_buf += byte + + def parse_frames_from_chunk(self, chunk: bytes) -> Iterator[AnyClientFrame | AnyServerFrame]: + for byte in iter_bytes(chunk): if byte == NULL: - if self._command and self._headers_processed: - if self._content_length is not None and self._content_length != len(self._current_buf): - self._current_buf += byte - continue - yield make_frame_from_parts( - command=self._command, headers=self._headers, body=bytes(self._current_buf) - ) - self._reset() - - elif not self._headers_processed and byte == NEWLINE: - if not self._current_buf and not self._command: - yield HeartbeatFrame() - else: - if self._previous_byte == CARRIAGE: - self._current_buf.pop() - self._headers_processed = not self._current_buf # extra empty line after headers - - if not self._command: - current_buf_bytes = bytes(self._current_buf) - if current_buf_bytes not in COMMANDS_TO_FRAMES: - self._reset() - else: - self._command = current_buf_bytes - self._current_buf = bytearray() - else: - header = parse_header(self._current_buf) - if header and header[0] not in self._headers: - self._headers[header[0]] = header[1] - if header[0].lower() == "content-length": - with suppress(ValueError): - self._content_length = int(header[1]) - self._current_buf = bytearray() - - elif not self._headers_processed: - self._current_buf += byte - elif self._content_length is not None: - if self._content_length != len(self._current_buf): - self._current_buf += byte + yield from self._handle_null_byte() + elif self._headers_processed: + self._handle_body_byte(byte) + elif byte == NEWLINE: + yield from self._handle_newline_byte() else: self._current_buf += byte