Skip to content

Commit 0aa4182

Browse files
authored
Change API for subscribing and listening (#43)
1 parent 8e86e23 commit 0aa4182

17 files changed

+949
-801
lines changed

.github/dependabot.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,8 @@ updates:
99
directory: /
1010
schedule:
1111
interval: monthly
12+
13+
- package-ecosystem: docker
14+
directory: /
15+
schedule:
16+
interval: monthly

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
ARG PYTHON_VERSION
2-
FROM ghcr.io/astral-sh/uv:latest as uv
2+
FROM ghcr.io/astral-sh/uv:0.2.27 as uv
33
FROM python:${PYTHON_VERSION}-slim-bullseye
44

55
WORKDIR /app

README.md

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ async with stompman.Client(
2424
stompman.ConnectionParameters(host="171.0.0.1", port=61616, login="user1", passcode="passcode1"),
2525
stompman.ConnectionParameters(host="172.0.0.1", port=61616, login="user2", passcode="passcode2"),
2626
],
27+
28+
# Handlers:
29+
on_error_frame=lambda error_frame: print(error_frame.body),
30+
on_unhandled_message_frame=lambda message_frame: print(message_frame.body),
31+
on_heartbeat=lambda: print("Server sent a heartbeat"),
32+
2733
# Optional parameters with sensible defaults:
2834
heartbeat=stompman.Heartbeat(will_send_interval_ms=1000, want_to_receive_interval_ms=1000),
2935
connect_retry_attempts=3,
@@ -40,7 +46,7 @@ async with stompman.Client(
4046
To send a message, use the following code:
4147

4248
```python
43-
await client.send(body=b"hi there!", destination="DLQ", headers={"persistent": "true"})
49+
await client.send(b"hi there!", destination="DLQ", headers={"persistent": "true"})
4450
```
4551

4652
Or, to send messages in a transaction:
@@ -54,61 +60,48 @@ async with client.begin() as transaction:
5460

5561
### Listening for Messages
5662

57-
Now, let's subscribe to a queue and listen for messages.
58-
59-
Notice that `listen()` is not bound to a destination: it will listen to all subscribed destinations. If you want separate subscribtions, create separate clients for that.
63+
Now, let's subscribe to a destination and listen for messages:
6064

6165
```python
62-
async with client.subscribe("DLQ"):
63-
async for event in client.listen():
64-
...
66+
async def handle_message_from_dlq(message_frame: stompman.MessageFrame) -> None:
67+
print(message_frame.body)
68+
69+
70+
await client.subscribe("DLQ", handle_message_from_dlq):
6571
```
6672

67-
`...`—and that's where it gets interesting.
73+
Entered `stompman.Client` will block forever waiting for messages if there are any active subscriptions.
6874

69-
Before learning how to processing messages from server, we need to understand how other libraries do it. They use callbacks. Damn callbacks in asynchronous programming.
75+
Sometimes it's useful to avoid that:
7076

71-
I wanted to avoid them, and came up with an elegant solution: combining async generator and match statement. Here how it looks like:
77+
```python
78+
dlq_subscription = await client.subscribe("DLQ", handle_message_from_dlq)
79+
await dlq_subscription.unsubscribe()
80+
```
81+
82+
By default, subscription have ACK mode "client-individual". If handler successfully processes the message, an `ACK` frame will be sent. If handler raises an exception, a `NACK` frame will be sent. You can catch (and log) exceptions using `on_suppressed_exception` parameter:
7283

7384
```python
74-
async for event in client.listen():
75-
match event:
76-
case stompman.MessageEvent(body=body):
77-
print(f"message: {body!s}")
78-
await event.ack()
79-
case stompman.ErrorEvent(message_header=short_description, body=body):
80-
print(f"{short_description}:\n{body!s}")
85+
await client.subscribe(
86+
"DLQ",
87+
handle_message_from_dlq,
88+
on_suppressed_exception=lambda exception, message_frame: print(exception, message_frame),
89+
)
8190
```
8291

83-
More complex example, that involves handling all possible events, and auto-acknowledgement:
92+
You can change the ack mode used by specifying the `ack` parameter:
8493

8594
```python
86-
async with asyncio.TaskGroup() as task_group:
87-
async for event in client.listen():
88-
match event:
89-
case stompman.MessageEvent(body=body):
90-
task_group.create_task(
91-
event.with_auto_ack(
92-
handle_message(body),
93-
on_suppressed_exception=lambda _exception, event: log.exception(
94-
"Failed to process message", stompman_event=event
95-
),
96-
)
97-
)
98-
case stompman.ErrorEvent():
99-
log.error("Received an error from server", stompman_event=event)
100-
case stompman.HeartbeatEvent():
101-
task_group.create_task(update_healthcheck_status())
102-
103-
104-
async def handle_message(event: stompman.MessageEvent) -> None:
105-
validated_message = MyMessageModel.model_validate_json(event.body)
106-
await run_business_logic(validated_message)
95+
# Server will assume that all messages sent to the subscription before the ACK'ed message are received and processed:
96+
await client.subscribe("DLQ", handle_message_from_dlq, ack="client")
97+
98+
# Server will assume that messages are received as soon as it send them to client:
99+
await client.subscribe("DLQ", handle_message_from_dlq, ack="auto")
107100
```
108101

109102
### Cleaning Up
110103

111-
stompman takes care of cleaning up resources automatically. When you leave the context of async context managers `stompman.Client()`, `client.subscribe()`, or `client.begin()`, the necessary frames will be sent to the server.
104+
stompman takes care of cleaning up resources automatically. When you leave the context of async context managers `stompman.Client()`, or `client.begin()`, the necessary frames will be sent to the server.
112105

113106
### Handling Connectivity Issues
114107

@@ -127,17 +120,12 @@ stompman takes care of cleaning up resources automatically. When you leave the c
127120

128121
### ...and caveats
129122

130-
- stompman only runs on Python 3.11 and newer.
123+
- stompman supports Python 3.11 and newer.
131124
- It implements [STOMP 1.2](https://stomp.github.io/stomp-specification-1.2.html) — the latest version of the protocol.
132-
- The client-individual ack mode is used, which means that server requires `ack` or `nack`. In contrast, with `client` ack mode server assumes you don't care about messages that occured before you connected. And, with `auto` ack mode server assumes client successfully received the message.
133-
- Heartbeats are required, and sent automatically on `listen()` (defaults to 1 second).
125+
- Heartbeats are required, and sent automatically in background (defaults to 1 second).
134126

135127
Also, I want to pointed out that:
136128

137129
- Protocol parsing is inspired by [aiostomp](https://github.com/pedrokiefer/aiostomp/blob/3449dcb53f43e5956ccc7662bb5b7d76bc6ef36b/aiostomp/protocol.py) (meaning: consumed by me and refactored from).
138130
- stompman is tested and used with [Artemis ActiveMQ](https://activemq.apache.org/components/artemis/).
139-
- Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman doesn't escape headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming).
140-
141-
## No docs
142-
143-
I try to keep it simple and easy to understand. May be counter-intuitive for some, but concise high-quality code speaks for itself. There're no comments and little indirection. Read the source if you wish, leave an issue if it's not enough or you want to add or fix something.
131+
- Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman escapes headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming).

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ dev-dependencies = [
2828
"ruff~=0.4.9",
2929
"uvloop~=0.19.0",
3030
"hypothesis~=6.103.2",
31+
"polyfactory",
32+
"faker",
3133
]
3234

3335
[build-system]

stompman/__init__.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
1-
from stompman.client import (
2-
AnyListeningEvent,
3-
Client,
4-
ConnectionParameters,
5-
ErrorEvent,
6-
Heartbeat,
7-
HeartbeatEvent,
8-
MessageEvent,
9-
)
1+
from stompman.client import Client, ConnectionParameters, Heartbeat, Subscription, Transaction
102
from stompman.connection import AbstractConnection, Connection
113
from stompman.errors import (
124
ConnectionConfirmationTimeoutError,
@@ -19,6 +11,7 @@
1911
AbortFrame,
2012
AckFrame,
2113
AnyClientFrame,
14+
AnyRealServerFrame,
2215
AnyServerFrame,
2316
BeginFrame,
2417
CommitFrame,
@@ -40,7 +33,7 @@
4033
"AbstractConnection",
4134
"AckFrame",
4235
"AnyClientFrame",
43-
"AnyListeningEvent",
36+
"AnyRealServerFrame",
4437
"AnyServerFrame",
4538
"BeginFrame",
4639
"Client",
@@ -53,18 +46,17 @@
5346
"ConnectionParameters",
5447
"DisconnectFrame",
5548
"Error",
56-
"ErrorEvent",
5749
"ErrorFrame",
5850
"FailedAllConnectAttemptsError",
5951
"Heartbeat",
60-
"HeartbeatEvent",
6152
"HeartbeatFrame",
62-
"MessageEvent",
6353
"MessageFrame",
6454
"NackFrame",
6555
"ReceiptFrame",
6656
"SendFrame",
6757
"SubscribeFrame",
58+
"Subscription",
59+
"Transaction",
6860
"UnsubscribeFrame",
6961
"UnsupportedProtocolVersionError",
7062
]

0 commit comments

Comments
 (0)