diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index d2f134df..9f760f19 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -32,3 +32,8 @@ jobs: continue-on-error: true env: CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + - name: Publish inspector + run: cargo publish --manifest-path inspector/Cargo.toml + continue-on-error: true + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} diff --git a/Cargo.lock b/Cargo.lock index b93733c8..98204930 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,8 +38,9 @@ dependencies = [ [[package]] name = "alto-chain" -version = "0.0.2" +version = "0.0.3" dependencies = [ + "alto-client", "alto-types", "axum", "bytes", @@ -69,7 +70,7 @@ dependencies = [ [[package]] name = "alto-client" -version = "0.0.2" +version = "0.0.3" dependencies = [ "alto-types", "bytes", @@ -83,9 +84,27 @@ dependencies = [ "tokio-tungstenite", ] +[[package]] +name = "alto-inspector" +version = "0.0.3" +dependencies = [ + "alto-client", + "alto-types", + "bytes", + "clap", + "commonware-cryptography", + "commonware-utils", + "futures", + "rand", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "alto-types" -version = "0.0.2" +version = "0.0.3" dependencies = [ "bytes", "commonware-cryptography", diff --git a/Cargo.toml b/Cargo.toml index 95121ec6..cb0e0f0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,13 +2,14 @@ members = [ "chain", "client", + "inspector", "types", ] resolver = "2" [workspace.dependencies] -alto-client = { version = "0.0.2", path = "client" } -alto-types = { version = "0.0.2", path = "types" } +alto-client = { version = "0.0.3", path = "client" } +alto-types = { version = "0.0.3", path = "types" } commonware-consensus = { version = "0.0.40" } commonware-cryptography = { version = "0.0.40" } commonware-deployer = { version = "0.0.40" } @@ -30,6 +31,7 @@ tracing = "0.1.41" tracing-subscriber = "0.3.19" governor = "0.6.3" prometheus-client = "0.22.3" +clap = "4.5.18" [profile.bench] # Because we enable overflow checks in "release," we should benchmark with them. diff --git a/README.md b/README.md index bb945cf3..1f61185a 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ * [chain](./chain/README.md): A minimal blockchain built with the [Commonware Library](https://github.com/commonwarexyz/monorepo). * [client](./client/README.md): Client for interacting with `alto`. +* [inspector](./inspector/README.md): Monitor `alto` activity. * [types](./types/README.md): Common types used throughout `alto`. ## Licensing diff --git a/chain/Cargo.toml b/chain/Cargo.toml index e4517cb5..3542575d 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-chain" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0" @@ -12,6 +12,7 @@ documentation = "https://docs.rs/alto-chain" [dependencies] alto-types = { workspace = true } +alto-client = { workspace = true } commonware-consensus = { workspace = true } commonware-cryptography = { workspace = true } commonware-deployer = { workspace = true } @@ -30,8 +31,8 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["fmt", "json"] } governor = { workspace = true } prometheus-client = { workspace = true } +clap = { workspace = true } sysinfo = "0.33.1" -clap = "4.5.18" axum = "0.8.1" uuid = "1.15.1" serde = { version = "1.0.218", features = ["derive"] } diff --git a/chain/README.md b/chain/README.md index 60f1e159..159b4fc2 100644 --- a/chain/README.md +++ b/chain/README.md @@ -22,9 +22,17 @@ cargo install commonware-deployer ### Create Deployer Artifacts ```bash -cargo run --bin setup -- --peers 10 --bootstrappers 2 --regions us-west-1,us-east-1,eu-west-1,ap-northeast-1,eu-north-1,ap-south-1,sa-east-1,eu-central-1,ap-northeast-2,ap-southeast-2 --instance-type c7g.xlarge --storage-size 10 --storage-class gp3 --worker-threads 4 --message-backlog 16384 --mailbox-size 16384 --dashboard dashboard.json --output assets +cargo run --bin setup -- generate --peers 10 --bootstrappers 2 --regions us-west-1,us-east-1,eu-west-1,ap-northeast-1,eu-north-1,ap-south-1,sa-east-1,eu-central-1,ap-northeast-2,ap-southeast-2 --instance-type c7g.xlarge --storage-size 10 --storage-class gp3 --worker-threads 4 --message-backlog 16384 --mailbox-size 16384 --dashboard dashboard.json --output assets ``` +### [Optional] Configure Indexer Upload + +```bash +cargo run --bin setup -- indexer --dir assets --url +``` + +_The indexer URL is configured separately because it is typically only known after the threshold key is generated (derived in `setup generate`)._ + ### Build Validator Binary #### Build Cross-Platform Compiler diff --git a/chain/dashboard.json b/chain/dashboard.json index 25327174..a908890b 100644 --- a/chain/dashboard.json +++ b/chain/dashboard.json @@ -312,7 +312,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "engine_syncer_latest_height{job=~\"$Validator\", region=~\"$Region\"}", + "expr": "engine_syncer_finalized_height{job=~\"$Validator\", region=~\"$Region\"}", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "{{job}} ({{instance}})", diff --git a/chain/src/actors/application/actor.rs b/chain/src/actors/application/actor.rs index 38a58b66..89094f93 100644 --- a/chain/src/actors/application/actor.rs +++ b/chain/src/actors/application/actor.rs @@ -4,7 +4,7 @@ use super::{ Config, }; use crate::actors::syncer; -use alto_types::{Block, Finalization, Notarization}; +use alto_types::{Block, Finalization, Notarization, Seed}; use commonware_consensus::threshold_simplex::Prover; use commonware_cryptography::{sha256::Digest, Hasher, Sha256}; use commonware_macros::select; @@ -215,21 +215,23 @@ impl Actor { } Message::Prepared { proof, payload } => { // Parse the proof - let (view, parent, _, signature, _) = + let (view, parent, _, signature, seed) = self.prover.deserialize_notarization(proof).unwrap(); let notarization = Notarization::new(view, parent, payload, signature.into()); + let seed = Seed::new(view, seed.into()); // Send the notarization to the syncer - syncer.notarized(notarization).await; + syncer.notarized(notarization, seed).await; } Message::Finalized { proof, payload } => { // Parse the proof - let (view, parent, _, signature, _) = + let (view, parent, _, signature, seed) = self.prover.deserialize_finalization(proof.clone()).unwrap(); let finalization = Finalization::new(view, parent, payload, signature.into()); + let seed = Seed::new(view, seed.into()); // Send the finalization to the syncer - syncer.finalized(finalization).await; + syncer.finalized(finalization, seed).await; } } } diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 5a918631..d6b5f997 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -6,11 +6,15 @@ use super::{ ingress::{Mailbox, Message}, Config, }; -use crate::actors::syncer::{ - handler, - key::{self, MultiIndex, Value}, +use crate::{ + actors::syncer::{ + handler, + key::{self, MultiIndex, Value}, + }, + Indexer, }; use alto_types::{Block, Finalization, Finalized, Notarized}; +use bytes::Bytes; use commonware_cryptography::{bls12381, ed25519::PublicKey, sha256::Digest}; use commonware_macros::select; use commonware_p2p::{utils::requester, Receiver, Recipients, Sender}; @@ -42,7 +46,7 @@ use std::{ use tracing::{debug, info, warn}; /// Application actor. -pub struct Actor> { +pub struct Actor, I: Indexer> { context: R, public_key: PublicKey, public: bls12381::PublicKey, @@ -51,6 +55,7 @@ pub struct Actor, // Blocks verified stored by view<>digest verified: Archive, @@ -64,18 +69,18 @@ pub struct Actor, - // Indexer storage - indexer: Metadata>, + // Finalizer storage + finalizer: Metadata>, // Latest height metric - latest_height: Gauge, + finalized_height: Gauge, // Indexed height metric - indexed_height: Gauge, + contiguous_height: Gauge, } -impl> Actor { +impl, I: Indexer> Actor { /// Create a new application actor. - pub async fn init(context: R, config: Config) -> (Self, Mailbox) { + pub async fn init(context: R, config: Config) -> (Self, Mailbox) { // Initialize verified blocks let verified_journal = Journal::init( context.with_label("verified_journal"), @@ -168,28 +173,28 @@ impl> Actor> Actor> Actor> Actor(1); - self.context.with_label("indexer").spawn({ + let (mut finalizer_sender, mut finalizer_receiver) = mpsc::channel::<()>(1); + self.context.with_label("finalizer").spawn({ let mut resolver = resolver.clone(); let last_view_processed = last_view_processed.clone(); let verified = verified.clone(); @@ -287,14 +293,12 @@ impl> Actor> Actor> Actor> Actor> Actor { + Message::Notarized { proof, seed } => { + // Upload seed to indexer (if available) + if let Some(indexer) = self.indexer.as_ref() { + self.context.with_label("indexer").spawn({ + let indexer = indexer.clone(); + let view = proof.view; + move |_| async move { + let seed = seed.serialize().into(); + let result = indexer.seed_upload(seed).await; + if let Err(e) = result { + warn!(?e, "failed to upload seed"); + return; + } + debug!(view, "seed uploaded to indexer"); + } + }); + } + // Check if in buffer let mut block = None; if let Some(buffered) = buffer.get(&proof.payload) { @@ -490,11 +514,29 @@ impl> Actor> Actor { + Message::Finalized { proof, seed } => { + // Upload seed to indexer (if available) + if let Some(indexer) = self.indexer.as_ref() { + self.context.with_label("indexer").spawn({ + let indexer = indexer.clone(); + let view = proof.view; + move |_| async move { + let seed = seed.serialize().into(); + let result = indexer.seed_upload(seed).await; + if let Err(e) = result { + warn!(?e, "failed to upload seed"); + return; + } + debug!(view, "seed uploaded to indexer"); + } + }); + } + // Check if in buffer let mut block = None; if let Some(buffered) = buffer.get(&proof.payload){ @@ -554,14 +613,32 @@ impl> Actor> Actor { // Parse block @@ -802,8 +879,8 @@ impl> Actor { pub partition_prefix: String, pub public_key: PublicKey, @@ -29,4 +31,6 @@ pub struct Config { pub backfill_quota: Quota, pub activity_timeout: u64, + + pub indexer: Option, } diff --git a/chain/src/bin/setup.rs b/chain/src/bin/setup.rs index f98e0dc2..473f7c63 100644 --- a/chain/src/bin/setup.rs +++ b/chain/src/bin/setup.rs @@ -1,5 +1,5 @@ use alto_chain::Config; -use clap::{value_parser, Arg, Command}; +use clap::{value_parser, Arg, ArgMatches, Command}; use commonware_cryptography::{ bls12381::{ dkg::ops, @@ -10,98 +10,165 @@ use commonware_cryptography::{ use commonware_deployer::ec2; use commonware_utils::{hex, quorum}; use rand::{rngs::OsRng, seq::IteratorRandom}; -use tracing::info; +use std::fs; +use tracing::{error, info}; use uuid::Uuid; const BINARY_NAME: &str = "validator"; const PORT: u16 = 4545; fn main() { - // Parse arguments - let matches = Command::new("setup") - .about("Generate configuration files for an alto chain.") - .arg( - Arg::new("peers") - .long("peers") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("bootstrappers") - .long("bootstrappers") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("regions") - .long("regions") - .required(true) - .value_delimiter(',') - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("instance_type") - .long("instance-type") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("storage_size") - .long("storage-size") - .required(true) - .value_parser(value_parser!(i32)), - ) - .arg( - Arg::new("storage_class") - .long("storage-class") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("worker-threads") - .long("worker-threads") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("message-backlog") - .long("message-backlog") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("mailbox-size") - .long("mailbox-size") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("dashboard") - .long("dashboard") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("output") - .long("output") - .required(true) - .value_parser(value_parser!(String)), + // Initialize logger + tracing_subscriber::fmt().init(); + + // Define the main command with subcommands + let app = Command::new("setup") + .about("Manage configuration files for an alto chain.") + .subcommand( + Command::new("generate") + .about("Generate configuration files for an alto chain") + .arg( + Arg::new("peers") + .long("peers") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("bootstrappers") + .long("bootstrappers") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("regions") + .long("regions") + .required(true) + .value_delimiter(',') + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("instance_type") + .long("instance-type") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("storage_size") + .long("storage-size") + .required(true) + .value_parser(value_parser!(i32)), + ) + .arg( + Arg::new("storage_class") + .long("storage-class") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("worker_threads") + .long("worker-threads") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("message_backlog") + .long("message-backlog") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("mailbox_size") + .long("mailbox-size") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("dashboard") + .long("dashboard") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("output") + .long("output") + .required(true) + .value_parser(value_parser!(String)), + ), ) - .get_matches(); + .subcommand( + Command::new("indexer") + .about("Add indexer support for an alto chain.") + .arg( + Arg::new("dir") + .long("dir") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("url") + .long("url") + .required(true) + .value_parser(value_parser!(String)), + ), + ); - // Create logger - tracing_subscriber::fmt().init(); + // Parse arguments + let matches = app.get_matches(); + + // Handle subcommands + match matches.subcommand() { + Some(("generate", sub_matches)) => generate(sub_matches), + Some(("indexer", sub_matches)) => indexer(sub_matches), + _ => { + eprintln!("Invalid subcommand. Use 'generate' or 'indexer'."); + std::process::exit(1); + } + } +} + +fn generate(sub_matches: &ArgMatches) { + // Extract arguments + let peers = *sub_matches.get_one::("peers").unwrap(); + let bootstrappers = *sub_matches.get_one::("bootstrappers").unwrap(); + let regions = sub_matches + .get_many::("regions") + .unwrap() + .cloned() + .collect::>(); + let instance_type = sub_matches + .get_one::("instance_type") + .unwrap() + .clone(); + let storage_size = *sub_matches.get_one::("storage_size").unwrap(); + let storage_class = sub_matches + .get_one::("storage_class") + .unwrap() + .clone(); + let worker_threads = *sub_matches.get_one::("worker_threads").unwrap(); + let message_backlog = *sub_matches.get_one::("message_backlog").unwrap(); + let mailbox_size = *sub_matches.get_one::("mailbox_size").unwrap(); + let dashboard = sub_matches.get_one::("dashboard").unwrap().clone(); + let output = sub_matches.get_one::("output").unwrap().clone(); + + // Construct output path + let raw_current_dir = std::env::current_dir().unwrap(); + let current_dir = raw_current_dir.to_str().unwrap(); + let output = format!("{}/{}", current_dir, output); + + // Check if output directory exists + if fs::metadata(&output).is_ok() { + error!("output directory already exists: {}", output); + std::process::exit(1); + } // Generate UUID let tag = Uuid::new_v4().to_string(); info!(tag, "generated deployment tag"); // Generate peers - let peers = *matches.get_one::("peers").unwrap(); - let bootstrappers = *matches.get_one::("bootstrappers").unwrap(); assert!( bootstrappers <= peers, - "bootstrappers must be less than peers" + "bootstrappers must be less than or equal to peers" ); let mut peer_schemes = (0..peers) .map(|_| Ed25519::new(&mut OsRng)) @@ -124,25 +191,14 @@ fn main() { let (identity, shares) = ops::generate_shares(&mut OsRng, None, peers_u32, threshold); info!( identity = hex(&poly::public(&identity).serialize()), - "generated consensus key" + "generated network key" ); // Generate instance configurations - let regions = matches - .get_many::("regions") - .unwrap() - .cloned() - .collect::>(); assert!( regions.len() <= peers, "must be at least one peer per specified region" ); - let instance_type = matches.get_one::("instance_type").unwrap(); - let storage_size = *matches.get_one::("storage_size").unwrap(); - let storage_class = matches.get_one::("storage_class").unwrap(); - let worker_threads = *matches.get_one::("worker-threads").unwrap(); - let message_backlog = *matches.get_one::("message-backlog").unwrap(); - let mailbox_size = *matches.get_one::("mailbox-size").unwrap(); let mut instance_configs = Vec::new(); let mut peer_configs = Vec::new(); for (index, scheme) in peer_schemes.iter().enumerate() { @@ -163,6 +219,8 @@ fn main() { message_backlog, mailbox_size, + + indexer: None, }; peer_configs.push((peer_config_file.clone(), peer_config)); @@ -199,28 +257,90 @@ fn main() { }; // Write configuration files - let raw_current_dir = std::env::current_dir().unwrap(); - let current_dir = raw_current_dir.to_str().unwrap(); - let output = matches.get_one::("output").unwrap(); - let output = format!("{}/{}", current_dir, output); - assert!( - !std::path::Path::new(&output).exists(), - "output directory already exists" - ); - std::fs::create_dir_all(output.clone()).unwrap(); - let dashboard = matches.get_one::("dashboard").unwrap().clone(); - std::fs::copy( + fs::create_dir_all(&output).unwrap(); + fs::copy( format!("{}/{}", current_dir, dashboard), format!("{}/dashboard.json", output), ) .unwrap(); for (peer_config_file, peer_config) in peer_configs { let path = format!("{}/{}", output, peer_config_file); - let file = std::fs::File::create(path).unwrap(); + let file = fs::File::create(&path).unwrap(); serde_yaml::to_writer(file, &peer_config).unwrap(); } let path = format!("{}/config.yaml", output); - let file = std::fs::File::create(path.clone()).unwrap(); + let file = fs::File::create(&path).unwrap(); serde_yaml::to_writer(file, &config).unwrap(); info!(path, "wrote configuration files"); } + +fn indexer(sub_matches: &ArgMatches) { + // Extract arguments + let dir = sub_matches.get_one::("dir").unwrap().clone(); + let url = sub_matches.get_one::("url").unwrap().clone(); + + // Construct directory path + let raw_current_dir = std::env::current_dir().unwrap(); + let current_dir = raw_current_dir.to_str().unwrap(); + let dir = format!("{}/{}", current_dir, dir); + + // Check if directory exists + if fs::metadata(&dir).is_err() { + error!("directory does not exist: {}", dir); + std::process::exit(1); + } + + // Iterate over all peer configuration files and add indexer URL + for entry in fs::read_dir(&dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_file() { + if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) { + if file_name.ends_with(".yaml") && file_name != "config.yaml" { + let relative_path = path.strip_prefix(&dir).unwrap(); + match fs::read_to_string(&path) { + Ok(content) => match serde_yaml::from_str::(&content) { + Ok(mut config) => { + config.indexer = Some(url.clone()); + match serde_yaml::to_string(&config) { + Ok(updated_content) => { + if let Err(e) = fs::write(&path, updated_content) { + error!( + path = ?relative_path, + error = ?e, + "failed to write", + ); + } else { + info!(path = ?relative_path, "updated"); + } + } + Err(e) => { + error!( + path = ?relative_path, + error = ?e, + "failed to serialize config", + ); + } + } + } + Err(e) => { + error!( + path = ?relative_path, + error = ?e, + "failed to parse" + ); + } + }, + Err(e) => { + error!( + path = ?relative_path, + error = ?e, + "failed to read", + ); + } + } + } + } + } + } +} diff --git a/chain/src/bin/validator.rs b/chain/src/bin/validator.rs index 3866aa55..030fcc75 100644 --- a/chain/src/bin/validator.rs +++ b/chain/src/bin/validator.rs @@ -1,4 +1,5 @@ use alto_chain::{engine, Config}; +use alto_client::Client; use alto_types::P2P_NAMESPACE; use axum::{routing::get, serve, Extension, Router}; use clap::{Arg, Command}; @@ -91,11 +92,12 @@ fn main() { let threshold = quorum(peers_u32).expect("unable to derive quorum"); let identity = from_hex_formatted(&config.identity).expect("Could not parse identity"); let identity = poly::Public::deserialize(&identity, threshold).expect("Identity is invalid"); + let identity_public = poly::public(&identity); let public_key = signer.public_key(); let ip = peers.get(&public_key).expect("Could not find self in IPs"); info!( ?public_key, - identity = hex(&poly::public(&identity).serialize()), + identity = hex(&identity_public.serialize()), ?ip, port = config.port, "loaded config" @@ -177,6 +179,12 @@ fn main() { // Create network let p2p = network.start(); + // Create indexer + let mut indexer = None; + if let Some(uri) = config.indexer { + indexer = Some(Client::new(&uri, identity_public.into())); + } + // Create engine let config = engine::Config { partition_prefix: "engine".to_string(), @@ -195,6 +203,7 @@ fn main() { max_fetch_size: MAX_FETCH_SIZE, fetch_concurrent: FETCH_CONCURRENT, fetch_rate_per_peer: resolver_limit, + indexer, }; let engine = engine::Engine::new(context.with_label("engine"), config).await; diff --git a/chain/src/engine.rs b/chain/src/engine.rs index e630f245..2bb8dabc 100644 --- a/chain/src/engine.rs +++ b/chain/src/engine.rs @@ -1,4 +1,7 @@ -use crate::actors::{application, syncer}; +use crate::{ + actors::{application, syncer}, + Indexer, +}; use alto_types::NAMESPACE; use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover}; use commonware_cryptography::{ @@ -17,7 +20,7 @@ use rand::{CryptoRng, Rng}; use std::time::Duration; use tracing::{error, warn}; -pub struct Config { +pub struct Config { pub partition_prefix: String, pub signer: Ed25519, pub identity: Poly, @@ -35,13 +38,19 @@ pub struct Config { pub max_fetch_size: usize, pub fetch_concurrent: usize, pub fetch_rate_per_peer: Quota, + + pub indexer: Option, } -pub struct Engine + Metrics> { +pub struct Engine< + B: Blob, + E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics, + I: Indexer, +> { context: E, application: application::Actor, - syncer: syncer::Actor, + syncer: syncer::Actor, syncer_mailbox: syncer::Mailbox, consensus: Consensus< B, @@ -55,8 +64,10 @@ pub struct Engine, } -impl + Metrics> Engine { - pub async fn new(context: E, cfg: Config) -> Self { +impl + Metrics, I: Indexer> + Engine +{ + pub async fn new(context: E, cfg: Config) -> Self { // Create the application let public = public(&cfg.identity); let (application, supervisor, application_mailbox) = application::Actor::new( @@ -81,6 +92,7 @@ impl + Metri mailbox_size: cfg.mailbox_size, backfill_quota: cfg.backfill_quota, activity_timeout: cfg.activity_timeout, + indexer: cfg.indexer, }, ) .await; diff --git a/chain/src/lib.rs b/chain/src/lib.rs index fbe28b05..dd73bb76 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -1,8 +1,63 @@ +use std::future::Future; + +use bytes::Bytes; +use commonware_cryptography::bls12381; use serde::{Deserialize, Serialize}; pub mod actors; pub mod engine; +/// Trait for interacting with an indexer. +pub trait Indexer: Clone + Send + Sync + 'static { + type Error: std::error::Error + Send + Sync + 'static; + + /// Create a new indexer with the given URI and public key. + fn new(uri: &str, public: bls12381::PublicKey) -> Self; + + /// Upload a seed to the indexer. + fn seed_upload(&self, seed: Bytes) -> impl Future> + Send; + + /// Upload a notarization to the indexer. + fn notarization_upload( + &self, + notarized: Bytes, + ) -> impl Future> + Send; + + /// Upload a finalization to the indexer. + fn finalization_upload( + &self, + finalized: Bytes, + ) -> impl Future> + Send; +} + +impl Indexer for alto_client::Client { + type Error = alto_client::Error; + fn new(uri: &str, public: bls12381::PublicKey) -> Self { + Self::new(uri, public) + } + + fn seed_upload( + &self, + seed: bytes::Bytes, + ) -> impl Future> + Send { + self.seed_upload(seed) + } + + fn notarization_upload( + &self, + notarization: bytes::Bytes, + ) -> impl Future> + Send { + self.notarization_upload(notarization) + } + + fn finalization_upload( + &self, + finalization: bytes::Bytes, + ) -> impl Future> + Send { + self.finalization_upload(finalization) + } +} + #[derive(Deserialize, Serialize)] pub struct Config { pub private_key: String, @@ -18,11 +73,15 @@ pub struct Config { pub message_backlog: usize, pub mailbox_size: usize, + + pub indexer: Option, } #[cfg(test)] mod tests { use super::*; + use alto_types::{Finalized, Notarized, Seed}; + use bls12381::primitives::poly; use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme}; use commonware_macros::test_traced; use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender}; @@ -31,17 +90,61 @@ mod tests { Clock, Metrics, Runner, Spawner, }; use commonware_utils::quorum; - use engine::Engine; + use engine::{Config, Engine}; use governor::Quota; use rand::{rngs::StdRng, Rng, SeedableRng}; - use std::time::Duration; use std::{ collections::{HashMap, HashSet}, num::NonZeroU32, sync::{Arc, Mutex}, }; + use std::{sync::atomic::AtomicBool, time::Duration}; use tracing::info; + /// MockIndexer is a simple indexer implementation for testing. + #[derive(Clone)] + struct MockIndexer { + public: bls12381::PublicKey, + + seed_seen: Arc, + notarization_seen: Arc, + finalization_seen: Arc, + } + + impl Indexer for MockIndexer { + type Error = std::io::Error; + + fn new(_: &str, public: bls12381::PublicKey) -> Self { + MockIndexer { + public, + seed_seen: Arc::new(AtomicBool::new(false)), + notarization_seen: Arc::new(AtomicBool::new(false)), + finalization_seen: Arc::new(AtomicBool::new(false)), + } + } + + async fn seed_upload(&self, seed: Bytes) -> Result<(), Self::Error> { + Seed::deserialize(Some(&self.public), &seed).unwrap(); + self.seed_seen + .store(true, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + + async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Self::Error> { + Notarized::deserialize(Some(&self.public), ¬arized).unwrap(); + self.notarization_seen + .store(true, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + + async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Self::Error> { + Finalized::deserialize(Some(&self.public), &finalized).unwrap(); + self.finalization_seen + .store(true, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + } + /// Registers all validators using the oracle. async fn register_validators( oracle: &mut Oracle, @@ -163,7 +266,7 @@ mod tests { // Configure engine let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme, identity: public.clone(), @@ -180,6 +283,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -196,7 +300,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = false; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -214,16 +318,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < required_container { - failed = true; + if value >= required_container { + success = true; break; } } } - if !failed { + if !success { break; } @@ -320,7 +424,7 @@ mod tests { // Configure engine let public_key = scheme.public_key(); let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme.clone(), identity: public.clone(), @@ -337,6 +441,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -353,7 +458,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = true; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -371,16 +476,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < initial_container_required { - failed = true; + if value >= initial_container_required { + success = true; break; } } } - if !failed { + if success { break; } @@ -402,7 +507,7 @@ mod tests { let share = shares[0]; let public_key = scheme.public_key(); let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme.clone(), identity: public.clone(), @@ -419,6 +524,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -433,7 +539,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = false; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -451,16 +557,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < final_container_required { - failed = true; + if value >= final_container_required { + success = true; break; } } } - if !failed { + if success { break; } @@ -534,7 +640,7 @@ mod tests { // Configure engine let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme, identity: public.clone(), @@ -551,6 +657,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -570,7 +677,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = false; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -588,16 +695,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < required_container { - failed = true; + if value >= required_container { + success = true; break; } } } - if !failed { + if success { break; } @@ -620,4 +727,139 @@ mod tests { assert!(runs > 1); info!(runs, "unclean shutdown recovery worked"); } + + #[test_traced] + fn test_indexer() { + // Create context + let n = 5; + let threshold = quorum(n).unwrap(); + let required_container = 10; + let (executor, mut context, _) = Executor::timed(Duration::from_secs(30)); + executor.start(async move { + // Create simulated network + let (network, mut oracle) = Network::new( + context.with_label("network"), + simulated::Config { + max_size: 1024 * 1024, + }, + ); + + // Start network + network.start(); + + // Register participants + let mut schemes = Vec::new(); + let mut validators = Vec::new(); + for i in 0..n { + let scheme = Ed25519::from_seed(i as u64); + let pk = scheme.public_key(); + schemes.push(scheme); + validators.push(pk); + } + validators.sort(); + schemes.sort_by_key(|s| s.public_key()); + let mut registrations = register_validators(&mut oracle, &validators).await; + + // Link all validators + let link = Link { + latency: 10.0, + jitter: 1.0, + success_rate: 1.0, + }; + link_validators(&mut oracle, &validators, link, None).await; + + // Derive threshold + let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + + // Define mock indexer + let indexer = MockIndexer::new("", poly::public(&public).into()); + + // Create instances + let mut public_keys = HashSet::new(); + for (idx, scheme) in schemes.into_iter().enumerate() { + // Create scheme context + let public_key = scheme.public_key(); + public_keys.insert(public_key.clone()); + + // Configure engine + let uid = format!("validator-{}", public_key); + let config: Config = engine::Config { + partition_prefix: uid.clone(), + signer: scheme, + identity: public.clone(), + share: shares[idx], + participants: validators.clone(), + mailbox_size: 1024, + backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()), + leader_timeout: Duration::from_secs(1), + notarization_timeout: Duration::from_secs(2), + nullify_retry: Duration::from_secs(10), + fetch_timeout: Duration::from_secs(1), + activity_timeout: 10, + max_fetch_count: 10, + max_fetch_size: 1024 * 512, + fetch_concurrent: 10, + fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: Some(indexer.clone()), + }; + let engine = Engine::new(context.with_label(&uid), config).await; + + // Get networking + let (voter, resolver, broadcast, backfill) = + registrations.remove(&public_key).unwrap(); + + // Start engine + engine.start(voter, resolver, broadcast, backfill); + } + + // Poll metrics + loop { + let metrics = context.encode(); + + // Iterate over all lines + let mut success = false; + for line in metrics.lines() { + // Ensure it is a metrics line + if !line.starts_with("validator-") { + continue; + } + + // Split metric and value + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + // If ends with peers_blocked, ensure it is zero + if metric.ends_with("_peers_blocked") { + let value = value.parse::().unwrap(); + assert_eq!(value, 0); + } + + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { + let value = value.parse::().unwrap(); + if value >= required_container { + success = true; + break; + } + } + } + if success { + break; + } + + // Still waiting for all validators to complete + context.sleep(Duration::from_secs(1)).await; + } + + // Check indexer uploads + assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed)); + assert!(indexer + .notarization_seen + .load(std::sync::atomic::Ordering::Relaxed)); + assert!(indexer + .finalization_seen + .load(std::sync::atomic::Ordering::Relaxed)); + }); + } } diff --git a/client/Cargo.toml b/client/Cargo.toml index 40bbb016..a1ce81c6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-client" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0" diff --git a/client/src/consensus.rs b/client/src/consensus.rs index 810a25fa..f06aa9de 100644 --- a/client/src/consensus.rs +++ b/client/src/consensus.rs @@ -1,7 +1,6 @@ use crate::{Client, Error, IndexQuery, Query}; -use alto_types::{ - Block, Finalization, Finalized, Kind, Notarization, Notarized, Nullification, Seed, -}; +use alto_types::{Block, Finalized, Kind, Notarized, Seed}; +use bytes::Bytes; use futures::{channel::mpsc::unbounded, Stream, StreamExt}; use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage}; @@ -13,14 +12,6 @@ fn seed_get_path(base: String, query: &IndexQuery) -> String { format!("{}/seed/{}", base, query.serialize()) } -fn nullification_upload_path(base: String) -> String { - format!("{}/nullification", base) -} - -fn nullification_get_path(base: String, query: &IndexQuery) -> String { - format!("{}/nullification/{}", base, query.serialize()) -} - fn notarization_upload_path(base: String) -> String { format!("{}/notarization", base) } @@ -43,7 +34,7 @@ fn block_get_path(base: String, query: &Query) -> String { format!("{}/block/{}", base, query.serialize()) } -fn register_path(base: String) -> String { +fn listen_path(base: String) -> String { format!("{}/consensus/ws", base) } @@ -54,18 +45,16 @@ pub enum Payload { pub enum Message { Seed(Seed), - Nullification(Nullification), Notarization(Notarized), Finalization(Finalized), } impl Client { - pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> { - let request = seed.serialize(); - let client = reqwest::Client::new(); - let result = client + pub async fn seed_upload(&self, seed: Bytes) -> Result<(), Error> { + let result = self + .client .post(seed_upload_path(self.uri.clone())) - .body(request) + .body(seed) .send() .await .map_err(Error::Reqwest)?; @@ -77,8 +66,8 @@ impl Client { pub async fn seed_get(&self, query: IndexQuery) -> Result { // Get the seed - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(seed_get_path(self.uri.clone(), &query)) .send() .await @@ -101,58 +90,11 @@ impl Client { Ok(result) } - pub async fn nullification_upload(&self, nullification: Nullification) -> Result<(), Error> { - let request = nullification.serialize(); - let client = reqwest::Client::new(); - let result = client - .post(nullification_upload_path(self.uri.clone())) - .body(request) - .send() - .await - .map_err(Error::Reqwest)?; - if !result.status().is_success() { - return Err(Error::Failed(result.status())); - } - Ok(()) - } - - pub async fn nullification_get(&self, query: IndexQuery) -> Result { - // Get the nullification - let client = reqwest::Client::new(); - let result = client - .get(nullification_get_path(self.uri.clone(), &query)) - .send() - .await - .map_err(Error::Reqwest)?; - if !result.status().is_success() { - return Err(Error::Failed(result.status())); - } - let bytes = result.bytes().await.map_err(Error::Reqwest)?; - let result = - Nullification::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?; - - // Verify the nullification matches the query - match query { - IndexQuery::Latest => {} - IndexQuery::Index(index) => { - if result.view != index { - return Err(Error::InvalidData); - } - } - } - Ok(result) - } - - pub async fn notarization_upload( - &self, - proof: Notarization, - block: Block, - ) -> Result<(), Error> { - let request = Notarized::new(proof, block).serialize(); - let client = reqwest::Client::new(); - let result = client + pub async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Error> { + let result = self + .client .post(notarization_upload_path(self.uri.clone())) - .body(request) + .body(notarized) .send() .await .map_err(Error::Reqwest)?; @@ -164,8 +106,8 @@ impl Client { pub async fn notarization_get(&self, query: IndexQuery) -> Result { // Get the notarization - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(notarization_get_path(self.uri.clone(), &query)) .send() .await @@ -189,16 +131,11 @@ impl Client { Ok(result) } - pub async fn finalization_upload( - &self, - proof: Finalization, - block: Block, - ) -> Result<(), Error> { - let request = Finalized::new(proof, block).serialize(); - let client = reqwest::Client::new(); - let result = client + pub async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Error> { + let result = self + .client .post(finalization_upload_path(self.uri.clone())) - .body(request) + .body(finalized) .send() .await .map_err(Error::Reqwest)?; @@ -210,8 +147,8 @@ impl Client { pub async fn finalization_get(&self, query: IndexQuery) -> Result { // Get the finalization - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(finalization_get_path(self.uri.clone(), &query)) .send() .await @@ -237,8 +174,8 @@ impl Client { pub async fn block_get(&self, query: Query) -> Result { // Get the block - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(block_get_path(self.uri.clone(), &query)) .send() .await @@ -274,9 +211,9 @@ impl Client { Ok(result) } - pub async fn register(&self) -> Result>, Error> { + pub async fn listen(&self) -> Result>, Error> { // Connect to the websocket endpoint - let (stream, _) = connect_async(register_path(self.ws_uri.clone())) + let (stream, _) = connect_async(listen_path(self.ws_uri.clone())) .await .map_err(Error::from)?; let (_, read) = stream.split(); @@ -313,16 +250,7 @@ impl Client { let _ = sender.unbounded_send(Err(Error::InvalidData)); } } - Kind::Nullification => { - if let Some(nullification) = - Nullification::deserialize(Some(&public), data) - { - let _ = sender - .unbounded_send(Ok(Message::Nullification(nullification))); - } else { - let _ = sender.unbounded_send(Err(Error::InvalidData)); - } - } + Kind::Nullification => {} // Ignore nullifications Kind::Finalization => { if let Some(payload) = Finalized::deserialize(Some(&public), data) { let _ = diff --git a/client/src/lib.rs b/client/src/lib.rs index fe7e1c0a..f2068b13 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -51,10 +51,13 @@ pub enum Error { InvalidData, } +#[derive(Clone)] pub struct Client { uri: String, ws_uri: String, public: bls12381::PublicKey, + + client: reqwest::Client, } impl Client { @@ -65,6 +68,8 @@ impl Client { uri, ws_uri, public, + + client: reqwest::Client::new(), } } } diff --git a/client/src/utils.rs b/client/src/utils.rs index 8d2d6a19..6bd3c31d 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -6,8 +6,8 @@ fn healthy_path(base: String) -> String { impl Client { pub async fn health(&self) -> Result<(), Error> { - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(healthy_path(self.uri.clone())) .send() .await diff --git a/inspector/Cargo.toml b/inspector/Cargo.toml new file mode 100644 index 00000000..12508706 --- /dev/null +++ b/inspector/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "alto-inspector" +version = "0.0.3" +publish = true +edition = "2021" +license = "MIT OR Apache-2.0" +description = "Monitor alto activity." +readme = "README.md" +homepage = "https://alto.commonware.xyz" +repository = "https://github.com/commonwarexyz/alto/tree/main/inspector" +documentation = "https://docs.rs/alto-inspector" + +[dependencies] +commonware-cryptography = { workspace = true } +commonware-utils = { workspace = true } +alto-types = { workspace = true } +alto-client = { workspace = true } +bytes = { workspace = true } +rand = { workspace = true } +thiserror = { workspace = true } +clap = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tokio = { version = "1.40.0", features = ["full"] } +futures = { workspace = true } + +[[bin]] +name = "inspector" +path = "src/main.rs" diff --git a/inspector/README.md b/inspector/README.md new file mode 100644 index 00000000..5f8d870a --- /dev/null +++ b/inspector/README.md @@ -0,0 +1,82 @@ +# alto-inspector + +[![Crates.io](https://img.shields.io/crates/v/alto-inspector.svg)](https://crates.io/crates/alto-inspector) +[![Docs.rs](https://docs.rs/alto-inspector/badge.svg)](https://docs.rs/alto-inspector) + +Monitor `alto` activity. + +## Status + +`alto-inspector` is **ALPHA** software and is not yet recommended for production use. Developers should expect breaking changes and occasional instability. + +## Installation + +### Local + +```bash +cargo install --path . --force +``` + +### Crates.io + +```bash +cargo install alto-inspector +``` + +## Usage + +_Use `-v` or `--verbose` to enable verbose logging (like request latency). Use `--prepare` to initialize the connection before making the request (for accurate latency measurement)._ + +### Get the latest seed + +```bash +inspector get seed latest --indexer --identity +``` + +### Get the notarization for view 100 + +```bash +inspector get notarization 100 --indexer --identity +``` + +### Get the notarizations between views 100 to 110 + +```bash +inspector get notarization 100..110 --indexer --identity +``` + +### Get the finalization for view 50 + +```bash +inspector get finalization 50 --indexer --identity +``` + +### Get the latest finalized block + +```bash +inspector get block latest --indexer --identity +``` + +### Get the block at height 10 + +```bash +inspector get block 10 --indexer --identity +``` + +### Get the blocks between heights 10 and 20 + +```bash +inspector get block 10..20 --indexer --identity +``` + +### Get the block with a specific digest + +```bash +inspector -- get block 0x65016ff40e824e21fffe903953c07b6d604dbcf39f681c62e7b3ed57ab1d1994 --indexer --identity +``` + +### Listen for consensus events + +```bash +inspector listen --indexer --identity +``` \ No newline at end of file diff --git a/inspector/src/main.rs b/inspector/src/main.rs new file mode 100644 index 00000000..b700c302 --- /dev/null +++ b/inspector/src/main.rs @@ -0,0 +1,247 @@ +use alto_client::{ + consensus::{Message, Payload}, + Client, IndexQuery, Query, +}; +use clap::{value_parser, Arg, Command}; +use commonware_cryptography::bls12381::PublicKey; +use commonware_utils::from_hex_formatted; +use futures::StreamExt; +use tracing::{info, warn, Level}; +use utils::{ + log_block, log_finalization, log_latency, log_notarization, log_seed, parse_index_query, + parse_query, IndexQueryKind, QueryKind, +}; + +mod utils; + +#[tokio::main] +async fn main() { + let matches = Command::new("inspector") + .about("Monitor alto activity.") + .arg( + Arg::new("verbose") + .short('v') + .long("verbose") + .help("Enable debug logging") + .global(true) + .action(clap::ArgAction::SetTrue), + ) + .subcommand( + Command::new("listen") + .about("Listen for consensus messages") + .arg( + Arg::new("indexer") + .long("indexer") + .required(true) + .value_parser(value_parser!(String)) + .help("URL of the indexer to connect to"), + ) + .arg( + Arg::new("identity") + .long("identity") + .required(true) + .value_parser(value_parser!(String)) + .help("Hex-encoded public key of the identity"), + ), + ) + .subcommand( + Command::new("get") + .about("Get specific consensus data") + .arg( + Arg::new("type") + .required(true) + .value_parser(["seed", "notarization", "finalization", "block"]) + .help("Type of data to retrieve"), + ) + .arg( + Arg::new("query") + .required(true) + .value_parser(value_parser!(String)) + .help("Query parameter (e.g., 'latest', number, range like '23..45', or hex digest for block)"), + ) + .arg( + Arg::new("indexer") + .long("indexer") + .required(true) + .value_parser(value_parser!(String)) + .help("URL of the indexer to connect to"), + ) + .arg( + Arg::new("identity") + .long("identity") + .required(true) + .value_parser(value_parser!(String)) + .help("Hex-encoded public key of the identity"), + ) + .arg( + Arg::new("prepare") + .long("prepare") + .help("Prepare the connection for some request to get a more accurate latency observation") + .required(false) + .action(clap::ArgAction::SetTrue), + ), + ) + .get_matches(); + + let log_level = if matches.get_flag("verbose") { + Level::DEBUG + } else { + Level::INFO + }; + tracing_subscriber::fmt().with_max_level(log_level).init(); + + if let Some(matches) = matches.subcommand_matches("listen") { + let indexer = matches.get_one::("indexer").unwrap(); + let identity = matches.get_one::("identity").unwrap(); + let identity = from_hex_formatted(identity).expect("Failed to decode identity"); + let identity = PublicKey::try_from(identity).expect("Invalid identity"); + let client = Client::new(indexer, identity); + + let mut stream = client.listen().await.expect("Failed to connect to indexer"); + info!("listening for consensus messages..."); + while let Some(message) = stream.next().await { + let message = message.expect("Failed to receive message"); + match message { + Message::Seed(seed) => log_seed(seed), + Message::Notarization(notarized) => log_notarization(notarized), + Message::Finalization(finalized) => log_finalization(finalized), + } + } + } else if let Some(matches) = matches.subcommand_matches("get") { + let type_ = matches.get_one::("type").unwrap(); + let query_str = matches.get_one::("query").unwrap(); + let indexer = matches.get_one::("indexer").unwrap(); + let identity = matches.get_one::("identity").unwrap(); + let identity = from_hex_formatted(identity).expect("Failed to decode identity"); + let identity = PublicKey::try_from(identity).expect("Invalid identity"); + let client = Client::new(indexer, identity); + let prepare_flag = matches.get_flag("prepare"); + + if prepare_flag { + client.health().await.expect("Failed to prepare connection"); + info!("connection prepared"); + } + + match type_.as_str() { + "seed" => { + let query_kind = parse_index_query(query_str).expect("Invalid query"); + match query_kind { + IndexQueryKind::Single(query) => { + let start = std::time::Instant::now(); + let seed = client.seed_get(query).await.expect("Failed to get seed"); + log_latency(start); + log_seed(seed); + } + IndexQueryKind::Range(start_view, end_view) => { + for view in start_view..end_view { + let start = std::time::Instant::now(); + let query = IndexQuery::Index(view); + match client.seed_get(query).await { + Ok(seed) => { + log_latency(start); + log_seed(seed); + } + Err(e) => { + warn!(view, error=?e, "failed to get seed"); + } + } + } + } + } + } + "notarization" => { + let query_kind = parse_index_query(query_str).expect("Invalid query"); + match query_kind { + IndexQueryKind::Single(query) => { + let start = std::time::Instant::now(); + let notarized = client + .notarization_get(query) + .await + .expect("Failed to get notarization"); + log_latency(start); + log_notarization(notarized); + } + IndexQueryKind::Range(start_view, end_view) => { + for view in start_view..end_view { + let start = std::time::Instant::now(); + let query = IndexQuery::Index(view); + match client.notarization_get(query).await { + Ok(notarized) => { + log_latency(start); + log_notarization(notarized); + } + Err(e) => { + warn!(view, error=?e, "failed to get notarization"); + } + } + } + } + } + } + "finalization" => { + let query_kind = parse_index_query(query_str).expect("Invalid query"); + match query_kind { + IndexQueryKind::Single(query) => { + let start = std::time::Instant::now(); + let finalized = client + .finalization_get(query) + .await + .expect("Failed to get finalization"); + log_latency(start); + log_finalization(finalized); + } + IndexQueryKind::Range(start_view, end_view) => { + for view in start_view..end_view { + let start = std::time::Instant::now(); + let query = IndexQuery::Index(view); + match client.finalization_get(query).await { + Ok(finalized) => { + log_latency(start); + log_finalization(finalized); + } + Err(e) => { + warn!(view, error=?e, "failed to get finalization"); + } + } + } + } + } + } + "block" => { + let query_kind = parse_query(query_str).expect("Invalid query"); + match query_kind { + QueryKind::Single(query) => { + let start = std::time::Instant::now(); + let payload = client.block_get(query).await.expect("Failed to get block"); + log_latency(start); + match payload { + Payload::Finalized(finalized) => log_finalization(*finalized), + Payload::Block(block) => log_block(block), + } + } + QueryKind::Range(start_height, end_height) => { + for height in start_height..end_height { + let start = std::time::Instant::now(); + let query = Query::Index(height); + match client.block_get(query).await { + Ok(payload) => { + log_latency(start); + match payload { + Payload::Finalized(finalized) => { + log_finalization(*finalized) + } + Payload::Block(block) => log_block(block), + } + } + Err(e) => { + warn!(height, error=?e, "failed to get block"); + } + } + } + } + } + } + _ => unreachable!(), + } + } +} diff --git a/inspector/src/utils.rs b/inspector/src/utils.rs new file mode 100644 index 00000000..7b4b0009 --- /dev/null +++ b/inspector/src/utils.rs @@ -0,0 +1,135 @@ +use alto_client::{IndexQuery, Query}; +use alto_types::{Finalized, Notarized, Seed}; +use commonware_cryptography::sha256::Digest; +use commonware_utils::{SizedSerialize, SystemTimeExt}; +use std::time; +use tracing::{debug, info}; + +// Define enums for query kinds +pub enum IndexQueryKind { + Single(IndexQuery), + Range(u64, u64), +} + +pub enum QueryKind { + Single(Query), + Range(u64, u64), +} + +// Parse IndexQuery for seed, notarization, and finalization +pub fn parse_index_query(query: &str) -> Option { + if query == "latest" { + Some(IndexQueryKind::Single(IndexQuery::Latest)) + } else if let Some((start, end)) = parse_range(query) { + Some(IndexQueryKind::Range(start, end)) + } else if let Ok(index) = query.parse::() { + Some(IndexQueryKind::Single(IndexQuery::Index(index))) + } else { + None + } +} + +// Parse Query for block +pub fn parse_query(query: &str) -> Option { + if query == "latest" { + Some(QueryKind::Single(Query::Latest)) + } else if let Some((start, end)) = parse_range(query) { + Some(QueryKind::Range(start, end)) + } else if let Ok(index) = query.parse::() { + Some(QueryKind::Single(Query::Index(index))) + } else { + let bytes = commonware_utils::from_hex(query)?; + let digest: [u8; Digest::SERIALIZED_LEN] = bytes.try_into().ok()?; + Some(QueryKind::Single(Query::Digest(digest.into()))) + } +} + +// Helper function to parse range queries +fn parse_range(query: &str) -> Option<(u64, u64)> { + let parts: Vec<&str> = query.split("..").collect(); + if parts.len() == 2 { + let start = parts[0].parse::().ok()?; + let end = parts[1].parse::().ok()?; + if start <= end { + Some((start, end)) + } else { + None + } + } else { + None + } +} + +// Existing logging functions remain unchanged +const MS_PER_SECOND: u64 = 1000; +const MS_PER_HOUR: u64 = 3_600_000; +const MS_PER_DAY: u64 = 86_400_000; + +pub fn format_age(age: u64) -> String { + if age < MS_PER_SECOND { + format!("{}ms", age) + } else if age < MS_PER_HOUR { + let seconds = age as f64 / MS_PER_SECOND as f64; + format!("{:.1}s", seconds) + } else if age < MS_PER_DAY { + let hours = age as f64 / MS_PER_HOUR as f64; + format!("{:.1}h", hours) + } else { + let days = age / MS_PER_DAY; + let remaining_ms = age % MS_PER_DAY; + let hours = remaining_ms / MS_PER_HOUR; + format!("{}d {}h", days, hours) + } +} + +pub fn log_seed(seed: Seed) { + info!(view = seed.view, signature = ?seed.signature, "seed"); +} + +pub fn log_notarization(notarized: Notarized) { + let now = time::SystemTime::now().epoch_millis(); + let age_ms = now.saturating_sub(notarized.block.timestamp); + let age_str = format_age(age_ms); + info!( + view = notarized.proof.view, + height = notarized.block.height, + timestamp = notarized.block.timestamp, + age = %age_str, + digest = ?notarized.block.digest(), + "notarized" + ); +} + +pub fn log_finalization(finalized: Finalized) { + let now = time::SystemTime::now().epoch_millis(); + let age_ms = now.saturating_sub(finalized.block.timestamp); + let age_str = format_age(age_ms); + info!( + view = finalized.proof.view, + height = finalized.block.height, + timestamp = finalized.block.timestamp, + age = %age_str, + digest = ?finalized.block.digest(), + "finalized" + ); +} + +pub fn log_block(block: alto_types::Block) { + let now = time::SystemTime::now().epoch_millis(); + let age_ms = now.saturating_sub(block.timestamp); + let age_str = format_age(age_ms); + info!( + height = block.height, + timestamp = block.timestamp, + age = %age_str, + digest = ?block.digest(), + "block" + ); +} + +pub fn log_latency(start: time::Instant) { + let elapsed = start.elapsed(); + let elapsed_ms = elapsed.as_millis(); + let elapsed_str = format_age(elapsed_ms as u64); + debug!(elapsed = %elapsed_str, "latency"); +} diff --git a/scripts/bump_versions.sh b/scripts/bump_versions.sh new file mode 100755 index 00000000..ba3f4719 --- /dev/null +++ b/scripts/bump_versions.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +# +# Increment patch versions for crates/packages whose name starts with "alto-" +# in both package declarations and [workspace.dependencies]. + +set -euo pipefail + +# Function: bump the patch number in e.g., 0.0.14 -> 0.0.15 +bump_version() { + local old="$1" + local major minor patch + IFS='.' read -r major minor patch <<< "${old}" + patch=$((patch + 1)) + echo "${major}.${minor}.${patch}" +} + +# Recursively find all Cargo.toml files +find . -name "Cargo.toml" | while read -r cargo_file; do + # We'll store updated file content in an array + content=() + changed=false + + # Read the file line by line + name="" + while IFS= read -r line; do + # 1) Match workspace deps like: alto-foo = { version = "0.0.3", path = "foo" } + if [[ "${line}" =~ ^[[:space:]]*(alto-[^[:space:]]+)[[:space:]]*=\ {[[:space:]]*version[[:space:]]*=[[:space:]]*\"([0-9]+\.[0-9]+\.[0-9]+)\" ]]; then + old="${BASH_REMATCH[2]}" + new="$(bump_version "${old}")" + line="${line/${old}/${new}}" + changed=true + fi + + # 2) Check for package name lines like: name = "alto-foo" + if [[ "${line}" =~ ^[[:space:]]*name[[:space:]]*=[[:space:]]*\"(alto-[^\"]+)\" ]]; then + name="${BASH_REMATCH[1]}" + else + # 3) If name is set, we may be on a version line + if [[ -n "${name}" && "${line}" =~ ^[[:space:]]*version[[:space:]]*=[[:space:]]*\"([0-9]+\.[0-9]+\.[0-9]+)\" ]]; then + old="${BASH_REMATCH[1]}" + new="$(bump_version "${old}")" + line="${line/${old}/${new}}" + changed=true + name="" + fi + fi + + content+=("${line}") + done < "${cargo_file}" + + # If we changed anything, overwrite the file + if ${changed}; then + for line in "${content[@]}"; do + printf "%s\n" "${line}" + done > "${cargo_file}" + echo "Updated ${cargo_file}" + fi +done \ No newline at end of file diff --git a/types/Cargo.toml b/types/Cargo.toml index 0bfae034..d93ea756 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-types" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0"