Skip to content

Implement broker health check #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions microbootstrap/bootstrappers/faststream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
import dataclasses
import json
import typing

Expand All @@ -7,6 +8,7 @@
import typing_extensions
from faststream.asgi import AsgiFastStream, AsgiResponse
from faststream.asgi import get as handle_get
from health_checks.http_based import BaseHTTPHealthCheck

from microbootstrap.bootstrappers.base import ApplicationBootstrapper
from microbootstrap.config.faststream import FastStreamConfig
Expand Down Expand Up @@ -96,8 +98,17 @@ def get_config_type(cls) -> type[FastStreamPrometheusConfig]:
return FastStreamPrometheusConfig


@dataclasses.dataclass
class FastStreamHealthCheck(BaseHTTPHealthCheck):
application: AsgiFastStream | None = None

async def update_health_status(self) -> bool:
return await self.application.broker.ping(timeout=5) if self.application and self.application.broker else False


@FastStreamBootstrapper.use_instrument()
class FastStreamHealthChecksInstrument(HealthChecksInstrument):
def bootstrap(self) -> None: ...
def bootstrap_before(self) -> dict[str, typing.Any]:
@handle_get
async def check_health(scope: typing.Any) -> AsgiResponse: # noqa: ANN401, ARG001
Expand All @@ -109,3 +120,11 @@ async def check_health(scope: typing.Any) -> AsgiResponse: # noqa: ANN401, ARG0
)

return {"asgi_routes": ((self.instrument_config.health_checks_path, check_health),)}

def bootstrap_after(self, application: AsgiFastStream) -> AsgiFastStream: # type: ignore[override]
self.health_check = FastStreamHealthCheck(
service_version=self.instrument_config.service_version,
service_name=self.instrument_config.service_name,
application=application,
)
return application
7 changes: 6 additions & 1 deletion microbootstrap/instruments/health_checks_instrument.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations
import typing

from health_checks.http_based import DefaultHTTPHealthCheck

from microbootstrap.instruments.base import BaseInstrumentConfig, Instrument


if typing.TYPE_CHECKING:
from health_checks.base import HealthCheck


class HealthChecksConfig(BaseInstrumentConfig):
service_name: str = "micro-service"
service_version: str = "1.0.0"
Expand All @@ -19,7 +24,7 @@ class HealthChecksInstrument(Instrument[HealthChecksConfig]):
ready_condition = "Set health_checks_enabled to True"

def bootstrap(self) -> None:
self.health_check = DefaultHTTPHealthCheck(
self.health_check: HealthCheck = DefaultHTTPHealthCheck(
service_version=self.instrument_config.service_version,
service_name=self.instrument_config.service_name,
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ litestar = [
"prometheus-client>=0.20",
]
granian = ["granian[reload]>=1"]
faststream = ["faststream~=0.5", "health-checks>=1", "prometheus-client>=0.20"]
faststream = ["faststream~=0.5", "prometheus-client>=0.20"]

[dependency-groups]
dev = [
Expand Down
34 changes: 24 additions & 10 deletions tests/bootstrappers/test_faststream.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,31 @@ def test_faststream_configure_application_lifespan(broker: RedisBroker, magic_mo
assert magic_mock.called


def test_faststream_health_check(broker: RedisBroker) -> None:
test_health_path: typing.Final = "/test-health-path"
application: typing.Final = (
FastStreamBootstrapper(FastStreamSettings())
.configure_application(FastStreamConfig(broker=broker))
.configure_instruments(HealthChecksConfig(health_checks_path=test_health_path))
.bootstrap()
)
class TestFastStreamHealthCheck:
def test_500(self, broker: RedisBroker) -> None:
test_health_path: typing.Final = "/test-health-path"
application: typing.Final = (
FastStreamBootstrapper(FastStreamSettings())
.configure_application(FastStreamConfig(broker=broker))
.configure_instruments(HealthChecksConfig(health_checks_path=test_health_path))
.bootstrap()
)

response: typing.Final = TestClient(app=application).get(test_health_path)
assert response.status_code == status.HTTP_200_OK
response: typing.Final = TestClient(app=application).get(test_health_path)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR

async def test_ok(self, broker: RedisBroker) -> None:
test_health_path: typing.Final = "/test-health-path"
application: typing.Final = (
FastStreamBootstrapper(FastStreamSettings())
.configure_application(FastStreamConfig(broker=broker))
.configure_instruments(HealthChecksConfig(health_checks_path=test_health_path))
.bootstrap()
)

async with TestRedisBroker(broker):
response: typing.Final = TestClient(app=application).get(test_health_path)
assert response.status_code == status.HTTP_200_OK


async def test_faststream_opentelemetry(
Expand Down