Skip to content

Commit 6ba707b

Browse files
authored
Quorum: Add ws and other metrics (#2809)
1 parent 98180a0 commit 6ba707b

File tree

6 files changed

+113
-17
lines changed

6 files changed

+113
-17
lines changed

apps/quorum/Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/quorum/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "quorum"
3-
version = "0.2.0"
3+
version = "0.2.1"
44
edition = "2021"
55

66
[dependencies]
@@ -24,3 +24,4 @@ serde_json = "1.0.140"
2424
futures = "0.3.31"
2525
serde_wormhole = "0.1.0"
2626
axum-prometheus = "0.8.0"
27+
metrics = "0.24.2"

apps/quorum/src/api.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use secp256k1::{
1111
use serde::Deserialize;
1212
use serde_wormhole::RawMessage;
1313
use sha3::{Digest, Keccak256};
14-
use std::{net::SocketAddr, time::Duration};
14+
use std::{
15+
net::SocketAddr,
16+
time::{Duration, Instant},
17+
};
1518
use wormhole_sdk::{
1619
vaa::{Body, Header, Signature},
1720
GuardianAddress, GuardianSetInfo, Vaa,
@@ -139,6 +142,11 @@ async fn handle_observation(
139142
state.guardian_set.clone(),
140143
state.observation_lifetime,
141144
)?;
145+
metrics::counter!(
146+
"verified_observations_total",
147+
&[("gaurdian_index", verifier_index.to_string())]
148+
)
149+
.increment(1);
142150
let new_signature = Signature {
143151
signature: params.signature,
144152
index: verifier_index.try_into()?,
@@ -169,6 +177,7 @@ async fn handle_observation(
169177
body,
170178
)
171179
.into();
180+
metrics::counter!("new_vaa_total").increment(1);
172181
if let Err(e) = state
173182
.ws
174183
.broadcast_sender
@@ -193,9 +202,14 @@ async fn post_observation(
193202
tokio::spawn({
194203
let state = state.clone();
195204
async move {
205+
let start = Instant::now();
206+
let mut status = "success";
196207
if let Err(e) = handle_observation(state, params).await {
208+
status = "error";
197209
tracing::warn!(error = ?e, "Failed to handle observation");
198210
}
211+
metrics::histogram!("handle_observation_duration_seconds", &[("status", status)])
212+
.record(start.elapsed().as_secs_f64());
199213
}
200214
});
201215
Json(())
@@ -580,7 +594,9 @@ mod test {
580594
let update = subscriber
581595
.try_recv()
582596
.expect("Failed to receive update from subscriber");
583-
let UpdateEvent::NewVaa(vaa) = update;
597+
let UpdateEvent::NewVaa(vaa) = update else {
598+
panic!("Expected NewVaa event, got {:?}", update);
599+
};
584600
let vaa: Vaa<&RawMessage> =
585601
serde_wormhole::from_slice(&vaa).expect("Failed to deserialize VAA");
586602
// Check if the vaa signatures are sorted

apps/quorum/src/metrics_server.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::{future::Future, time::Duration};
2+
13
use axum::{routing::get, Router};
24
use axum_prometheus::{
35
metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle},
@@ -18,6 +20,27 @@ pub fn setup_metrics_recorder() -> anyhow::Result<PrometheusHandle> {
1820
.map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err))
1921
}
2022

23+
const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(1);
24+
pub async fn metric_collector<F, Fut>(service_name: String, update_metrics: F)
25+
where
26+
F: Fn() -> Fut,
27+
Fut: Future<Output = ()> + Send + 'static,
28+
{
29+
let mut metric_interval = tokio::time::interval(METRIC_COLLECTION_INTERVAL);
30+
loop {
31+
tokio::select! {
32+
_ = metric_interval.tick() => {
33+
update_metrics().await;
34+
}
35+
_ = wait_for_exit() => {
36+
tracing::info!("Received exit signal, stopping metric collector for {}...", service_name);
37+
break;
38+
}
39+
}
40+
}
41+
tracing::info!("Shutting down metric collector for {}...", service_name);
42+
}
43+
2144
pub async fn run(run_options: RunOptions, state: State) -> anyhow::Result<()> {
2245
tracing::info!("Starting Metrics Server...");
2346

apps/quorum/src/server.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use wormhole_sdk::{vaa::Signature, GuardianSetInfo};
1414

1515
use crate::{
1616
api::{self},
17-
metrics_server::{self, setup_metrics_recorder},
17+
metrics_server::{self, metric_collector, setup_metrics_recorder},
1818
pythnet::fetch_guardian_set,
1919
ws::WsState,
2020
};
@@ -181,6 +181,15 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
181181
run_options.clone(),
182182
state.clone()
183183
)),
184+
metric_collector("state".to_string(), || {
185+
let state = state.clone();
186+
async move {
187+
let verification = state.verification.read().await;
188+
metrics::gauge!("pending_vaas").set(verification.len() as f64);
189+
metrics::gauge!("pending_verified_observations")
190+
.set(verification.values().flatten().count() as f64);
191+
}
192+
}),
184193
);
185194

186195
Ok(())

apps/quorum/src/ws.rs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ async fn websocket_handler(state: axum::extract::State<State>, stream: WebSocket
5454
/// Event handed to a WebSocket subscriber's update handler.
#[derive(Clone, PartialEq, Debug)]
pub enum UpdateEvent {
    /// Raw bytes of a newly assembled VAA.
    NewVaa(Vec<u8>),
    /// Ask the subscriber to send a WebSocket ping frame to its client.
    Ping,
}
5859

5960
pub type SubscriberId = usize;
@@ -117,8 +118,7 @@ impl Subscriber {
117118
return Err(anyhow!("Subscriber did not respond to ping. Closing connection."));
118119
}
119120
self.responded_to_ping = false;
120-
self.sender.send(Message::Ping(vec![].into())).await?;
121-
Ok(())
121+
self.handle_update(UpdateEvent::Ping).await
122122
},
123123
_ = wait_for_exit() => {
124124
self.sender.close().await?;
@@ -134,13 +134,35 @@ impl Subscriber {
134134
}
135135

136136
async fn handle_update(&mut self, event: UpdateEvent) -> Result<()> {
137-
match event.clone() {
138-
UpdateEvent::NewVaa(vaa) => self.handle_new_vaa(vaa).await,
139-
}
137+
let start = std::time::Instant::now();
138+
let update_name;
139+
let result = match event.clone() {
140+
UpdateEvent::NewVaa(vaa) => {
141+
update_name = "new_vaa";
142+
self.handle_new_vaa(vaa).await
143+
}
144+
UpdateEvent::Ping => {
145+
update_name = "ping";
146+
self.sender.send(Message::Ping(vec![].into())).await?;
147+
Ok(())
148+
}
149+
};
150+
let status = match &result {
151+
Ok(_) => "success",
152+
Err(_) => "error",
153+
};
154+
let label = [("status", status), ("name", update_name)];
155+
metrics::counter!("ws_server_update_total", &label).increment(1);
156+
metrics::histogram!("ws_server_update_duration_seconds", &label,)
157+
.record(start.elapsed().as_secs_f64());
158+
result
140159
}
141160

142161
async fn handle_client_message(&mut self, message: Message) -> Result<()> {
143-
match message {
162+
let start = std::time::Instant::now();
163+
let message_type;
164+
165+
let result: anyhow::Result<()> = match message {
144166
Message::Close(_) => {
145167
// Closing the connection. We don't remove it from the subscribers
146168
// list, instead when the Subscriber struct is dropped the channel
@@ -149,15 +171,39 @@ impl Subscriber {
149171
// Send the close message to gracefully shut down the connection
150172
// Otherwise the client might get an abnormal Websocket closure
151173
// error.
174+
message_type = "close";
152175
self.sender.close().await?;
153176
self.closed = true;
154-
return Ok(());
177+
Ok(())
178+
}
179+
Message::Text(_) => {
180+
message_type = "text";
181+
Ok(())
182+
}
183+
Message::Binary(_) => {
184+
message_type = "binary";
185+
Ok(())
186+
}
187+
Message::Ping(_) => {
188+
message_type = "ping";
189+
Ok(())
190+
}
191+
Message::Pong(_) => {
192+
message_type = "pong";
193+
self.responded_to_ping = true;
194+
Ok(())
155195
}
156-
Message::Text(_) => {}
157-
Message::Binary(_) => {}
158-
Message::Ping(_) => {}
159-
Message::Pong(_) => self.responded_to_ping = true,
160196
};
161-
Ok(())
197+
198+
let status = match &result {
199+
Ok(_) => "success",
200+
Err(_) => "error",
201+
};
202+
let label = [("status", status), ("message_type", message_type)];
203+
metrics::counter!("ws_client_message_total", &label).increment(1);
204+
metrics::histogram!("ws_client_message_duration_seconds", &label,)
205+
.record(start.elapsed().as_secs_f64());
206+
207+
result
162208
}
163209
}

0 commit comments

Comments
 (0)