2
2
pub mod constants;
3
3
pub mod error;
4
4
pub mod http_utils;
5
+ pub mod metrics;
5
6
pub mod server;
6
7
pub mod server_helpers;
7
8
pub mod types;
9
+ use crate :: metrics:: METRICS ;
8
10
use anyhow:: Result ;
11
+ use axum:: {
12
+ body:: Body ,
13
+ extract:: State ,
14
+ http:: { header:: CONTENT_TYPE , StatusCode } ,
15
+ response:: { IntoResponse , Response } ,
16
+ } ;
17
+ use prometheus_client:: { encoding:: text:: encode, registry:: Registry } ;
9
18
use rmcp:: {
10
19
transport:: sse_server:: { SseServer , SseServerConfig } ,
11
20
ServiceExt ,
12
21
} ;
13
22
pub use server:: SubgraphServer ;
14
- use std:: { env, net:: SocketAddr , time:: Duration } ;
23
+ use std:: { env, net:: SocketAddr , sync :: Arc , time:: Duration } ;
15
24
use tokio:: io;
16
25
use tokio_util:: sync:: CancellationToken ;
17
26
use tracing:: info;
@@ -24,7 +33,32 @@ async fn main() -> Result<()> {
24
33
. unwrap_or_else ( |e| eprintln ! ( "env_logger init failed: {}" , e) ) ;
25
34
26
35
if args. iter ( ) . any ( |arg| arg == "--sse" ) {
27
- start_sse_server ( ) . await
36
+ let shutdown_token = CancellationToken :: new ( ) ;
37
+
38
+ let sse_server_handle = tokio:: spawn ( start_sse_server ( shutdown_token. clone ( ) ) ) ;
39
+ let metrics_server_handle = tokio:: spawn ( start_metrics_server ( shutdown_token. clone ( ) ) ) ;
40
+
41
+ let mut sigterm =
42
+ tokio:: signal:: unix:: signal ( tokio:: signal:: unix:: SignalKind :: terminate ( ) ) ?;
43
+ tokio:: select! {
44
+ _ = tokio:: signal:: ctrl_c( ) => {
45
+ info!( "Ctrl+C (SIGINT) received, initiating graceful shutdown..." ) ;
46
+ } ,
47
+ _ = sigterm. recv( ) => {
48
+ info!( "SIGTERM received, initiating graceful shutdown..." ) ;
49
+ }
50
+ } ;
51
+
52
+ info ! ( "Signalling services to shut down..." ) ;
53
+ shutdown_token. cancel ( ) ;
54
+
55
+ let _ = sse_server_handle. await ?;
56
+ let _ = metrics_server_handle. await ?;
57
+
58
+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
59
+
60
+ info ! ( "All services shutdown complete." ) ;
61
+ Ok ( ( ) )
28
62
} else {
29
63
start_stdio_server ( ) . await
30
64
}
@@ -40,7 +74,7 @@ async fn start_stdio_server() -> Result<()> {
40
74
Ok ( ( ) )
41
75
}
42
76
43
- async fn start_sse_server ( ) -> Result < ( ) > {
77
+ async fn start_sse_server ( shutdown_token : CancellationToken ) -> Result < ( ) > {
44
78
info ! ( "Starting SSE Subgraph MCP Server" ) ;
45
79
let host = env:: var ( "HOST" ) . unwrap_or_else ( |_| "0.0.0.0" . to_string ( ) ) ;
46
80
let port = env:: var ( "PORT" ) . unwrap_or_else ( |_| "8000" . to_string ( ) ) ;
@@ -51,13 +85,11 @@ async fn start_sse_server() -> Result<()> {
51
85
let sse_path = env:: var ( "SSE_PATH" ) . unwrap_or_else ( |_| "/sse" . to_string ( ) ) ;
52
86
let post_path = env:: var ( "POST_PATH" ) . unwrap_or_else ( |_| "/messages" . to_string ( ) ) ;
53
87
54
- let server_shutdown_token = CancellationToken :: new ( ) ;
55
-
56
88
let config = SseServerConfig {
57
89
bind : bind_addr,
58
90
sse_path,
59
91
post_path,
60
- ct : server_shutdown_token . clone ( ) ,
92
+ ct : shutdown_token . clone ( ) ,
61
93
sse_keep_alive : Some ( Duration :: from_secs ( 30 ) ) ,
62
94
} ;
63
95
@@ -67,23 +99,59 @@ async fn start_sse_server() -> Result<()> {
67
99
let service_shutdown_token = sse_server. with_service ( SubgraphServer :: new) ;
68
100
info ! ( "Subgraph MCP Service attached to SSE server" ) ;
69
101
70
- let mut sigterm = tokio :: signal :: unix :: signal ( tokio :: signal :: unix :: SignalKind :: terminate ( ) ) ? ;
102
+ shutdown_token . cancelled ( ) . await ;
71
103
72
- tokio:: select! {
73
- _ = tokio:: signal:: ctrl_c( ) => {
74
- info!( "Ctrl+C (SIGINT) received, initiating graceful shutdown..." ) ;
75
- } ,
76
- _ = sigterm. recv( ) => {
77
- info!( "SIGTERM received, initiating graceful shutdown..." ) ;
78
- }
79
- } ;
80
-
81
- info ! ( "Signalling service and server to shut down..." ) ;
104
+ info ! ( "SSE Server shutdown signal received. Giving tasks a moment to finish..." ) ;
82
105
service_shutdown_token. cancel ( ) ;
83
- server_shutdown_token. cancel ( ) ;
84
-
85
106
tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
86
107
87
108
info ! ( "SSE Server shutdown complete." ) ;
88
109
Ok ( ( ) )
89
110
}
111
+
112
+ async fn metrics_handler ( State ( registry) : State < Arc < Registry > > ) -> impl IntoResponse {
113
+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
114
+
115
+ let mut buffer = String :: new ( ) ;
116
+ encode ( & mut buffer, & registry) . unwrap ( ) ;
117
+ Response :: builder ( )
118
+ . status ( StatusCode :: OK )
119
+ . header (
120
+ CONTENT_TYPE ,
121
+ "application/openmetrics-text; version=1.0.0; charset=utf-8" ,
122
+ )
123
+ . body ( Body :: from ( buffer) )
124
+ . unwrap ( )
125
+ }
126
+
127
+ async fn start_metrics_server ( shutdown_token : CancellationToken ) -> Result < ( ) > {
128
+ let mut registry = <Registry as Default >:: default ( ) ;
129
+ METRICS . register ( & mut registry) ;
130
+ let registry = Arc :: new ( registry) ;
131
+
132
+ let host = env:: var ( "METRICS_HOST" ) . unwrap_or_else ( |_| "0.0.0.0" . to_string ( ) ) ;
133
+ let port = env:: var ( "METRICS_PORT" ) . unwrap_or_else ( |_| "9091" . to_string ( ) ) ;
134
+ let bind_addr: SocketAddr = format ! ( "{}:{}" , host, port) . parse ( ) . map_err ( |e| {
135
+ anyhow:: anyhow!(
136
+ "Invalid METRICS BIND address format '{}:{}': {}" ,
137
+ host,
138
+ port,
139
+ e
140
+ )
141
+ } ) ?;
142
+
143
+ let app = axum:: Router :: new ( )
144
+ . route ( "/metrics" , axum:: routing:: get ( metrics_handler) )
145
+ . with_state ( registry) ;
146
+
147
+ info ! ( "Metrics server listening on {}" , bind_addr) ;
148
+ let listener = tokio:: net:: TcpListener :: bind ( bind_addr) . await ?;
149
+ axum:: serve ( listener, app)
150
+ . with_graceful_shutdown ( async move {
151
+ shutdown_token. cancelled ( ) . await ;
152
+ info ! ( "Metrics server shutting down." ) ;
153
+ } )
154
+ . await ?;
155
+
156
+ Ok ( ( ) )
157
+ }
0 commit comments