Skip to content

Commit 797e070

Browse files
committed
Merge branch 'main' into fix-acks-on-activemq-classic
2 parents de61139 + 5d996b5 commit 797e070

File tree

8 files changed

+44
-40
lines changed

8 files changed

+44
-40
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ stompman takes care of cleaning up resources automatically. When you leave the c
127127
Also, I want to pointed out that:
128128

129129
- Protocol parsing is inspired by [aiostomp](https://github.com/pedrokiefer/aiostomp/blob/3449dcb53f43e5956ccc7662bb5b7d76bc6ef36b/aiostomp/protocol.py) (meaning: consumed by me and refactored from).
130-
- stompman is tested and used with [Artemis ActiveMQ](https://activemq.apache.org/components/artemis/).
130+
- stompman is tested and used with [ActiveMQ Artemis](https://activemq.apache.org/components/artemis/) and [ActiveMQ Classic](https://activemq.apache.org/components/classic/).
131131
- Specification says that headers in CONNECT and CONNECTED frames shouldn't be escaped for backwards compatibility. stompman escapes headers in CONNECT frame (outcoming), but does not unescape headers in CONNECTED (outcoming).
132132

133133
### Examples

docker-compose.yml

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@ services:
55
args:
66
PYTHON_IMAGE: ${PYTHON_IMAGE:-python:3.13-slim-bullseye}
77
depends_on:
8-
artemis:
8+
activemq-artemis:
9+
condition: service_started
10+
activemq-classic:
911
condition: service_started
10-
environment:
11-
ARTEMIS_HOST: artemis
1212

13-
artemis:
13+
activemq-artemis:
1414
image: apache/activemq-artemis:2.37.0-alpine
1515
environment:
1616
ARTEMIS_USER: admin
1717
ARTEMIS_PASSWORD: ":=123"
1818
ports:
1919
- 8161:8161
2020
- 61616:61616
21+
22+
activemq-classic:
23+
image: apache/activemq-classic:6.1.2
24+
environment:
25+
ACTIVEMQ_CONNECTION_USER: admin
26+
ACTIVEMQ_CONNECTION_PASSWORD: ":=123"

examples/consumer.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
11
import asyncio
2-
import os
32

43
import stompman
54

6-
server = stompman.ConnectionParameters(
7-
host=os.environ.get("ARTEMIS_HOST", "0.0.0.0"), # noqa: S104
8-
port=61616,
9-
login="admin",
10-
passcode=":=123",
11-
)
5+
server = stompman.ConnectionParameters(host="0.0.0.0", port=61616, login="admin", passcode=":=123") # noqa: S104
126

137

148
async def handle_message(message_frame: stompman.MessageFrame) -> None:

examples/producer.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
11
import asyncio
2-
import os
32

43
import stompman
54

6-
server = stompman.ConnectionParameters(
7-
host=os.environ.get("ARTEMIS_HOST", "0.0.0.0"), # noqa: S104
8-
port=61616,
9-
login="admin",
10-
passcode=":=123",
11-
)
5+
server = stompman.ConnectionParameters(host="0.0.0.0", port=61616, login="admin", passcode=":=123") # noqa: S104
126

137

148
async def main() -> None:

stompman/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import asyncio
22
import inspect
3-
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
3+
from collections.abc import AsyncGenerator, Awaitable, Callable
44
from contextlib import AsyncExitStack, asynccontextmanager
55
from dataclasses import dataclass, field
66
from functools import partial
77
from ssl import SSLContext
88
from types import TracebackType
9-
from typing import ClassVar, Literal, Self
9+
from typing import Any, ClassVar, Literal, Self
1010

1111
from stompman.config import ConnectionParameters, Heartbeat
1212
from stompman.connection import AbstractConnection, Connection
@@ -30,8 +30,8 @@ class Client:
3030
PROTOCOL_VERSION: ClassVar = "1.2" # https://stomp.github.io/stomp-specification-1.2.html
3131

3232
servers: list[ConnectionParameters] = field(kw_only=False)
33-
on_error_frame: Callable[[ErrorFrame], None] | None = None
34-
on_heartbeat: Callable[[], None] | Callable[[], Awaitable[None]] | None = None
33+
on_error_frame: Callable[[ErrorFrame], Any] | None = None
34+
on_heartbeat: Callable[[], Any] | Callable[[], Awaitable[Any]] | None = None
3535

3636
heartbeat: Heartbeat = field(default=Heartbeat(1000, 1000))
3737
ssl: Literal[True] | SSLContext | None = None
@@ -146,11 +146,11 @@ async def begin(self) -> AsyncGenerator[Transaction, None]:
146146
async def subscribe(
147147
self,
148148
destination: str,
149-
handler: Callable[[MessageFrame], Coroutine[None, None, None]],
149+
handler: Callable[[MessageFrame], Awaitable[Any]],
150150
*,
151151
ack: AckMode = "client-individual",
152152
headers: dict[str, str] | None = None,
153-
on_suppressed_exception: Callable[[Exception, MessageFrame], None],
153+
on_suppressed_exception: Callable[[Exception, MessageFrame], Any],
154154
suppressed_exception_classes: tuple[type[Exception], ...] = (Exception,),
155155
) -> "Subscription":
156156
subscription = Subscription(

stompman/connection_lifespan.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from collections.abc import Callable
33
from contextlib import suppress
44
from dataclasses import dataclass
5-
from typing import Protocol
5+
from typing import Any, Protocol
66
from uuid import uuid4
77

88
from stompman.config import ConnectionParameters, Heartbeat
@@ -37,7 +37,7 @@ class ConnectionLifespan(AbstractConnectionLifespan):
3737
disconnect_confirmation_timeout: int
3838
active_subscriptions: ActiveSubscriptions
3939
active_transactions: ActiveTransactions
40-
set_heartbeat_interval: Callable[[float], None]
40+
set_heartbeat_interval: Callable[[float], Any]
4141

4242
async def _establish_connection(self) -> StompProtocolConnectionIssue | None:
4343
await self.connection.write_frame(

stompman/subscription.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
from collections.abc import Callable, Coroutine
1+
from collections.abc import Awaitable, Callable
22
from dataclasses import dataclass, field
3+
from typing import Any
34
from uuid import uuid4
45

56
from stompman.connection import AbstractConnection
@@ -21,9 +22,9 @@ class Subscription:
2122
id: str = field(default_factory=lambda: _make_subscription_id(), init=False) # noqa: PLW0108
2223
destination: str
2324
headers: dict[str, str] | None
24-
handler: Callable[[MessageFrame], Coroutine[None, None, None]]
25+
handler: Callable[[MessageFrame], Awaitable[Any]]
2526
ack: AckMode
26-
on_suppressed_exception: Callable[[Exception, MessageFrame], None]
27+
on_suppressed_exception: Callable[[Exception, MessageFrame], Any]
2728
suppressed_exception_classes: tuple[type[Exception], ...]
2829
_connection_manager: ConnectionManager
2930
_active_subscriptions: ActiveSubscriptions

tests/integration.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import asyncio
2-
import os
32
from collections.abc import AsyncGenerator, Callable
43
from contextlib import asynccontextmanager
54
from itertools import starmap
6-
from typing import Final
5+
from typing import Final, cast
76
from uuid import uuid4
87

98
import pytest
@@ -21,23 +20,29 @@
2120
parse_header,
2221
)
2322

24-
pytestmark = pytest.mark.anyio
23+
DESTINATION: Final = "DLQ"
24+
2525

26-
CONNECTION_PARAMETERS: Final = stompman.ConnectionParameters(
27-
host=os.environ["ARTEMIS_HOST"], port=61616, login="admin", passcode=":=123"
26+
@pytest.fixture(
27+
params=[
28+
stompman.ConnectionParameters(host="activemq-artemis", port=61616, login="admin", passcode=":=123"),
29+
stompman.ConnectionParameters(host="activemq-classic", port=61613, login="admin", passcode=":=123"),
30+
]
2831
)
29-
DESTINATION: Final = "DLQ"
32+
def connection_parameters(request: pytest.FixtureRequest) -> stompman.ConnectionParameters:
33+
return cast(stompman.ConnectionParameters, request.param)
3034

3135

3236
@asynccontextmanager
33-
async def create_client() -> AsyncGenerator[stompman.Client, None]:
37+
async def create_client(connection_parameters: stompman.ConnectionParameters) -> AsyncGenerator[stompman.Client, None]:
3438
async with stompman.Client(
35-
servers=[CONNECTION_PARAMETERS], read_timeout=10, connection_confirmation_timeout=10
39+
servers=[connection_parameters], read_timeout=10, connection_confirmation_timeout=10
3640
) as client:
3741
yield client
3842

3943

40-
async def test_ok() -> None:
44+
@pytest.mark.anyio
45+
async def test_ok(connection_parameters: stompman.ConnectionParameters) -> None:
4146
async def produce() -> None:
4247
for message in messages[200:]:
4348
await producer.send(body=message, destination=DESTINATION, headers={"hello": "from outside transaction"})
@@ -65,7 +70,11 @@ async def handle_message(frame: stompman.MessageFrame) -> None: # noqa: RUF029
6570

6671
messages = [str(uuid4()).encode() for _ in range(10000)]
6772

68-
async with create_client() as consumer, create_client() as producer, asyncio.TaskGroup() as task_group:
73+
async with (
74+
create_client(connection_parameters) as consumer,
75+
create_client(connection_parameters) as producer,
76+
asyncio.TaskGroup() as task_group,
77+
):
6978
task_group.create_task(consume())
7079
task_group.create_task(produce())
7180

0 commit comments

Comments
 (0)