Skip to content

Commit c82f4c7

Browse files
authored
faststream-stomp: Add OpenTelemetry and Prometheus middlewares (#100)
1 parent 57b306c commit c82f4c7

File tree

6 files changed

+150
-15
lines changed

6 files changed

+150
-15
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import stompman
2+
from faststream.broker.message import StreamMessage
3+
from faststream.opentelemetry import TelemetrySettingsProvider
4+
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
5+
from faststream.opentelemetry.middleware import TelemetryMiddleware
6+
from faststream.types import AnyDict
7+
from opentelemetry.metrics import Meter, MeterProvider
8+
from opentelemetry.semconv.trace import SpanAttributes
9+
from opentelemetry.trace import TracerProvider
10+
11+
from faststream_stomp.publisher import StompProducerPublishKwargs
12+
13+
14+
class StompTelemetrySettingsProvider(TelemetrySettingsProvider[stompman.MessageFrame]):
15+
messaging_system = "stomp"
16+
17+
def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> "AnyDict":
18+
return {
19+
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
20+
SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
21+
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
22+
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
23+
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.headers["destination"],
24+
}
25+
26+
def get_consume_destination_name(self, msg: StreamMessage[stompman.MessageFrame]) -> str: # noqa: PLR6301
27+
return msg.raw_message.headers["destination"]
28+
29+
def get_publish_attrs_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> AnyDict: # type: ignore[override]
30+
publish_attrs = {
31+
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
32+
SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs["destination"],
33+
}
34+
if kwargs["correlation_id"]:
35+
publish_attrs[SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID] = kwargs["correlation_id"]
36+
return publish_attrs
37+
38+
def get_publish_destination_name(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301
39+
return kwargs["destination"]
40+
41+
42+
class StompTelemetryMiddleware(TelemetryMiddleware):
43+
def __init__(
44+
self,
45+
*,
46+
tracer_provider: TracerProvider | None = None,
47+
meter_provider: MeterProvider | None = None,
48+
meter: Meter | None = None,
49+
) -> None:
50+
super().__init__(
51+
settings_provider_factory=lambda _: StompTelemetrySettingsProvider(),
52+
tracer_provider=tracer_provider,
53+
meter_provider=meter_provider,
54+
meter=meter,
55+
include_messages_counters=False,
56+
)

packages/faststream-stomp/faststream_stomp/parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ async def parse_message(message: stompman.MessageFrame) -> StreamMessage[stompma
1212
body=message.body,
1313
headers=cast("dict[str, str]", message.headers),
1414
content_type=message.headers.get("content-type"),
15-
message_id=message.headers.get("message-id", gen_cor_id()),
15+
message_id=message.headers["message-id"],
1616
correlation_id=cast("str", message.headers.get("correlation-id", gen_cor_id())),
1717
)
1818

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
import stompman
6+
from faststream.prometheus import ConsumeAttrs, MetricsSettingsProvider
7+
from faststream.prometheus.middleware import BasePrometheusMiddleware
8+
from faststream.types import EMPTY
9+
10+
if TYPE_CHECKING:
11+
from collections.abc import Sequence
12+
13+
from faststream.broker.message import StreamMessage
14+
from prometheus_client import CollectorRegistry
15+
16+
from faststream_stomp.publisher import StompProducerPublishKwargs
17+
18+
__all__ = ["StompMetricsSettingsProvider", "StompPrometheusMiddleware"]
19+
20+
21+
class StompMetricsSettingsProvider(MetricsSettingsProvider[stompman.MessageFrame]):
22+
messaging_system = "stomp"
23+
24+
def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> ConsumeAttrs: # noqa: PLR6301
25+
return {
26+
"destination_name": msg.raw_message.headers["destination"],
27+
"message_size": len(msg.body),
28+
"messages_count": 1,
29+
}
30+
31+
def get_publish_destination_name_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301
32+
return kwargs["destination"]
33+
34+
35+
class StompPrometheusMiddleware(BasePrometheusMiddleware):
36+
def __init__(
37+
self,
38+
*,
39+
registry: CollectorRegistry,
40+
app_name: str = EMPTY,
41+
metrics_prefix: str = "faststream",
42+
received_messages_size_buckets: Sequence[float] | None = None,
43+
) -> None:
44+
super().__init__(
45+
settings_provider_factory=lambda _: StompMetricsSettingsProvider(),
46+
registry=registry,
47+
app_name=app_name,
48+
metrics_prefix=metrics_prefix,
49+
received_messages_size_buckets=received_messages_size_buckets,
50+
)

packages/faststream-stomp/faststream_stomp/publisher.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections.abc import Sequence
22
from functools import partial
33
from itertools import chain
4-
from typing import Any
4+
from typing import Any, TypedDict, Unpack
55

66
import stompman
77
from faststream.asyncapi.schema import Channel, CorrelationId, Message, Operation
@@ -14,26 +14,25 @@
1414
from faststream.types import AsyncFunc, SendableMessage
1515

1616

17+
class StompProducerPublishKwargs(TypedDict):
18+
destination: str
19+
correlation_id: str | None
20+
headers: dict[str, str] | None
21+
22+
1723
class StompProducer(ProducerProto):
1824
_parser: AsyncCallable
1925
_decoder: AsyncCallable
2026

2127
def __init__(self, client: stompman.Client) -> None:
2228
self.client = client
2329

24-
async def publish( # type: ignore[override]
25-
self,
26-
message: SendableMessage,
27-
*,
28-
destination: str,
29-
correlation_id: str | None,
30-
headers: dict[str, str] | None,
31-
) -> None:
30+
async def publish(self, message: SendableMessage, **kwargs: Unpack[StompProducerPublishKwargs]) -> None: # type: ignore[override]
3231
body, content_type = encode_message(message)
33-
all_headers = headers.copy() if headers else {}
34-
if correlation_id:
35-
all_headers["correlation-id"] = correlation_id
36-
await self.client.send(body, destination, content_type=content_type, headers=all_headers)
32+
all_headers = kwargs["headers"].copy() if kwargs["headers"] else {}
33+
if kwargs["correlation_id"]:
34+
all_headers["correlation-id"] = kwargs["correlation_id"]
35+
await self.client.send(body, kwargs["destination"], content_type=content_type, headers=all_headers)
3736

3837
async def request( # type: ignore[override]
3938
self, message: SendableMessage, *, correlation_id: str | None, headers: dict[str, str] | None

packages/faststream-stomp/pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "faststream-stomp"
33
description = "FastStream STOMP broker"
44
authors = [{ name = "Lev Vereshchagin", email = "mail@vrslev.com" }]
5-
dependencies = ["faststream>=0.5", "stompman"]
5+
dependencies = ["faststream~=0.5", "stompman"]
66
requires-python = ">=3.11"
77
readme = "README.md"
88
license = { text = "MIT" }
@@ -23,6 +23,9 @@ repository = "https://github.com/vrslev/stompman"
2323
requires = ["hatchling", "hatch-vcs"]
2424
build-backend = "hatchling.build"
2525

26+
[dependency-groups]
27+
dev = ["faststream[otel,prometheus]~=0.5"]
28+
2629
[tool.hatch.version]
2730
source = "vcs"
2831
raw-options.root = "../.."

packages/faststream-stomp/test_faststream_stomp/test_main.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
from faststream import FastStream
88
from faststream.asyncapi import get_app_schema
99
from faststream.broker.message import gen_cor_id
10+
from faststream_stomp.opentelemetry import StompTelemetryMiddleware
11+
from faststream_stomp.prometheus import StompPrometheusMiddleware
12+
from opentelemetry.sdk.metrics import MeterProvider
13+
from opentelemetry.sdk.trace import TracerProvider
14+
from prometheus_client import CollectorRegistry
1015
from test_stompman.conftest import build_dataclass
1116

1217
pytestmark = pytest.mark.anyio
@@ -81,3 +86,25 @@ def test_asyncapi_schema(faker: faker.Faker, broker: faststream_stomp.StompBroke
8186
)
8287
)
8388
get_app_schema(FastStream(broker))
89+
90+
91+
async def test_opentelemetry_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
92+
broker.add_middleware(StompTelemetryMiddleware(tracer_provider=TracerProvider(), meter_provider=MeterProvider()))
93+
94+
@broker.subscriber(destination := faker.pystr())
95+
def _() -> None: ...
96+
97+
async with faststream_stomp.TestStompBroker(broker):
98+
await broker.start()
99+
await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id())
100+
101+
102+
async def test_prometheus_publish(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None:
103+
broker.add_middleware(StompPrometheusMiddleware(registry=CollectorRegistry()))
104+
105+
@broker.subscriber(destination := faker.pystr())
106+
def _() -> None: ...
107+
108+
async with faststream_stomp.TestStompBroker(broker):
109+
await broker.start()
110+
await broker.publish(faker.pystr(), destination, correlation_id=gen_cor_id())

0 commit comments

Comments
 (0)