Skip to content

Commit 52d9352

Browse files
Introduce add_content_length=False param in send() (#119)
1 parent b349d6d commit 52d9352

File tree

6 files changed

+87
-5
lines changed

6 files changed

+87
-5
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@
33
.venv
44
dist
55
uv.lock
6+
.vscode
7+
__pycache__

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ Also, I want to pointed out that:
144144

145145
- Protocol parsing is inspired by [aiostomp](https://github.com/pedrokiefer/aiostomp/blob/3449dcb53f43e5956ccc7662bb5b7d76bc6ef36b/aiostomp/protocol.py) (meaning: consumed by me and refactored from).
146146
- stompman is tested and used with [ActiveMQ Artemis](https://activemq.apache.org/components/artemis/) and [ActiveMQ Classic](https://activemq.apache.org/components/classic/).
147+
- Caveat: a message sent by a Stomp client is converted into a JMS `TextMessage`/`BytesMessage` based on the `content-length` header (see the docs [here](https://activemq.apache.org/components/classic/documentation/stomp)). In order to send a `TextMessage`, `Client.send` needs to be invoked with `add_content_length` header set to `False`
147148
- Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman escapes headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming).
148149

149150
### FastStream STOMP broker

packages/stompman/stompman/client.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,22 @@ async def _listen_to_frames(self) -> None:
115115
pass
116116

117117
async def send(
118-
self, body: bytes, destination: str, *, content_type: str | None = None, headers: dict[str, str] | None = None
118+
self,
119+
body: bytes,
120+
destination: str,
121+
*,
122+
content_type: str | None = None,
123+
add_content_length: bool = True,
124+
headers: dict[str, str] | None = None,
119125
) -> None:
120126
await self._connection_manager.write_frame_reconnecting(
121127
SendFrame.build(
122-
body=body, destination=destination, transaction=None, content_type=content_type, headers=headers
128+
body=body,
129+
destination=destination,
130+
transaction=None,
131+
content_type=content_type,
132+
add_content_length=add_content_length,
133+
headers=headers,
123134
)
124135
)
125136

packages/stompman/stompman/frames.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,13 @@ def build(
149149
destination: str,
150150
transaction: str | None,
151151
content_type: str | None,
152+
add_content_length: bool,
152153
headers: dict[str, str] | None,
153154
) -> Self:
154155
all_headers: SendHeaders = headers or {} # type: ignore[assignment]
155156
all_headers["destination"] = destination
156-
all_headers["content-length"] = str(len(body))
157+
if add_content_length:
158+
all_headers["content-length"] = str(len(body))
157159
if content_type is not None:
158160
all_headers["content-type"] = content_type
159161
if transaction is not None:

packages/stompman/stompman/transaction.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,21 @@ async def __aexit__(
3434
self._active_transactions.remove(self)
3535

3636
async def send(
37-
self, body: bytes, destination: str, *, content_type: str | None = None, headers: dict[str, str] | None = None
37+
self,
38+
body: bytes,
39+
destination: str,
40+
*,
41+
content_type: str | None = None,
42+
add_content_length: bool = True,
43+
headers: dict[str, str] | None = None,
3844
) -> None:
3945
frame = SendFrame.build(
40-
body=body, destination=destination, transaction=self.id, content_type=content_type, headers=headers
46+
body=body,
47+
destination=destination,
48+
transaction=self.id,
49+
content_type=content_type,
50+
add_content_length=add_content_length,
51+
headers=headers,
4152
)
4253
self.sent_frames.append(frame)
4354
await self._connection_manager.write_frame_reconnecting(frame)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import asyncio
2+
from typing import Any
3+
4+
import faker
5+
import pytest
6+
from stompman import (
7+
SendFrame,
8+
)
9+
from stompman.frames import SendHeaders
10+
11+
from test_stompman.conftest import (
12+
EnrichedClient,
13+
create_spying_connection,
14+
enrich_expected_frames,
15+
get_read_frames_with_lifespan,
16+
)
17+
18+
pytestmark = pytest.mark.anyio
19+
20+
21+
@pytest.mark.parametrize(
22+
("args", "expected_body", "expected_headers"),
23+
[
24+
(
25+
{"body": b"Some body", "destination": "Some/queue"},
26+
b"Some body",
27+
{"content-length": "9", "destination": "Some/queue"},
28+
),
29+
(
30+
{"body": b"Some body", "destination": "Some/queue", "add_content_length": True},
31+
b"Some body",
32+
{"content-length": "9", "destination": "Some/queue"},
33+
),
34+
(
35+
{"body": b"Some body", "destination": "Some/queue", "content_type": "text/plain"},
36+
b"Some body",
37+
{"content-length": "9", "destination": "Some/queue", "content-type": "text/plain"},
38+
),
39+
(
40+
{"body": b"Some body", "destination": "Some/queue", "add_content_length": False},
41+
b"Some body",
42+
{"destination": "Some/queue"},
43+
),
44+
],
45+
)
46+
async def test_send_message(args: dict[str, Any], expected_body: bytes, expected_headers: SendHeaders) -> None:
47+
connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([]))
48+
49+
async with EnrichedClient(connection_class=connection_class) as client:
50+
await client.send(**args)
51+
await asyncio.sleep(0)
52+
53+
assert collected_frames == enrich_expected_frames(
54+
SendFrame(headers=expected_headers, body=expected_body),
55+
)

0 commit comments

Comments
 (0)