diff --git a/Cargo.lock b/Cargo.lock index 3b12d31e56db4..1eda8e84a832e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7936,9 +7936,18 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" name = "mysten-common" version = "0.1.0" dependencies = [ + "anyhow", + "fastcrypto", "futures", + "mysten-metrics", "parking_lot 0.12.1", + "prometheus", + "reqwest 0.12.5", + "snap", + "sui-tls", + "sui-types", "tokio", + "tracing", ] [[package]] @@ -12871,6 +12880,7 @@ dependencies = [ "lru 0.10.0", "maplit", "move-core-types", + "mysten-common", "mysten-metrics", "num_enum 0.6.1", "once_cell", @@ -12881,7 +12891,6 @@ dependencies = [ "serde_json", "serde_with 3.9.0", "shared-crypto", - "snap", "sui-authority-aggregation", "sui-config", "sui-json-rpc-api", @@ -12889,7 +12898,6 @@ dependencies = [ "sui-keys", "sui-sdk 1.34.0", "sui-test-transaction-builder", - "sui-tls", "sui-types", "tap", "telemetry-subscribers", @@ -14147,7 +14155,6 @@ dependencies = [ "prometheus", "reqwest 0.12.5", "serde", - "snap", "sui-archival", "sui-config", "sui-core", diff --git a/crates/mysten-common/Cargo.toml b/crates/mysten-common/Cargo.toml index 0c8951b79adf2..22c5b718ea598 100644 --- a/crates/mysten-common/Cargo.toml +++ b/crates/mysten-common/Cargo.toml @@ -7,6 +7,15 @@ edition = "2021" publish = false [dependencies] +snap.workspace = true tokio.workspace = true futures.workspace = true parking_lot.workspace = true +reqwest.workspace = true +fastcrypto.workspace = true +mysten-metrics.workspace = true +sui-tls.workspace = true +sui-types.workspace = true +tracing.workspace = true +prometheus.workspace = true +anyhow.workspace = true diff --git a/crates/mysten-common/src/lib.rs b/crates/mysten-common/src/lib.rs index 3c4df66b8a4a0..7ed00de71fbc4 100644 --- a/crates/mysten-common/src/lib.rs +++ b/crates/mysten-common/src/lib.rs @@ -1,4 +1,5 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +pub mod metrics; pub mod sync; diff --git a/crates/mysten-common/src/metrics.rs b/crates/mysten-common/src/metrics.rs new file mode 100644 index 0000000000000..05043de3c9f90 --- /dev/null +++ b/crates/mysten-common/src/metrics.rs @@ -0,0 +1,97 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use mysten_metrics::RegistryService; +use prometheus::Encoder; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, error, info}; + +pub struct MetricsPushClient { + certificate: std::sync::Arc, + client: reqwest::Client, +} + +impl MetricsPushClient { + pub fn new(metrics_key: sui_types::crypto::NetworkKeyPair) -> Self { + use fastcrypto::traits::KeyPair; + let certificate = std::sync::Arc::new(sui_tls::SelfSignedCertificate::new( + metrics_key.private(), + sui_tls::SUI_VALIDATOR_SERVER_NAME, + )); + let identity = certificate.reqwest_identity(); + let client = reqwest::Client::builder() + .identity(identity) + .build() + .unwrap(); + + Self { + certificate, + client, + } + } + + pub fn certificate(&self) -> &sui_tls::SelfSignedCertificate { + &self.certificate + } + + pub fn client(&self) -> &reqwest::Client { + &self.client + } +} + +pub async fn push_metrics( + client: &MetricsPushClient, + url: &reqwest::Url, + registry: &RegistryService, +) -> Result<(), anyhow::Error> { + info!(push_url =% url, "pushing metrics to remote"); + + // now represents a collection timestamp for all of the metrics we send to the proxy + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + let mut metric_families = registry.gather_all(); + for mf in metric_families.iter_mut() { + for m in mf.mut_metric() { + m.set_timestamp_ms(now); + } + } + + let mut buf: Vec = vec![]; + let encoder = prometheus::ProtobufEncoder::new(); + encoder.encode(&metric_families, &mut buf)?; + + let mut s = snap::raw::Encoder::new(); + let compressed = s.compress_vec(&buf).map_err(|err| { + error!("unable to snappy encode; {err}"); + err + })?; + + let response = client + .client() + .post(url.to_owned()) + .header(reqwest::header::CONTENT_ENCODING, "snappy") + .header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT) + .body(compressed) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = match response.text().await { + Ok(body) => body, + Err(error) => format!("couldn't decode response body; {error}"), + }; + return Err(anyhow::anyhow!( + "metrics push failed: [{}]:{}", + status, + body + )); + } + + debug!("successfully pushed metrics to {url}"); + + Ok(()) +} diff --git a/crates/sui-bridge/Cargo.toml b/crates/sui-bridge/Cargo.toml index ff500f371c180..700d4979f9b6f 100644 --- a/crates/sui-bridge/Cargo.toml +++ b/crates/sui-bridge/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] ethers = "2.0" -snap = "1.1.0" tokio = { workspace = true, features = ["full"] } sui-types.workspace = true sui-authority-aggregation.workspace = true @@ -26,7 +25,6 @@ mysten-metrics.workspace = true sui-sdk.workspace = true sui-keys.workspace = true sui-config.workspace = true -sui-tls.workspace = true clap.workspace = true tracing.workspace = true bin-version.workspace = true @@ -46,6 +44,7 @@ rand.workspace = true lru.workspace = true shared-crypto.workspace = true backoff.workspace = true +mysten-common.workspace = true enum_dispatch.workspace = true sui-json-rpc-api.workspace = true sui-test-transaction-builder.workspace = true diff --git a/crates/sui-bridge/src/metrics.rs b/crates/sui-bridge/src/metrics.rs index 2ec82d5723099..8a2f44aa957d7 100644 --- a/crates/sui-bridge/src/metrics.rs +++ b/crates/sui-bridge/src/metrics.rs @@ -2,16 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config::MetricsConfig; +use mysten_common::metrics::{push_metrics, MetricsPushClient}; use mysten_metrics::RegistryService; use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, register_int_gauge_vec_with_registry, - register_int_gauge_with_registry, Encoder, HistogramVec, IntCounter, IntCounterVec, IntGauge, + register_int_gauge_with_registry, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; use sui_types::crypto::NetworkKeyPair; -use tracing::error; const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, @@ -20,39 +20,6 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 200., 250., 300., 350., 400., ]; -pub struct MetricsPushClient { - certificate: std::sync::Arc, - client: reqwest::Client, -} - -impl MetricsPushClient { - pub fn new(metrics_key: sui_types::crypto::NetworkKeyPair) -> Self { - use fastcrypto::traits::KeyPair; - let certificate = std::sync::Arc::new(sui_tls::SelfSignedCertificate::new( - metrics_key.private(), - sui_tls::SUI_VALIDATOR_SERVER_NAME, - )); - let identity = certificate.reqwest_identity(); - let client = reqwest::Client::builder() - .identity(identity) - .build() - .unwrap(); - - Self { - certificate, - client, - } - } - - pub fn certificate(&self) -> &sui_tls::SelfSignedCertificate { - &self.certificate - } - - pub fn client(&self) -> &reqwest::Client { - &self.client - } -} - /// Starts a task to periodically push metrics to a configured endpoint if a metrics push endpoint /// is configured. pub fn start_metrics_push_task( @@ -80,62 +47,6 @@ pub fn start_metrics_push_task( let mut client = MetricsPushClient::new(metrics_key_pair.copy()); - // TODO (johnm) split this out into mysten-common - async fn push_metrics( - client: &MetricsPushClient, - url: &reqwest::Url, - registry: &RegistryService, - ) -> Result<(), anyhow::Error> { - // now represents a collection timestamp for all of the metrics we send to the proxy - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as i64; - - let mut metric_families = registry.gather_all(); - for mf in metric_families.iter_mut() { - for m in mf.mut_metric() { - m.set_timestamp_ms(now); - } - } - - let mut buf: Vec = vec![]; - let encoder = prometheus::ProtobufEncoder::new(); - encoder.encode(&metric_families, &mut buf)?; - - let mut s = snap::raw::Encoder::new(); - let compressed = s.compress_vec(&buf).map_err(|err| { - error!("unable to snappy encode; {err}"); - err - })?; - - let response = client - .client() - .post(url.to_owned()) - .header(reqwest::header::CONTENT_ENCODING, "snappy") - .header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT) - .body(compressed) - .send() - .await?; - - if !response.status().is_success() { - let status = response.status(); - let body = match response.text().await { - Ok(body) => body, - Err(error) => format!("couldn't decode response body; {error}"), - }; - return Err(anyhow::anyhow!( - "metrics push failed: [{}]:{}", - status, - body - )); - } - - tracing::debug!("successfully pushed metrics to {url}"); - - Ok(()) - } - tokio::spawn(async move { tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service"); diff --git a/crates/sui-node/Cargo.toml b/crates/sui-node/Cargo.toml index 4d46c42852e5a..a9342adbb5a7c 100644 --- a/crates/sui-node/Cargo.toml +++ b/crates/sui-node/Cargo.toml @@ -27,7 +27,6 @@ tower.workspace = true reqwest.workspace = true tap.workspace = true serde.workspace = true -snap.workspace = true bin-version.workspace = true url.workspace = true humantime.workspace = true diff --git a/crates/sui-node/src/metrics.rs b/crates/sui-node/src/metrics.rs index c12acde0c2328..8cd5891d3e6f6 100644 --- a/crates/sui-node/src/metrics.rs +++ b/crates/sui-node/src/metrics.rs @@ -1,50 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use mysten_common::metrics::{push_metrics, MetricsPushClient}; use mysten_network::metrics::MetricsCallbackProvider; use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, - register_int_gauge_vec_with_registry, Encoder, HistogramVec, IntCounterVec, IntGaugeVec, - Registry, PROTOBUF_FORMAT, + register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry, }; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; use sui_network::tonic::Code; use mysten_metrics::RegistryService; -use tracing::error; - -pub struct MetricsPushClient { - certificate: std::sync::Arc, - client: reqwest::Client, -} - -impl MetricsPushClient { - pub fn new(network_key: sui_types::crypto::NetworkKeyPair) -> Self { - use fastcrypto::traits::KeyPair; - let certificate = std::sync::Arc::new(sui_tls::SelfSignedCertificate::new( - network_key.private(), - sui_tls::SUI_VALIDATOR_SERVER_NAME, - )); - let identity = certificate.reqwest_identity(); - let client = reqwest::Client::builder() - .identity(identity) - .build() - .unwrap(); - - Self { - certificate, - client, - } - } - - pub fn certificate(&self) -> &sui_tls::SelfSignedCertificate { - &self.certificate - } - - pub fn client(&self) -> &reqwest::Client { - &self.client - } -} /// Starts a task to periodically push metrics to a configured endpoint if a metrics push endpoint /// is configured. @@ -72,61 +38,6 @@ pub fn start_metrics_push_task(config: &sui_config::NodeConfig, registry: Regist let config_copy = config.clone(); let mut client = MetricsPushClient::new(config_copy.network_key_pair().copy()); - async fn push_metrics( - client: &MetricsPushClient, - url: &reqwest::Url, - registry: &RegistryService, - ) -> Result<(), anyhow::Error> { - // now represents a collection timestamp for all of the metrics we send to the proxy - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as i64; - - let mut metric_families = registry.gather_all(); - for mf in metric_families.iter_mut() { - for m in mf.mut_metric() { - m.set_timestamp_ms(now); - } - } - - let mut buf: Vec = vec![]; - let encoder = prometheus::ProtobufEncoder::new(); - encoder.encode(&metric_families, &mut buf)?; - - let mut s = snap::raw::Encoder::new(); - let compressed = s.compress_vec(&buf).map_err(|err| { - error!("unable to snappy encode; {err}"); - err - })?; - - let response = client - .client() - .post(url.to_owned()) - .header(reqwest::header::CONTENT_ENCODING, "snappy") - .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT) - .body(compressed) - .send() - .await?; - - if !response.status().is_success() { - let status = response.status(); - let body = match response.text().await { - Ok(body) => body, - Err(error) => format!("couldn't decode response body; {error}"), - }; - return Err(anyhow::anyhow!( - "metrics push failed: [{}]:{}", - status, - body - )); - } - - tracing::debug!("successfully pushed metrics to {url}"); - - Ok(()) - } - tokio::spawn(async move { tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");