From 8d390dbbbfa34643bd582fcef47c4f1e89121ac2 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:17:27 -0700 Subject: [PATCH 01/19] add client import --- Cargo.lock | 1 + chain/Cargo.toml | 1 + chain/src/bin/setup.rs | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index b93733c8..5034395b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,7 @@ dependencies = [ name = "alto-chain" version = "0.0.2" dependencies = [ + "alto-client", "alto-types", "axum", "bytes", diff --git a/chain/Cargo.toml b/chain/Cargo.toml index e4517cb5..f0489d99 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -12,6 +12,7 @@ documentation = "https://docs.rs/alto-chain" [dependencies] alto-types = { workspace = true } +alto-client = { workspace = true } commonware-consensus = { workspace = true } commonware-cryptography = { workspace = true } commonware-deployer = { workspace = true } diff --git a/chain/src/bin/setup.rs b/chain/src/bin/setup.rs index f98e0dc2..f5d5134b 100644 --- a/chain/src/bin/setup.rs +++ b/chain/src/bin/setup.rs @@ -124,7 +124,7 @@ fn main() { let (identity, shares) = ops::generate_shares(&mut OsRng, None, peers_u32, threshold); info!( identity = hex(&poly::public(&identity).serialize()), - "generated consensus key" + "generated network key" ); // Generate instance configurations From c44018a45d4d81e0b8b78fc5e3b64899011acde0 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:20:52 -0700 Subject: [PATCH 02/19] add indexer URL to readme --- chain/README.md | 6 ++++++ chain/src/bin/setup.rs | 9 +++++++++ chain/src/lib.rs | 2 ++ 3 files changed, 17 insertions(+) diff --git a/chain/README.md b/chain/README.md index 60f1e159..d7aa081b 100644 --- a/chain/README.md +++ b/chain/README.md @@ -25,6 +25,12 @@ cargo install commonware-deployer cargo run --bin setup -- --peers 10 --bootstrappers 2 --regions us-west-1,us-east-1,eu-west-1,ap-northeast-1,eu-north-1,ap-south-1,sa-east-1,eu-central-1,ap-northeast-2,ap-southeast-2 --instance-type c7g.xlarge --storage-size 10 --storage-class gp3 --worker-threads 4 --message-backlog 16384 --mailbox-size 16384 --dashboard dashboard.json --output assets ``` +#### [Optional] Upload Consensus Artifacts to Indexer + +```bash +--indexer +``` + ### Build Validator Binary #### Build Cross-Platform Compiler diff --git a/chain/src/bin/setup.rs b/chain/src/bin/setup.rs index f5d5134b..08753201 100644 --- a/chain/src/bin/setup.rs +++ b/chain/src/bin/setup.rs @@ -87,6 +87,12 @@ fn main() { .required(true) .value_parser(value_parser!(String)), ) + .arg( + Arg::new("indexer") + .long("indexer") + .required(false) + .value_parser(value_parser!(String)), + ) .get_matches(); // Create logger @@ -143,6 +149,7 @@ fn main() { let worker_threads = *matches.get_one::("worker-threads").unwrap(); let message_backlog = *matches.get_one::("message-backlog").unwrap(); let mailbox_size = *matches.get_one::("mailbox-size").unwrap(); + let indexer = matches.get_one::("indexer").cloned(); let mut instance_configs = Vec::new(); let mut peer_configs = Vec::new(); for (index, scheme) in peer_schemes.iter().enumerate() { @@ -163,6 +170,8 @@ fn main() { message_backlog, mailbox_size, + + indexer: indexer.clone(), }; peer_configs.push((peer_config_file.clone(), peer_config)); diff --git a/chain/src/lib.rs b/chain/src/lib.rs index fbe28b05..e2903a5d 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -18,6 +18,8 @@ pub struct Config { pub message_backlog: usize, pub mailbox_size: usize, + + pub indexer: Option, } #[cfg(test)] From 6e08640fa255b3d2b53103733cb27a73e155d679 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:26:09 -0700 Subject: [PATCH 03/19] push through indexer --- chain/src/bin/validator.rs | 1 + chain/src/engine.rs | 2 ++ chain/src/lib.rs | 4 ++++ 3 files changed, 7 insertions(+) diff --git a/chain/src/bin/validator.rs b/chain/src/bin/validator.rs index 3866aa55..677afcf8 100644 --- a/chain/src/bin/validator.rs +++ b/chain/src/bin/validator.rs @@ -195,6 +195,7 @@ fn main() { max_fetch_size: MAX_FETCH_SIZE, fetch_concurrent: FETCH_CONCURRENT, fetch_rate_per_peer: resolver_limit, + indexer: config.indexer, }; let engine = engine::Engine::new(context.with_label("engine"), config).await; diff --git a/chain/src/engine.rs b/chain/src/engine.rs index e630f245..8d84d7fb 100644 --- a/chain/src/engine.rs +++ b/chain/src/engine.rs @@ -35,6 +35,8 @@ pub struct Config { pub max_fetch_size: usize, pub fetch_concurrent: usize, pub fetch_rate_per_peer: Quota, + + pub indexer: Option, } pub struct Engine + Metrics> { diff --git a/chain/src/lib.rs b/chain/src/lib.rs index e2903a5d..3a38bb1a 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -182,6 +182,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -339,6 +340,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -421,6 +423,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -553,6 +556,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, }; let engine = Engine::new(context.with_label(&uid), config).await; From 435a0b757c985d2ef71b3466ae6e0b4ed1f62d9f Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:33:06 -0700 Subject: [PATCH 04/19] push into syncer (need blocks) --- chain/src/actors/syncer/actor.rs | 12 ++++++++++-- chain/src/actors/syncer/mod.rs | 2 ++ chain/src/engine.rs | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 5a918631..9aeb6573 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -10,6 +10,7 @@ use crate::actors::syncer::{ handler, key::{self, MultiIndex, Value}, }; +use alto_client::Client; use alto_types::{Block, Finalization, Finalized, Notarized}; use commonware_cryptography::{bls12381, ed25519::PublicKey, sha256::Digest}; use commonware_macros::select; @@ -51,6 +52,7 @@ pub struct Actor, // Blocks verified stored by view<>digest verified: Archive, @@ -194,6 +196,13 @@ impl> Actor> Actor> Actor, } diff --git a/chain/src/engine.rs b/chain/src/engine.rs index 8d84d7fb..89fa0437 100644 --- a/chain/src/engine.rs +++ b/chain/src/engine.rs @@ -83,6 +83,7 @@ impl + Metri mailbox_size: cfg.mailbox_size, backfill_quota: cfg.backfill_quota, activity_timeout: cfg.activity_timeout, + indexer: cfg.indexer, }, ) .await; From 714b8cd0cd4ed21deb4ae7db70c2f88ecbc16f5c Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:41:59 -0700 Subject: [PATCH 05/19] spike on indexer --- chain/src/actors/syncer/actor.rs | 30 +++++++++++++++++++++++++++++- client/src/lib.rs | 1 + 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 9aeb6573..c65b8c9b 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -497,12 +497,26 @@ impl> Actor> Actor Date: Sun, 9 Mar 2025 18:43:10 -0700 Subject: [PATCH 06/19] add logs --- chain/src/actors/syncer/actor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index c65b8c9b..8a1ac583 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -515,6 +515,7 @@ impl> Actor> Actor Date: Sun, 9 Mar 2025 18:46:50 -0700 Subject: [PATCH 07/19] reuse reqwest client --- client/src/consensus.rs | 36 ++++++++++++++++++------------------ client/src/lib.rs | 4 ++++ 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/client/src/consensus.rs b/client/src/consensus.rs index 810a25fa..9b36cb84 100644 --- a/client/src/consensus.rs +++ b/client/src/consensus.rs @@ -62,8 +62,8 @@ pub enum Message { impl Client { pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> { let request = seed.serialize(); - let client = reqwest::Client::new(); - let result = client + let result = self + .client .post(seed_upload_path(self.uri.clone())) .body(request) .send() @@ -77,8 +77,8 @@ impl Client { pub async fn seed_get(&self, query: IndexQuery) -> Result { // Get the seed - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(seed_get_path(self.uri.clone(), &query)) .send() .await @@ -103,8 +103,8 @@ impl Client { pub async fn nullification_upload(&self, nullification: Nullification) -> Result<(), Error> { let request = nullification.serialize(); - let client = reqwest::Client::new(); - let result = client + let result = self + .client .post(nullification_upload_path(self.uri.clone())) .body(request) .send() @@ -118,8 +118,8 @@ impl Client { pub async fn nullification_get(&self, query: IndexQuery) -> Result { // Get the nullification - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(nullification_get_path(self.uri.clone(), &query)) .send() .await @@ -149,8 +149,8 @@ impl Client { block: Block, ) -> Result<(), Error> { let request = Notarized::new(proof, block).serialize(); - let client = reqwest::Client::new(); - let result = client + let result = self + .client .post(notarization_upload_path(self.uri.clone())) .body(request) .send() @@ -164,8 +164,8 @@ impl Client { pub async fn notarization_get(&self, query: IndexQuery) -> Result { // Get the notarization - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(notarization_get_path(self.uri.clone(), &query)) .send() .await @@ -195,8 +195,8 @@ impl Client { block: Block, ) -> Result<(), Error> { let request = Finalized::new(proof, block).serialize(); - let client = reqwest::Client::new(); - let result = client + let result = self + .client .post(finalization_upload_path(self.uri.clone())) .body(request) .send() @@ -210,8 +210,8 @@ impl Client { pub async fn finalization_get(&self, query: IndexQuery) -> Result { // Get the finalization - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(finalization_get_path(self.uri.clone(), &query)) .send() .await @@ -237,8 +237,8 @@ impl Client { pub async fn block_get(&self, query: Query) -> Result { // Get the block - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(block_get_path(self.uri.clone(), &query)) .send() .await diff --git a/client/src/lib.rs b/client/src/lib.rs index 7fc15424..f2068b13 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -56,6 +56,8 @@ pub struct Client { uri: String, ws_uri: String, public: bls12381::PublicKey, + + client: reqwest::Client, } impl Client { @@ -66,6 +68,8 @@ impl Client { uri, ws_uri, public, + + client: reqwest::Client::new(), } } } From cfd99986800a70f8e10596a78c1740774db63c1d Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:53:18 -0700 Subject: [PATCH 08/19] perform seed upload --- chain/src/actors/application/actor.rs | 12 +++++----- chain/src/actors/syncer/actor.rs | 32 +++++++++++++++++++++++++-- chain/src/actors/syncer/ingress.rs | 12 +++++----- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/chain/src/actors/application/actor.rs b/chain/src/actors/application/actor.rs index 38a58b66..89094f93 100644 --- a/chain/src/actors/application/actor.rs +++ b/chain/src/actors/application/actor.rs @@ -4,7 +4,7 @@ use super::{ Config, }; use crate::actors::syncer; -use alto_types::{Block, Finalization, Notarization}; +use alto_types::{Block, Finalization, Notarization, Seed}; use commonware_consensus::threshold_simplex::Prover; use commonware_cryptography::{sha256::Digest, Hasher, Sha256}; use commonware_macros::select; @@ -215,21 +215,23 @@ impl Actor { } Message::Prepared { proof, payload } => { // Parse the proof - let (view, parent, _, signature, _) = + let (view, parent, _, signature, seed) = self.prover.deserialize_notarization(proof).unwrap(); let notarization = Notarization::new(view, parent, payload, signature.into()); + let seed = Seed::new(view, seed.into()); // Send the notarization to the syncer - syncer.notarized(notarization).await; + syncer.notarized(notarization, seed).await; } Message::Finalized { proof, payload } => { // Parse the proof - let (view, parent, _, signature, _) = + let (view, parent, _, signature, seed) = self.prover.deserialize_finalization(proof.clone()).unwrap(); let finalization = Finalization::new(view, parent, payload, signature.into()); + let seed = Seed::new(view, seed.into()); // Send the finalization to the syncer - syncer.finalized(finalization).await; + syncer.finalized(finalization, seed).await; } } } diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 8a1ac583..6abb16b0 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -478,7 +478,21 @@ impl> Actor { + Message::Notarized { proof, seed } => { + // Upload seed to indexer (if available) + if let Some(client) = self.client.as_ref() { + let client = client.clone(); + let view = proof.view; + self.context.with_label("indexer").spawn( + move |_| async move { + let result = client.seed_upload(seed).await; + if let Err(e) = result { + warn!(?e, "failed to upload seed"); + } + debug!(view, "seed uploaded to indexer"); + }); + } + // Check if in buffer let mut block = None; if let Some(buffered) = buffer.get(&proof.payload) { @@ -529,7 +543,21 @@ impl> Actor { + Message::Finalized { proof, seed } => { + // Upload seed to indexer (if available) + if let Some(client) = self.client.as_ref() { + let client = client.clone(); + let view = proof.view; + self.context.with_label("indexer").spawn( + move |_| async move { + let result = client.seed_upload(seed).await; + if let Err(e) = result { + warn!(?e, "failed to upload seed"); + } + debug!(view, "seed uploaded to indexer"); + }); + } + // Check if in buffer let mut block = None; if let Some(buffered) = buffer.get(&proof.payload){ diff --git a/chain/src/actors/syncer/ingress.rs b/chain/src/actors/syncer/ingress.rs index 140401cc..7cc0cc24 100644 --- a/chain/src/actors/syncer/ingress.rs +++ b/chain/src/actors/syncer/ingress.rs @@ -1,4 +1,4 @@ -use alto_types::{Block, Finalization, Notarization}; +use alto_types::{Block, Finalization, Notarization, Seed}; use commonware_cryptography::sha256::Digest; use futures::{ channel::{mpsc, oneshot}, @@ -21,9 +21,11 @@ pub enum Message { }, Notarized { proof: Notarization, + seed: Seed, }, Finalized { proof: Finalization, + seed: Seed, }, } @@ -68,16 +70,16 @@ impl Mailbox { .expect("Failed to send lock"); } - pub async fn notarized(&mut self, proof: Notarization) { + pub async fn notarized(&mut self, proof: Notarization, seed: Seed) { self.sender - .send(Message::Notarized { proof }) + .send(Message::Notarized { proof, seed }) .await .expect("Failed to send lock"); } - pub async fn finalized(&mut self, proof: Finalization) { + pub async fn finalized(&mut self, proof: Finalization, seed: Seed) { self.sender - .send(Message::Finalized { proof }) + .send(Message::Finalized { proof, seed }) .await .expect("Failed to send lock"); } From 19b40acf98457205299edf8c94110f8f586109b3 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 18:56:51 -0700 Subject: [PATCH 09/19] remove nullification --- client/src/consensus.rs | 66 ++--------------------------------------- 1 file changed, 2 insertions(+), 64 deletions(-) diff --git a/client/src/consensus.rs b/client/src/consensus.rs index 9b36cb84..ab4666ba 100644 --- a/client/src/consensus.rs +++ b/client/src/consensus.rs @@ -1,7 +1,5 @@ use crate::{Client, Error, IndexQuery, Query}; -use alto_types::{ - Block, Finalization, Finalized, Kind, Notarization, Notarized, Nullification, Seed, -}; +use alto_types::{Block, Finalization, Finalized, Kind, Notarization, Notarized, Seed}; use futures::{channel::mpsc::unbounded, Stream, StreamExt}; use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage}; @@ -13,14 +11,6 @@ fn seed_get_path(base: String, query: &IndexQuery) -> String { format!("{}/seed/{}", base, query.serialize()) } -fn nullification_upload_path(base: String) -> String { - format!("{}/nullification", base) -} - -fn nullification_get_path(base: String, query: &IndexQuery) -> String { - format!("{}/nullification/{}", base, query.serialize()) -} - fn notarization_upload_path(base: String) -> String { format!("{}/notarization", base) } @@ -54,7 +44,6 @@ pub enum Payload { pub enum Message { Seed(Seed), - Nullification(Nullification), Notarization(Notarized), Finalization(Finalized), } @@ -101,48 +90,6 @@ impl Client { Ok(result) } - pub async fn nullification_upload(&self, nullification: Nullification) -> Result<(), Error> { - let request = nullification.serialize(); - let result = self - .client - .post(nullification_upload_path(self.uri.clone())) - .body(request) - .send() - .await - .map_err(Error::Reqwest)?; - if !result.status().is_success() { - return Err(Error::Failed(result.status())); - } - Ok(()) - } - - pub async fn nullification_get(&self, query: IndexQuery) -> Result { - // Get the nullification - let result = self - .client - .get(nullification_get_path(self.uri.clone(), &query)) - .send() - .await - .map_err(Error::Reqwest)?; - if !result.status().is_success() { - return Err(Error::Failed(result.status())); - } - let bytes = result.bytes().await.map_err(Error::Reqwest)?; - let result = - Nullification::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?; - - // Verify the nullification matches the query - match query { - IndexQuery::Latest => {} - IndexQuery::Index(index) => { - if result.view != index { - return Err(Error::InvalidData); - } - } - } - Ok(result) - } - pub async fn notarization_upload( &self, proof: Notarization, @@ -313,16 +260,7 @@ impl Client { let _ = sender.unbounded_send(Err(Error::InvalidData)); } } - Kind::Nullification => { - if let Some(nullification) = - Nullification::deserialize(Some(&public), data) - { - let _ = sender - .unbounded_send(Ok(Message::Nullification(nullification))); - } else { - let _ = sender.unbounded_send(Err(Error::InvalidData)); - } - } + Kind::Nullification => {} // Ignore nullifications Kind::Finalization => { if let Some(payload) = Finalized::deserialize(Some(&public), data) { let _ = From 7f7511e5ba7361851de87a45d99ec5a8e2f3b9c2 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 19:12:58 -0700 Subject: [PATCH 10/19] add minimal listener --- .github/workflows/publish.yml | 5 +++ Cargo.lock | 18 +++++++++ Cargo.toml | 2 + README.md | 1 + chain/Cargo.toml | 2 +- inspector/Cargo.toml | 25 +++++++++++++ inspector/README.md | 10 +++++ inspector/src/main.rs | 69 +++++++++++++++++++++++++++++++++++ 8 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 inspector/Cargo.toml create mode 100644 inspector/README.md create mode 100644 inspector/src/main.rs diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index d2f134df..9f760f19 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -32,3 +32,8 @@ jobs: continue-on-error: true env: CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + - name: Publish inspector + run: cargo publish --manifest-path inspector/Cargo.toml + continue-on-error: true + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} diff --git a/Cargo.lock b/Cargo.lock index 5034395b..868eb25a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,6 +84,24 @@ dependencies = [ "tokio-tungstenite", ] +[[package]] +name = "alto-inspector" +version = "0.0.2" +dependencies = [ + "alto-client", + "alto-types", + "bytes", + "clap", + "commonware-cryptography", + "commonware-utils", + "futures", + "rand", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "alto-types" version = "0.0.2" diff --git a/Cargo.toml b/Cargo.toml index 95121ec6..2dcc16a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "chain", "client", + "inspector", "types", ] resolver = "2" @@ -30,6 +31,7 @@ tracing = "0.1.41" tracing-subscriber = "0.3.19" governor = "0.6.3" prometheus-client = "0.22.3" +clap = "4.5.18" [profile.bench] # Because we enable overflow checks in "release," we should benchmark with them. diff --git a/README.md b/README.md index bb945cf3..1f61185a 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ * [chain](./chain/README.md): A minimal blockchain built with the [Commonware Library](https://github.com/commonwarexyz/monorepo). * [client](./client/README.md): Client for interacting with `alto`. +* [inspector](./inspector/README.md): Monitor `alto` activity. * [types](./types/README.md): Common types used throughout `alto`. ## Licensing diff --git a/chain/Cargo.toml b/chain/Cargo.toml index f0489d99..a7b0ba35 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -31,8 +31,8 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["fmt", "json"] } governor = { workspace = true } prometheus-client = { workspace = true } +clap = { workspace = true } sysinfo = "0.33.1" -clap = "4.5.18" axum = "0.8.1" uuid = "1.15.1" serde = { version = "1.0.218", features = ["derive"] } diff --git a/inspector/Cargo.toml b/inspector/Cargo.toml new file mode 100644 index 00000000..026c4d12 --- /dev/null +++ b/inspector/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "alto-inspector" +version = "0.0.2" +publish = true +edition = "2021" +license = "MIT OR Apache-2.0" +description = "Monitor alto activity." +readme = "README.md" +homepage = "https://alto.commonware.xyz" +repository = "https://github.com/commonwarexyz/alto/tree/main/inspector" +documentation = "https://docs.rs/alto-inspector" + +[dependencies] +commonware-cryptography = { workspace = true } +commonware-utils = { workspace = true } +alto-types = { workspace = true } +alto-client = { workspace = true } +bytes = { workspace = true } +rand = { workspace = true } +thiserror = { workspace = true } +clap = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tokio = { version = "1.40.0", features = ["full"] } +futures = { workspace = true } diff --git a/inspector/README.md b/inspector/README.md new file mode 100644 index 00000000..1c77675f --- /dev/null +++ b/inspector/README.md @@ -0,0 +1,10 @@ +# alto-inspector + +[![Crates.io](https://img.shields.io/crates/v/alto-inspector.svg)](https://crates.io/crates/alto-inspector) +[![Docs.rs](https://docs.rs/alto-inspector/badge.svg)](https://docs.rs/alto-inspector) + +Monitor `alto` activity. + +## Status + +`alto-inspector` is **ALPHA** software and is not yet recommended for production use. Developers should expect breaking changes and occasional instability. \ No newline at end of file diff --git a/inspector/src/main.rs b/inspector/src/main.rs new file mode 100644 index 00000000..679ea56c --- /dev/null +++ b/inspector/src/main.rs @@ -0,0 +1,69 @@ +use alto_client::{consensus::Message, Client}; +use clap::{value_parser, Arg, Command}; +use commonware_cryptography::bls12381::PublicKey; +use commonware_utils::from_hex_formatted; +use futures::StreamExt; +use tracing::info; + +#[tokio::main] +async fn main() { + // Parse arguments + let matches = Command::new("inspector") + .about("Monitor alto activity.") + .arg( + Arg::new("indexer") + .long("indexer") + .required(false) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("identity") + .long("identity") + .required(false) + .value_parser(value_parser!(String)), + ) + .get_matches(); + + // Create logger + tracing_subscriber::fmt().init(); + + // Parse the identity + let identity = matches.get_one::("identity").unwrap(); + let identity = from_hex_formatted(identity).unwrap(); + let identity = PublicKey::try_from(identity).expect("Invalid identity"); + + // Connect to the indexer + let indexer = matches.get_one::("indexer").unwrap(); + let client = Client::new(indexer, identity); + + // Stream the chain + let mut stream = client + .register() + .await + .expect("Failed to connect to indexer"); + while let Some(message) = stream.next().await { + let message = message.expect("Failed to receive message"); + match message { + Message::Seed(seed) => { + info!(view = seed.view, "seed"); + } + Message::Notarization(notarized) => { + info!( + view = notarized.proof.view, + height = notarized.block.height, + timestamp = notarized.block.timestamp, + digest = ?notarized.block.digest(), + "notarized"); + } + Message::Finalization(finalized) => { + info!( + view = finalized.proof.view, + height = finalized.block.height, + timestamp = finalized.block.timestamp, + digest = ?finalized.block.digest(), + "finalized" + ); + } + } + } +} From b1639d27ec28eeac1f46da64e31dee5d974544ae Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 9 Mar 2025 22:24:08 -0700 Subject: [PATCH 11/19] inspector tweaks --- inspector/README.md | 8 +++++++- inspector/src/main.rs | 14 ++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/inspector/README.md b/inspector/README.md index 1c77675f..ff53af80 100644 --- a/inspector/README.md +++ b/inspector/README.md @@ -7,4 +7,10 @@ Monitor `alto` activity. ## Status -`alto-inspector` is **ALPHA** software and is not yet recommended for production use. Developers should expect breaking changes and occasional instability. \ No newline at end of file +`alto-inspector` is **ALPHA** software and is not yet recommended for production use. Developers should expect breaking changes and occasional instability. + +## Usage + +```bash +cargo run --release -- --indexer --identity +``` \ No newline at end of file diff --git a/inspector/src/main.rs b/inspector/src/main.rs index 679ea56c..3421038c 100644 --- a/inspector/src/main.rs +++ b/inspector/src/main.rs @@ -1,8 +1,9 @@ use alto_client::{consensus::Message, Client}; use clap::{value_parser, Arg, Command}; use commonware_cryptography::bls12381::PublicKey; -use commonware_utils::from_hex_formatted; +use commonware_utils::{from_hex_formatted, SystemTimeExt}; use futures::StreamExt; +use std::time; use tracing::info; #[tokio::main] @@ -13,13 +14,13 @@ async fn main() { .arg( Arg::new("indexer") .long("indexer") - .required(false) + .required(true) .value_parser(value_parser!(String)), ) .arg( Arg::new("identity") .long("identity") - .required(false) + .required(true) .value_parser(value_parser!(String)), ) .get_matches(); @@ -41,25 +42,30 @@ async fn main() { .register() .await .expect("Failed to connect to indexer"); + info!("connected to indexer"); while let Some(message) = stream.next().await { let message = message.expect("Failed to receive message"); match message { Message::Seed(seed) => { - info!(view = seed.view, "seed"); + info!(view = seed.view, signature = ?seed.signature, "seed"); } Message::Notarization(notarized) => { + let now = time::SystemTime::now().epoch_millis(); info!( view = notarized.proof.view, height = notarized.block.height, timestamp = notarized.block.timestamp, + age = now - notarized.block.timestamp, digest = ?notarized.block.digest(), "notarized"); } Message::Finalization(finalized) => { + let now = time::SystemTime::now().epoch_millis(); info!( view = finalized.proof.view, height = finalized.block.height, timestamp = finalized.block.timestamp, + age = now - finalized.block.timestamp, digest = ?finalized.block.digest(), "finalized" ); From 5888899f21f5ddda8d327543301340ff2d35f368 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 15:30:31 -0700 Subject: [PATCH 12/19] make indexer --- chain/README.md | 8 +- chain/src/bin/setup.rs | 329 +++++++++++++++++++++++++++-------------- 2 files changed, 225 insertions(+), 112 deletions(-) diff --git a/chain/README.md b/chain/README.md index d7aa081b..159b4fc2 100644 --- a/chain/README.md +++ b/chain/README.md @@ -22,15 +22,17 @@ cargo install commonware-deployer ### Create Deployer Artifacts ```bash -cargo run --bin setup -- --peers 10 --bootstrappers 2 --regions us-west-1,us-east-1,eu-west-1,ap-northeast-1,eu-north-1,ap-south-1,sa-east-1,eu-central-1,ap-northeast-2,ap-southeast-2 --instance-type c7g.xlarge --storage-size 10 --storage-class gp3 --worker-threads 4 --message-backlog 16384 --mailbox-size 16384 --dashboard dashboard.json --output assets +cargo run --bin setup -- generate --peers 10 --bootstrappers 2 --regions us-west-1,us-east-1,eu-west-1,ap-northeast-1,eu-north-1,ap-south-1,sa-east-1,eu-central-1,ap-northeast-2,ap-southeast-2 --instance-type c7g.xlarge --storage-size 10 --storage-class gp3 --worker-threads 4 --message-backlog 16384 --mailbox-size 16384 --dashboard dashboard.json --output assets ``` -#### [Optional] Upload Consensus Artifacts to Indexer +### [Optional] Configure Indexer Upload ```bash ---indexer +cargo run --bin setup -- indexer --dir assets --url ``` +_The indexer URL is configured separately because it is typically only known after the threshold key is generated (derived in `setup generate`)._ + ### Build Validator Binary #### Build Cross-Platform Compiler diff --git a/chain/src/bin/setup.rs b/chain/src/bin/setup.rs index 08753201..473f7c63 100644 --- a/chain/src/bin/setup.rs +++ b/chain/src/bin/setup.rs @@ -1,5 +1,5 @@ use alto_chain::Config; -use clap::{value_parser, Arg, Command}; +use clap::{value_parser, Arg, ArgMatches, Command}; use commonware_cryptography::{ bls12381::{ dkg::ops, @@ -10,104 +10,165 @@ use commonware_cryptography::{ use commonware_deployer::ec2; use commonware_utils::{hex, quorum}; use rand::{rngs::OsRng, seq::IteratorRandom}; -use tracing::info; +use std::fs; +use tracing::{error, info}; use uuid::Uuid; const BINARY_NAME: &str = "validator"; const PORT: u16 = 4545; fn main() { - // Parse arguments - let matches = Command::new("setup") - .about("Generate configuration files for an alto chain.") - .arg( - Arg::new("peers") - .long("peers") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("bootstrappers") - .long("bootstrappers") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("regions") - .long("regions") - .required(true) - .value_delimiter(',') - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("instance_type") - .long("instance-type") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("storage_size") - .long("storage-size") - .required(true) - .value_parser(value_parser!(i32)), - ) - .arg( - Arg::new("storage_class") - .long("storage-class") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("worker-threads") - .long("worker-threads") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("message-backlog") - .long("message-backlog") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("mailbox-size") - .long("mailbox-size") - .required(true) - .value_parser(value_parser!(usize)), - ) - .arg( - Arg::new("dashboard") - .long("dashboard") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("output") - .long("output") - .required(true) - .value_parser(value_parser!(String)), - ) - .arg( - Arg::new("indexer") - .long("indexer") - .required(false) - .value_parser(value_parser!(String)), + // Initialize logger + tracing_subscriber::fmt().init(); + + // Define the main command with subcommands + let app = Command::new("setup") + .about("Manage configuration files for an alto chain.") + .subcommand( + Command::new("generate") + .about("Generate configuration files for an alto chain") + .arg( + Arg::new("peers") + .long("peers") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("bootstrappers") + .long("bootstrappers") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("regions") + .long("regions") + .required(true) + .value_delimiter(',') + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("instance_type") + .long("instance-type") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("storage_size") + .long("storage-size") + .required(true) + .value_parser(value_parser!(i32)), + ) + .arg( + Arg::new("storage_class") + .long("storage-class") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("worker_threads") + .long("worker-threads") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("message_backlog") + .long("message-backlog") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("mailbox_size") + .long("mailbox-size") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("dashboard") + .long("dashboard") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("output") + .long("output") + .required(true) + .value_parser(value_parser!(String)), + ), ) - .get_matches(); + .subcommand( + Command::new("indexer") + .about("Add indexer support for an alto chain.") + .arg( + Arg::new("dir") + .long("dir") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("url") + .long("url") + .required(true) + .value_parser(value_parser!(String)), + ), + ); - // Create logger - tracing_subscriber::fmt().init(); + // Parse arguments + let matches = app.get_matches(); + + // Handle subcommands + match matches.subcommand() { + Some(("generate", sub_matches)) => generate(sub_matches), + Some(("indexer", sub_matches)) => indexer(sub_matches), + _ => { + eprintln!("Invalid subcommand. Use 'generate' or 'indexer'."); + std::process::exit(1); + } + } +} + +fn generate(sub_matches: &ArgMatches) { + // Extract arguments + let peers = *sub_matches.get_one::("peers").unwrap(); + let bootstrappers = *sub_matches.get_one::("bootstrappers").unwrap(); + let regions = sub_matches + .get_many::("regions") + .unwrap() + .cloned() + .collect::>(); + let instance_type = sub_matches + .get_one::("instance_type") + .unwrap() + .clone(); + let storage_size = *sub_matches.get_one::("storage_size").unwrap(); + let storage_class = sub_matches + .get_one::("storage_class") + .unwrap() + .clone(); + let worker_threads = *sub_matches.get_one::("worker_threads").unwrap(); + let message_backlog = *sub_matches.get_one::("message_backlog").unwrap(); + let mailbox_size = *sub_matches.get_one::("mailbox_size").unwrap(); + let dashboard = sub_matches.get_one::("dashboard").unwrap().clone(); + let output = sub_matches.get_one::("output").unwrap().clone(); + + // Construct output path + let raw_current_dir = std::env::current_dir().unwrap(); + let current_dir = raw_current_dir.to_str().unwrap(); + let output = format!("{}/{}", current_dir, output); + + // Check if output directory exists + if fs::metadata(&output).is_ok() { + error!("output directory already exists: {}", output); + std::process::exit(1); + } // Generate UUID let tag = Uuid::new_v4().to_string(); info!(tag, "generated deployment tag"); // Generate peers - let peers = *matches.get_one::("peers").unwrap(); - let bootstrappers = *matches.get_one::("bootstrappers").unwrap(); assert!( bootstrappers <= peers, - "bootstrappers must be less than peers" + "bootstrappers must be less than or equal to peers" ); let mut peer_schemes = (0..peers) .map(|_| Ed25519::new(&mut OsRng)) @@ -134,22 +195,10 @@ fn main() { ); // Generate instance configurations - let regions = matches - .get_many::("regions") - .unwrap() - .cloned() - .collect::>(); assert!( regions.len() <= peers, "must be at least one peer per specified region" ); - let instance_type = matches.get_one::("instance_type").unwrap(); - let storage_size = *matches.get_one::("storage_size").unwrap(); - let storage_class = matches.get_one::("storage_class").unwrap(); - let worker_threads = *matches.get_one::("worker-threads").unwrap(); - let message_backlog = *matches.get_one::("message-backlog").unwrap(); - let mailbox_size = *matches.get_one::("mailbox-size").unwrap(); - let indexer = matches.get_one::("indexer").cloned(); let mut instance_configs = Vec::new(); let mut peer_configs = Vec::new(); for (index, scheme) in peer_schemes.iter().enumerate() { @@ -171,7 +220,7 @@ fn main() { message_backlog, mailbox_size, - indexer: indexer.clone(), + indexer: None, }; peer_configs.push((peer_config_file.clone(), peer_config)); @@ -208,28 +257,90 @@ fn main() { }; // Write configuration files - let raw_current_dir = std::env::current_dir().unwrap(); - let current_dir = raw_current_dir.to_str().unwrap(); - let output = matches.get_one::("output").unwrap(); - let output = format!("{}/{}", current_dir, output); - assert!( - !std::path::Path::new(&output).exists(), - "output directory already exists" - ); - std::fs::create_dir_all(output.clone()).unwrap(); - let dashboard = matches.get_one::("dashboard").unwrap().clone(); - std::fs::copy( + fs::create_dir_all(&output).unwrap(); + fs::copy( format!("{}/{}", current_dir, dashboard), format!("{}/dashboard.json", output), ) .unwrap(); for (peer_config_file, peer_config) in peer_configs { let path = format!("{}/{}", output, peer_config_file); - let file = std::fs::File::create(path).unwrap(); + let file = fs::File::create(&path).unwrap(); serde_yaml::to_writer(file, &peer_config).unwrap(); } let path = format!("{}/config.yaml", output); - let file = std::fs::File::create(path.clone()).unwrap(); + let file = fs::File::create(&path).unwrap(); serde_yaml::to_writer(file, &config).unwrap(); info!(path, "wrote configuration files"); } + +fn indexer(sub_matches: &ArgMatches) { + // Extract arguments + let dir = sub_matches.get_one::("dir").unwrap().clone(); + let url = sub_matches.get_one::("url").unwrap().clone(); + + // Construct directory path + let raw_current_dir = std::env::current_dir().unwrap(); + let current_dir = raw_current_dir.to_str().unwrap(); + let dir = format!("{}/{}", current_dir, dir); + + // Check if directory exists + if fs::metadata(&dir).is_err() { + error!("directory does not exist: {}", dir); + std::process::exit(1); + } + + // Iterate over all peer configuration files and add indexer URL + for entry in fs::read_dir(&dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_file() { + if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) { + if file_name.ends_with(".yaml") && file_name != "config.yaml" { + let relative_path = path.strip_prefix(&dir).unwrap(); + match fs::read_to_string(&path) { + Ok(content) => match serde_yaml::from_str::(&content) { + Ok(mut config) => { + config.indexer = Some(url.clone()); + match serde_yaml::to_string(&config) { + Ok(updated_content) => { + if let Err(e) = fs::write(&path, updated_content) { + error!( + path = ?relative_path, + error = ?e, + "failed to write", + ); + } else { + info!(path = ?relative_path, "updated"); + } + } + Err(e) => { + error!( + path = ?relative_path, + error = ?e, + "failed to serialize config", + ); + } + } + } + Err(e) => { + error!( + path = ?relative_path, + error = ?e, + "failed to parse" + ); + } + }, + Err(e) => { + error!( + path = ?relative_path, + error = ?e, + "failed to read", + ); + } + } + } + } + } + } +} From 455478bb2b3e23dd9496a5b2fb97246d814c9ebe Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 15:39:35 -0700 Subject: [PATCH 13/19] remove reserialization --- chain/src/actors/syncer/actor.rs | 13 +++++++++---- client/src/consensus.rs | 26 ++++++++------------------ 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 6abb16b0..fcff9a41 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -12,6 +12,7 @@ use crate::actors::syncer::{ }; use alto_client::Client; use alto_types::{Block, Finalization, Finalized, Notarized}; +use bytes::Bytes; use commonware_cryptography::{bls12381, ed25519::PublicKey, sha256::Digest}; use commonware_macros::select; use commonware_p2p::{utils::requester, Receiver, Recipients, Sender}; @@ -483,6 +484,7 @@ impl> Actor> Actor> Actor> Actor> Actor Result<(), Error> { - let request = seed.serialize(); + pub async fn seed_upload(&self, seed: Bytes) -> Result<(), Error> { let result = self .client .post(seed_upload_path(self.uri.clone())) - .body(request) + .body(seed) .send() .await .map_err(Error::Reqwest)?; @@ -90,16 +90,11 @@ impl Client { Ok(result) } - pub async fn notarization_upload( - &self, - proof: Notarization, - block: Block, - ) -> Result<(), Error> { - let request = Notarized::new(proof, block).serialize(); + pub async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Error> { let result = self .client .post(notarization_upload_path(self.uri.clone())) - .body(request) + .body(notarized) .send() .await .map_err(Error::Reqwest)?; @@ -136,16 +131,11 @@ impl Client { Ok(result) } - pub async fn finalization_upload( - &self, - proof: Finalization, - block: Block, - ) -> Result<(), Error> { - let request = Finalized::new(proof, block).serialize(); + pub async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Error> { let result = self .client .post(finalization_upload_path(self.uri.clone())) - .body(request) + .body(finalized) .send() .await .map_err(Error::Reqwest)?; From 86e2a6135b7993eabc0aeeb83ecedaea3ba9d702 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 16:01:49 -0700 Subject: [PATCH 14/19] use finalizer to avoid confusion --- chain/dashboard.json | 2 +- chain/src/actors/syncer/actor.rs | 105 ++++++++++++++++--------------- 2 files changed, 57 insertions(+), 50 deletions(-) diff --git a/chain/dashboard.json b/chain/dashboard.json index 25327174..a908890b 100644 --- a/chain/dashboard.json +++ b/chain/dashboard.json @@ -312,7 +312,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "engine_syncer_latest_height{job=~\"$Validator\", region=~\"$Region\"}", + "expr": "engine_syncer_finalized_height{job=~\"$Validator\", region=~\"$Region\"}", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "{{job}} ({{instance}})", diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index fcff9a41..617272ab 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -67,13 +67,13 @@ pub struct Actor, - // Indexer storage - indexer: Metadata>, + // Finalizer storage + finalizer: Metadata>, // Latest height metric - latest_height: Gauge, + finalized_height: Gauge, // Indexed height metric - indexed_height: Gauge, + contiguous_height: Gauge, } impl> Actor { @@ -171,28 +171,28 @@ impl> Actor> Actor> Actor(1); - self.context.with_label("indexer").spawn({ + let (mut finalizer_sender, mut finalizer_receiver) = mpsc::channel::<()>(1); + self.context.with_label("finalizer").spawn({ let mut resolver = resolver.clone(); let last_view_processed = last_view_processed.clone(); let verified = verified.clone(); @@ -298,7 +298,7 @@ impl> Actor> Actor> Actor> Actor> Actor { // Upload seed to indexer (if available) if let Some(client) = self.client.as_ref() { - let client = client.clone(); - let view = proof.view; - let seed = seed.serialize().into(); - self.context.with_label("indexer").spawn( + self.context.with_label("indexer").spawn({ + let client = client.clone(); + let view = proof.view; move |_| async move { + let seed = seed.serialize().into(); let result = client.seed_upload(seed).await; if let Err(e) = result { warn!(?e, "failed to upload seed"); } debug!(view, "seed uploaded to indexer"); - }); + } + }); } // Check if in buffer @@ -523,8 +527,8 @@ impl> Actor> Actor> Actor { // Upload seed to indexer (if available) if let Some(client) = self.client.as_ref() { - let client = client.clone(); - let view = proof.view; - let seed = seed.serialize().into(); - self.context.with_label("indexer").spawn( + self.context.with_label("indexer").spawn({ + let client = client.clone(); + let view = proof.view; move |_| async move { + let seed = seed.serialize().into(); let result = client.seed_upload(seed).await; if let Err(e) = result { warn!(?e, "failed to upload seed"); } debug!(view, "seed uploaded to indexer"); - }); + } + }); } // Check if in buffer @@ -609,20 +615,20 @@ impl> Actor> Actor> Actor { // Parse block @@ -873,8 +880,8 @@ impl> Actor Date: Mon, 10 Mar 2025 16:11:31 -0700 Subject: [PATCH 15/19] fix end condition --- chain/src/lib.rs | 171 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 147 insertions(+), 24 deletions(-) diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 3a38bb1a..f49cfbb9 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -199,7 +199,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = false; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -217,16 +217,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < required_container { - failed = true; + if value >= required_container { + success = true; break; } } } - if !failed { + if !success { break; } @@ -357,7 +357,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = true; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -375,16 +375,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < initial_container_required { - failed = true; + if value >= initial_container_required { + success = true; break; } } } - if !failed { + if success { break; } @@ -438,7 +438,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = false; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -456,16 +456,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < final_container_required { - failed = true; + if value >= final_container_required { + success = true; break; } } } - if !failed { + if success { break; } @@ -576,7 +576,7 @@ mod tests { let metrics = context.encode(); // Iterate over all lines - let mut failed = false; + let mut success = false; for line in metrics.lines() { // Ensure it is a metrics line if !line.starts_with("validator-") { @@ -594,16 +594,16 @@ mod tests { assert_eq!(value, 0); } - // If ends with indexed_height, ensure it is at least required_container - if metric.ends_with("_syncer_indexed_height") { + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { let value = value.parse::().unwrap(); - if value < required_container { - failed = true; + if value >= required_container { + success = true; break; } } } - if !failed { + if success { break; } @@ -626,4 +626,127 @@ mod tests { assert!(runs > 1); info!(runs, "unclean shutdown recovery worked"); } + + #[test_traced] + fn test_indexer() { + // Create context + let n = 5; + let threshold = quorum(n).unwrap(); + let required_container = 10; + let (executor, mut context, _) = Executor::timed(Duration::from_secs(30)); + executor.start(async move { + // Create simulated network + let (network, mut oracle) = Network::new( + context.with_label("network"), + simulated::Config { + max_size: 1024 * 1024, + }, + ); + + // Start network + network.start(); + + // Register participants + let mut schemes = Vec::new(); + let mut validators = Vec::new(); + for i in 0..n { + let scheme = Ed25519::from_seed(i as u64); + let pk = scheme.public_key(); + schemes.push(scheme); + validators.push(pk); + } + validators.sort(); + schemes.sort_by_key(|s| s.public_key()); + let mut registrations = register_validators(&mut oracle, &validators).await; + + // Link all validators + let link = Link { + latency: 10.0, + jitter: 1.0, + success_rate: 1.0, + }; + link_validators(&mut oracle, &validators, link, None).await; + + // Derive threshold + let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + + // Create instances + let mut public_keys = HashSet::new(); + for (idx, scheme) in schemes.into_iter().enumerate() { + // Create scheme context + let public_key = scheme.public_key(); + public_keys.insert(public_key.clone()); + + // Configure engine + let uid = format!("validator-{}", public_key); + let config = engine::Config { + partition_prefix: uid.clone(), + signer: scheme, + identity: public.clone(), + share: shares[idx], + participants: validators.clone(), + mailbox_size: 1024, + backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()), + leader_timeout: Duration::from_secs(1), + notarization_timeout: Duration::from_secs(2), + nullify_retry: Duration::from_secs(10), + fetch_timeout: Duration::from_secs(1), + activity_timeout: 10, + max_fetch_count: 10, + max_fetch_size: 1024 * 512, + fetch_concurrent: 10, + fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), + indexer: None, + }; + let engine = Engine::new(context.with_label(&uid), config).await; + + // Get networking + let (voter, resolver, broadcast, backfill) = + registrations.remove(&public_key).unwrap(); + + // Start engine + engine.start(voter, resolver, broadcast, backfill); + } + + // Poll metrics + loop { + let metrics = context.encode(); + + // Iterate over all lines + let mut success = false; + for line in metrics.lines() { + // Ensure it is a metrics line + if !line.starts_with("validator-") { + continue; + } + + // Split metric and value + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + // If ends with peers_blocked, ensure it is zero + if metric.ends_with("_peers_blocked") { + let value = value.parse::().unwrap(); + assert_eq!(value, 0); + } + + // If ends with contiguous_height, ensure it is at least required_container + if metric.ends_with("_syncer_contiguous_height") { + let value = value.parse::().unwrap(); + if value >= required_container { + success = true; + break; + } + } + } + if success { + break; + } + + // Still waiting for all validators to complete + context.sleep(Duration::from_secs(1)).await; + } + }); + } } From b7f4c964e2e2f7dbfe78e45a45882101dad13da8 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 16:59:14 -0700 Subject: [PATCH 16/19] [chain] Add `Indexer` trait (#5) * pipe indexer into engine * basic indexer test * check indexer is working --- chain/src/actors/syncer/actor.rs | 55 +++++++------ chain/src/actors/syncer/mod.rs | 6 +- chain/src/bin/validator.rs | 12 ++- chain/src/engine.rs | 23 ++++-- chain/src/lib.rs | 129 +++++++++++++++++++++++++++++-- 5 files changed, 178 insertions(+), 47 deletions(-) diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 617272ab..d6b5f997 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -6,11 +6,13 @@ use super::{ ingress::{Mailbox, Message}, Config, }; -use crate::actors::syncer::{ - handler, - key::{self, MultiIndex, Value}, +use crate::{ + actors::syncer::{ + handler, + key::{self, MultiIndex, Value}, + }, + Indexer, }; -use alto_client::Client; use alto_types::{Block, Finalization, Finalized, Notarized}; use bytes::Bytes; use commonware_cryptography::{bls12381, ed25519::PublicKey, sha256::Digest}; @@ -44,7 +46,7 @@ use std::{ use tracing::{debug, info, warn}; /// Application actor. -pub struct Actor> { +pub struct Actor, I: Indexer> { context: R, public_key: PublicKey, public: bls12381::PublicKey, @@ -53,7 +55,7 @@ pub struct Actor, + indexer: Option, // Blocks verified stored by view<>digest verified: Archive, @@ -76,9 +78,9 @@ pub struct Actor> Actor { +impl, I: Indexer> Actor { /// Create a new application actor. - pub async fn init(context: R, config: Config) -> (Self, Mailbox) { + pub async fn init(context: R, config: Config) -> (Self, Mailbox) { // Initialize verified blocks let verified_journal = Journal::init( context.with_label("verified_journal"), @@ -197,13 +199,6 @@ impl> Actor> Actor> Actor { // Upload seed to indexer (if available) - if let Some(client) = self.client.as_ref() { + if let Some(indexer) = self.indexer.as_ref() { self.context.with_label("indexer").spawn({ - let client = client.clone(); + let indexer = indexer.clone(); let view = proof.view; move |_| async move { let seed = seed.serialize().into(); - let result = client.seed_upload(seed).await; + let result = indexer.seed_upload(seed).await; if let Err(e) = result { warn!(?e, "failed to upload seed"); + return; } debug!(view, "seed uploaded to indexer"); } @@ -526,15 +522,16 @@ impl> Actor> Actor { // Upload seed to indexer (if available) - if let Some(client) = self.client.as_ref() { + if let Some(indexer) = self.indexer.as_ref() { self.context.with_label("indexer").spawn({ - let client = client.clone(); + let indexer = indexer.clone(); let view = proof.view; move |_| async move { let seed = seed.serialize().into(); - let result = client.seed_upload(seed).await; + let result = indexer.seed_upload(seed).await; if let Err(e) = result { warn!(?e, "failed to upload seed"); + return; } debug!(view, "seed uploaded to indexer"); } @@ -625,16 +623,17 @@ impl> Actor { pub partition_prefix: String, pub public_key: PublicKey, @@ -30,5 +32,5 @@ pub struct Config { pub activity_timeout: u64, - pub indexer: Option, + pub indexer: Option, } diff --git a/chain/src/bin/validator.rs b/chain/src/bin/validator.rs index 677afcf8..030fcc75 100644 --- a/chain/src/bin/validator.rs +++ b/chain/src/bin/validator.rs @@ -1,4 +1,5 @@ use alto_chain::{engine, Config}; +use alto_client::Client; use alto_types::P2P_NAMESPACE; use axum::{routing::get, serve, Extension, Router}; use clap::{Arg, Command}; @@ -91,11 +92,12 @@ fn main() { let threshold = quorum(peers_u32).expect("unable to derive quorum"); let identity = from_hex_formatted(&config.identity).expect("Could not parse identity"); let identity = poly::Public::deserialize(&identity, threshold).expect("Identity is invalid"); + let identity_public = poly::public(&identity); let public_key = signer.public_key(); let ip = peers.get(&public_key).expect("Could not find self in IPs"); info!( ?public_key, - identity = hex(&poly::public(&identity).serialize()), + identity = hex(&identity_public.serialize()), ?ip, port = config.port, "loaded config" @@ -177,6 +179,12 @@ fn main() { // Create network let p2p = network.start(); + // Create indexer + let mut indexer = None; + if let Some(uri) = config.indexer { + indexer = Some(Client::new(&uri, identity_public.into())); + } + // Create engine let config = engine::Config { partition_prefix: "engine".to_string(), @@ -195,7 +203,7 @@ fn main() { max_fetch_size: MAX_FETCH_SIZE, fetch_concurrent: FETCH_CONCURRENT, fetch_rate_per_peer: resolver_limit, - indexer: config.indexer, + indexer, }; let engine = engine::Engine::new(context.with_label("engine"), config).await; diff --git a/chain/src/engine.rs b/chain/src/engine.rs index 89fa0437..2bb8dabc 100644 --- a/chain/src/engine.rs +++ b/chain/src/engine.rs @@ -1,4 +1,7 @@ -use crate::actors::{application, syncer}; +use crate::{ + actors::{application, syncer}, + Indexer, +}; use alto_types::NAMESPACE; use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover}; use commonware_cryptography::{ @@ -17,7 +20,7 @@ use rand::{CryptoRng, Rng}; use std::time::Duration; use tracing::{error, warn}; -pub struct Config { +pub struct Config { pub partition_prefix: String, pub signer: Ed25519, pub identity: Poly, @@ -36,14 +39,18 @@ pub struct Config { pub fetch_concurrent: usize, pub fetch_rate_per_peer: Quota, - pub indexer: Option, + pub indexer: Option, } -pub struct Engine + Metrics> { +pub struct Engine< + B: Blob, + E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics, + I: Indexer, +> { context: E, application: application::Actor, - syncer: syncer::Actor, + syncer: syncer::Actor, syncer_mailbox: syncer::Mailbox, consensus: Consensus< B, @@ -57,8 +64,10 @@ pub struct Engine, } -impl + Metrics> Engine { - pub async fn new(context: E, cfg: Config) -> Self { +impl + Metrics, I: Indexer> + Engine +{ + pub async fn new(context: E, cfg: Config) -> Self { // Create the application let public = public(&cfg.identity); let (application, supervisor, application_mailbox) = application::Actor::new( diff --git a/chain/src/lib.rs b/chain/src/lib.rs index f49cfbb9..dd73bb76 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -1,8 +1,63 @@ +use std::future::Future; + +use bytes::Bytes; +use commonware_cryptography::bls12381; use serde::{Deserialize, Serialize}; pub mod actors; pub mod engine; +/// Trait for interacting with an indexer. +pub trait Indexer: Clone + Send + Sync + 'static { + type Error: std::error::Error + Send + Sync + 'static; + + /// Create a new indexer with the given URI and public key. + fn new(uri: &str, public: bls12381::PublicKey) -> Self; + + /// Upload a seed to the indexer. + fn seed_upload(&self, seed: Bytes) -> impl Future> + Send; + + /// Upload a notarization to the indexer. + fn notarization_upload( + &self, + notarized: Bytes, + ) -> impl Future> + Send; + + /// Upload a finalization to the indexer. + fn finalization_upload( + &self, + finalized: Bytes, + ) -> impl Future> + Send; +} + +impl Indexer for alto_client::Client { + type Error = alto_client::Error; + fn new(uri: &str, public: bls12381::PublicKey) -> Self { + Self::new(uri, public) + } + + fn seed_upload( + &self, + seed: bytes::Bytes, + ) -> impl Future> + Send { + self.seed_upload(seed) + } + + fn notarization_upload( + &self, + notarization: bytes::Bytes, + ) -> impl Future> + Send { + self.notarization_upload(notarization) + } + + fn finalization_upload( + &self, + finalization: bytes::Bytes, + ) -> impl Future> + Send { + self.finalization_upload(finalization) + } +} + #[derive(Deserialize, Serialize)] pub struct Config { pub private_key: String, @@ -25,6 +80,8 @@ pub struct Config { #[cfg(test)] mod tests { use super::*; + use alto_types::{Finalized, Notarized, Seed}; + use bls12381::primitives::poly; use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme}; use commonware_macros::test_traced; use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender}; @@ -33,17 +90,61 @@ mod tests { Clock, Metrics, Runner, Spawner, }; use commonware_utils::quorum; - use engine::Engine; + use engine::{Config, Engine}; use governor::Quota; use rand::{rngs::StdRng, Rng, SeedableRng}; - use std::time::Duration; use std::{ collections::{HashMap, HashSet}, num::NonZeroU32, sync::{Arc, Mutex}, }; + use std::{sync::atomic::AtomicBool, time::Duration}; use tracing::info; + /// MockIndexer is a simple indexer implementation for testing. + #[derive(Clone)] + struct MockIndexer { + public: bls12381::PublicKey, + + seed_seen: Arc, + notarization_seen: Arc, + finalization_seen: Arc, + } + + impl Indexer for MockIndexer { + type Error = std::io::Error; + + fn new(_: &str, public: bls12381::PublicKey) -> Self { + MockIndexer { + public, + seed_seen: Arc::new(AtomicBool::new(false)), + notarization_seen: Arc::new(AtomicBool::new(false)), + finalization_seen: Arc::new(AtomicBool::new(false)), + } + } + + async fn seed_upload(&self, seed: Bytes) -> Result<(), Self::Error> { + Seed::deserialize(Some(&self.public), &seed).unwrap(); + self.seed_seen + .store(true, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + + async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Self::Error> { + Notarized::deserialize(Some(&self.public), ¬arized).unwrap(); + self.notarization_seen + .store(true, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + + async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Self::Error> { + Finalized::deserialize(Some(&self.public), &finalized).unwrap(); + self.finalization_seen + .store(true, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + } + /// Registers all validators using the oracle. async fn register_validators( oracle: &mut Oracle, @@ -165,7 +266,7 @@ mod tests { // Configure engine let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme, identity: public.clone(), @@ -323,7 +424,7 @@ mod tests { // Configure engine let public_key = scheme.public_key(); let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme.clone(), identity: public.clone(), @@ -406,7 +507,7 @@ mod tests { let share = shares[0]; let public_key = scheme.public_key(); let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme.clone(), identity: public.clone(), @@ -539,7 +640,7 @@ mod tests { // Configure engine let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme, identity: public.clone(), @@ -670,6 +771,9 @@ mod tests { // Derive threshold let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + // Define mock indexer + let indexer = MockIndexer::new("", poly::public(&public).into()); + // Create instances let mut public_keys = HashSet::new(); for (idx, scheme) in schemes.into_iter().enumerate() { @@ -679,7 +783,7 @@ mod tests { // Configure engine let uid = format!("validator-{}", public_key); - let config = engine::Config { + let config: Config = engine::Config { partition_prefix: uid.clone(), signer: scheme, identity: public.clone(), @@ -696,7 +800,7 @@ mod tests { max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), - indexer: None, + indexer: Some(indexer.clone()), }; let engine = Engine::new(context.with_label(&uid), config).await; @@ -747,6 +851,15 @@ mod tests { // Still waiting for all validators to complete context.sleep(Duration::from_secs(1)).await; } + + // Check indexer uploads + assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed)); + assert!(indexer + .notarization_seen + .load(std::sync::atomic::Ordering::Relaxed)); + assert!(indexer + .finalization_seen + .load(std::sync::atomic::Ordering::Relaxed)); }); } } From c67df8de5b3c3e92aab574f23facfae384a01865 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 18:02:39 -0700 Subject: [PATCH 17/19] [inspector] Add remaining functionality (#7) * cleanup inspector * finish inspector * refactor code * update README * add much faster * add connection prep * add debug * add range * fetch range * update README --- client/src/consensus.rs | 6 +- client/src/utils.rs | 4 +- inspector/Cargo.toml | 4 + inspector/README.md | 68 +++++++++- inspector/src/main.rs | 278 ++++++++++++++++++++++++++++++++-------- inspector/src/utils.rs | 135 +++++++++++++++++++ 6 files changed, 436 insertions(+), 59 deletions(-) create mode 100644 inspector/src/utils.rs diff --git a/client/src/consensus.rs b/client/src/consensus.rs index cc49df17..f06aa9de 100644 --- a/client/src/consensus.rs +++ b/client/src/consensus.rs @@ -34,7 +34,7 @@ fn block_get_path(base: String, query: &Query) -> String { format!("{}/block/{}", base, query.serialize()) } -fn register_path(base: String) -> String { +fn listen_path(base: String) -> String { format!("{}/consensus/ws", base) } @@ -211,9 +211,9 @@ impl Client { Ok(result) } - pub async fn register(&self) -> Result>, Error> { + pub async fn listen(&self) -> Result>, Error> { // Connect to the websocket endpoint - let (stream, _) = connect_async(register_path(self.ws_uri.clone())) + let (stream, _) = connect_async(listen_path(self.ws_uri.clone())) .await .map_err(Error::from)?; let (_, read) = stream.split(); diff --git a/client/src/utils.rs b/client/src/utils.rs index 8d2d6a19..6bd3c31d 100644 --- a/client/src/utils.rs +++ b/client/src/utils.rs @@ -6,8 +6,8 @@ fn healthy_path(base: String) -> String { impl Client { pub async fn health(&self) -> Result<(), Error> { - let client = reqwest::Client::new(); - let result = client + let result = self + .client .get(healthy_path(self.uri.clone())) .send() .await diff --git a/inspector/Cargo.toml b/inspector/Cargo.toml index 026c4d12..216981e5 100644 --- a/inspector/Cargo.toml +++ b/inspector/Cargo.toml @@ -23,3 +23,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } tokio = { version = "1.40.0", features = ["full"] } futures = { workspace = true } + +[[bin]] +name = "inspector" +path = "src/main.rs" diff --git a/inspector/README.md b/inspector/README.md index ff53af80..5f8d870a 100644 --- a/inspector/README.md +++ b/inspector/README.md @@ -9,8 +9,74 @@ Monitor `alto` activity. `alto-inspector` is **ALPHA** software and is not yet recommended for production use. Developers should expect breaking changes and occasional instability. +## Installation + +### Local + +```bash +cargo install --path . --force +``` + +### Crates.io + +```bash +cargo install alto-inspector +``` + ## Usage +_Use `-v` or `--verbose` to enable verbose logging (like request latency). Use `--prepare` to initialize the connection before making the request (for accurate latency measurement)._ + +### Get the latest seed + +```bash +inspector get seed latest --indexer --identity +``` + +### Get the notarization for view 100 + +```bash +inspector get notarization 100 --indexer --identity +``` + +### Get the notarizations between views 100 to 110 + +```bash +inspector get notarization 100..110 --indexer --identity +``` + +### Get the finalization for view 50 + +```bash +inspector get finalization 50 --indexer --identity +``` + +### Get the latest finalized block + +```bash +inspector get block latest --indexer --identity +``` + +### Get the block at height 10 + +```bash +inspector get block 10 --indexer --identity +``` + +### Get the blocks between heights 10 and 20 + +```bash +inspector get block 10..20 --indexer --identity +``` + +### Get the block with a specific digest + +```bash +inspector -- get block 0x65016ff40e824e21fffe903953c07b6d604dbcf39f681c62e7b3ed57ab1d1994 --indexer --identity +``` + +### Listen for consensus events + ```bash -cargo run --release -- --indexer --identity +inspector listen --indexer --identity ``` \ No newline at end of file diff --git a/inspector/src/main.rs b/inspector/src/main.rs index 3421038c..b700c302 100644 --- a/inspector/src/main.rs +++ b/inspector/src/main.rs @@ -1,75 +1,247 @@ -use alto_client::{consensus::Message, Client}; +use alto_client::{ + consensus::{Message, Payload}, + Client, IndexQuery, Query, +}; use clap::{value_parser, Arg, Command}; use commonware_cryptography::bls12381::PublicKey; -use commonware_utils::{from_hex_formatted, SystemTimeExt}; +use commonware_utils::from_hex_formatted; use futures::StreamExt; -use std::time; -use tracing::info; +use tracing::{info, warn, Level}; +use utils::{ + log_block, log_finalization, log_latency, log_notarization, log_seed, parse_index_query, + parse_query, IndexQueryKind, QueryKind, +}; + +mod utils; #[tokio::main] async fn main() { - // Parse arguments let matches = Command::new("inspector") .about("Monitor alto activity.") .arg( - Arg::new("indexer") - .long("indexer") - .required(true) - .value_parser(value_parser!(String)), + Arg::new("verbose") + .short('v') + .long("verbose") + .help("Enable debug logging") + .global(true) + .action(clap::ArgAction::SetTrue), ) - .arg( - Arg::new("identity") - .long("identity") - .required(true) - .value_parser(value_parser!(String)), + .subcommand( + Command::new("listen") + .about("Listen for consensus messages") + .arg( + Arg::new("indexer") + .long("indexer") + .required(true) + .value_parser(value_parser!(String)) + .help("URL of the indexer to connect to"), + ) + .arg( + Arg::new("identity") + .long("identity") + .required(true) + .value_parser(value_parser!(String)) + .help("Hex-encoded public key of the identity"), + ), + ) + .subcommand( + Command::new("get") + .about("Get specific consensus data") + .arg( + Arg::new("type") + .required(true) + .value_parser(["seed", "notarization", "finalization", "block"]) + .help("Type of data to retrieve"), + ) + .arg( + Arg::new("query") + .required(true) + .value_parser(value_parser!(String)) + .help("Query parameter (e.g., 'latest', number, range like '23..45', or hex digest for block)"), + ) + .arg( + Arg::new("indexer") + .long("indexer") + .required(true) + .value_parser(value_parser!(String)) + .help("URL of the indexer to connect to"), + ) + .arg( + Arg::new("identity") + .long("identity") + .required(true) + .value_parser(value_parser!(String)) + .help("Hex-encoded public key of the identity"), + ) + .arg( + Arg::new("prepare") + .long("prepare") + .help("Prepare the connection for some request to get a more accurate latency observation") + .required(false) + .action(clap::ArgAction::SetTrue), + ), ) .get_matches(); - // Create logger - tracing_subscriber::fmt().init(); + let log_level = if matches.get_flag("verbose") { + Level::DEBUG + } else { + Level::INFO + }; + tracing_subscriber::fmt().with_max_level(log_level).init(); + + if let Some(matches) = matches.subcommand_matches("listen") { + let indexer = matches.get_one::("indexer").unwrap(); + let identity = matches.get_one::("identity").unwrap(); + let identity = from_hex_formatted(identity).expect("Failed to decode identity"); + let identity = PublicKey::try_from(identity).expect("Invalid identity"); + let client = Client::new(indexer, identity); - // Parse the identity - let identity = matches.get_one::("identity").unwrap(); - let identity = from_hex_formatted(identity).unwrap(); - let identity = PublicKey::try_from(identity).expect("Invalid identity"); + let mut stream = client.listen().await.expect("Failed to connect to indexer"); + info!("listening for consensus messages..."); + while let Some(message) = stream.next().await { + let message = message.expect("Failed to receive message"); + match message { + Message::Seed(seed) => log_seed(seed), + Message::Notarization(notarized) => log_notarization(notarized), + Message::Finalization(finalized) => log_finalization(finalized), + } + } + } else if let Some(matches) = matches.subcommand_matches("get") { + let type_ = matches.get_one::("type").unwrap(); + let query_str = matches.get_one::("query").unwrap(); + let indexer = matches.get_one::("indexer").unwrap(); + let identity = matches.get_one::("identity").unwrap(); + let identity = from_hex_formatted(identity).expect("Failed to decode identity"); + let identity = PublicKey::try_from(identity).expect("Invalid identity"); + let client = Client::new(indexer, identity); + let prepare_flag = matches.get_flag("prepare"); - // Connect to the indexer - let indexer = matches.get_one::("indexer").unwrap(); - let client = Client::new(indexer, identity); + if prepare_flag { + client.health().await.expect("Failed to prepare connection"); + info!("connection prepared"); + } - // Stream the chain - let mut stream = client - .register() - .await - .expect("Failed to connect to indexer"); - info!("connected to indexer"); - while let Some(message) = stream.next().await { - let message = message.expect("Failed to receive message"); - match message { - Message::Seed(seed) => { - info!(view = seed.view, signature = ?seed.signature, "seed"); + match type_.as_str() { + "seed" => { + let query_kind = parse_index_query(query_str).expect("Invalid query"); + match query_kind { + IndexQueryKind::Single(query) => { + let start = std::time::Instant::now(); + let seed = client.seed_get(query).await.expect("Failed to get seed"); + log_latency(start); + log_seed(seed); + } + IndexQueryKind::Range(start_view, end_view) => { + for view in start_view..end_view { + let start = std::time::Instant::now(); + let query = IndexQuery::Index(view); + match client.seed_get(query).await { + Ok(seed) => { + log_latency(start); + log_seed(seed); + } + Err(e) => { + warn!(view, error=?e, "failed to get seed"); + } + } + } + } + } + } + "notarization" => { + let query_kind = parse_index_query(query_str).expect("Invalid query"); + match query_kind { + IndexQueryKind::Single(query) => { + let start = std::time::Instant::now(); + let notarized = client + .notarization_get(query) + .await + .expect("Failed to get notarization"); + log_latency(start); + log_notarization(notarized); + } + IndexQueryKind::Range(start_view, end_view) => { + for view in start_view..end_view { + let start = std::time::Instant::now(); + let query = IndexQuery::Index(view); + match client.notarization_get(query).await { + Ok(notarized) => { + log_latency(start); + log_notarization(notarized); + } + Err(e) => { + warn!(view, error=?e, "failed to get notarization"); + } + } + } + } + } } - Message::Notarization(notarized) => { - let now = time::SystemTime::now().epoch_millis(); - info!( - view = notarized.proof.view, - height = notarized.block.height, - timestamp = notarized.block.timestamp, - age = now - notarized.block.timestamp, - digest = ?notarized.block.digest(), - "notarized"); + "finalization" => { + let query_kind = parse_index_query(query_str).expect("Invalid query"); + match query_kind { + IndexQueryKind::Single(query) => { + let start = std::time::Instant::now(); + let finalized = client + .finalization_get(query) + .await + .expect("Failed to get finalization"); + log_latency(start); + log_finalization(finalized); + } + IndexQueryKind::Range(start_view, end_view) => { + for view in start_view..end_view { + let start = std::time::Instant::now(); + let query = IndexQuery::Index(view); + match client.finalization_get(query).await { + Ok(finalized) => { + log_latency(start); + log_finalization(finalized); + } + Err(e) => { + warn!(view, error=?e, "failed to get finalization"); + } + } + } + } + } } - Message::Finalization(finalized) => { - let now = time::SystemTime::now().epoch_millis(); - info!( - view = finalized.proof.view, - height = finalized.block.height, - timestamp = finalized.block.timestamp, - age = now - finalized.block.timestamp, - digest = ?finalized.block.digest(), - "finalized" - ); + "block" => { + let query_kind = parse_query(query_str).expect("Invalid query"); + match query_kind { + QueryKind::Single(query) => { + let start = std::time::Instant::now(); + let payload = client.block_get(query).await.expect("Failed to get block"); + log_latency(start); + match payload { + Payload::Finalized(finalized) => log_finalization(*finalized), + Payload::Block(block) => log_block(block), + } + } + QueryKind::Range(start_height, end_height) => { + for height in start_height..end_height { + let start = std::time::Instant::now(); + let query = Query::Index(height); + match client.block_get(query).await { + Ok(payload) => { + log_latency(start); + match payload { + Payload::Finalized(finalized) => { + log_finalization(*finalized) + } + Payload::Block(block) => log_block(block), + } + } + Err(e) => { + warn!(height, error=?e, "failed to get block"); + } + } + } + } + } } + _ => unreachable!(), } } } diff --git a/inspector/src/utils.rs b/inspector/src/utils.rs new file mode 100644 index 00000000..7b4b0009 --- /dev/null +++ b/inspector/src/utils.rs @@ -0,0 +1,135 @@ +use alto_client::{IndexQuery, Query}; +use alto_types::{Finalized, Notarized, Seed}; +use commonware_cryptography::sha256::Digest; +use commonware_utils::{SizedSerialize, SystemTimeExt}; +use std::time; +use tracing::{debug, info}; + +// Define enums for query kinds +pub enum IndexQueryKind { + Single(IndexQuery), + Range(u64, u64), +} + +pub enum QueryKind { + Single(Query), + Range(u64, u64), +} + +// Parse IndexQuery for seed, notarization, and finalization +pub fn parse_index_query(query: &str) -> Option { + if query == "latest" { + Some(IndexQueryKind::Single(IndexQuery::Latest)) + } else if let Some((start, end)) = parse_range(query) { + Some(IndexQueryKind::Range(start, end)) + } else if let Ok(index) = query.parse::() { + Some(IndexQueryKind::Single(IndexQuery::Index(index))) + } else { + None + } +} + +// Parse Query for block +pub fn parse_query(query: &str) -> Option { + if query == "latest" { + Some(QueryKind::Single(Query::Latest)) + } else if let Some((start, end)) = parse_range(query) { + Some(QueryKind::Range(start, end)) + } else if let Ok(index) = query.parse::() { + Some(QueryKind::Single(Query::Index(index))) + } else { + let bytes = commonware_utils::from_hex(query)?; + let digest: [u8; Digest::SERIALIZED_LEN] = bytes.try_into().ok()?; + Some(QueryKind::Single(Query::Digest(digest.into()))) + } +} + +// Helper function to parse range queries +fn parse_range(query: &str) -> Option<(u64, u64)> { + let parts: Vec<&str> = query.split("..").collect(); + if parts.len() == 2 { + let start = parts[0].parse::().ok()?; + let end = parts[1].parse::().ok()?; + if start <= end { + Some((start, end)) + } else { + None + } + } else { + None + } +} + +// Existing logging functions remain unchanged +const MS_PER_SECOND: u64 = 1000; +const MS_PER_HOUR: u64 = 3_600_000; +const MS_PER_DAY: u64 = 86_400_000; + +pub fn format_age(age: u64) -> String { + if age < MS_PER_SECOND { + format!("{}ms", age) + } else if age < MS_PER_HOUR { + let seconds = age as f64 / MS_PER_SECOND as f64; + format!("{:.1}s", seconds) + } else if age < MS_PER_DAY { + let hours = age as f64 / MS_PER_HOUR as f64; + format!("{:.1}h", hours) + } else { + let days = age / MS_PER_DAY; + let remaining_ms = age % MS_PER_DAY; + let hours = remaining_ms / MS_PER_HOUR; + format!("{}d {}h", days, hours) + } +} + +pub fn log_seed(seed: Seed) { + info!(view = seed.view, signature = ?seed.signature, "seed"); +} + +pub fn log_notarization(notarized: Notarized) { + let now = time::SystemTime::now().epoch_millis(); + let age_ms = now.saturating_sub(notarized.block.timestamp); + let age_str = format_age(age_ms); + info!( + view = notarized.proof.view, + height = notarized.block.height, + timestamp = notarized.block.timestamp, + age = %age_str, + digest = ?notarized.block.digest(), + "notarized" + ); +} + +pub fn log_finalization(finalized: Finalized) { + let now = time::SystemTime::now().epoch_millis(); + let age_ms = now.saturating_sub(finalized.block.timestamp); + let age_str = format_age(age_ms); + info!( + view = finalized.proof.view, + height = finalized.block.height, + timestamp = finalized.block.timestamp, + age = %age_str, + digest = ?finalized.block.digest(), + "finalized" + ); +} + +pub fn log_block(block: alto_types::Block) { + let now = time::SystemTime::now().epoch_millis(); + let age_ms = now.saturating_sub(block.timestamp); + let age_str = format_age(age_ms); + info!( + height = block.height, + timestamp = block.timestamp, + age = %age_str, + digest = ?block.digest(), + "block" + ); +} + +pub fn log_latency(start: time::Instant) { + let elapsed = start.elapsed(); + let elapsed_ms = elapsed.as_millis(); + let elapsed_str = format_age(elapsed_ms as u64); + debug!(elapsed = %elapsed_str, "latency"); +} From 7f12320e32a7180b61fccf70ac579072193c1b8a Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 18:04:51 -0700 Subject: [PATCH 18/19] update versions to 0.0.3 --- Cargo.lock | 8 +++--- Cargo.toml | 4 +-- chain/Cargo.toml | 2 +- client/Cargo.toml | 2 +- inspector/Cargo.toml | 2 +- scripts/bump_versions.sh | 58 ++++++++++++++++++++++++++++++++++++++++ types/Cargo.toml | 2 +- 7 files changed, 68 insertions(+), 10 deletions(-) create mode 100644 scripts/bump_versions.sh diff --git a/Cargo.lock b/Cargo.lock index 868eb25a..98204930 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ dependencies = [ [[package]] name = "alto-chain" -version = "0.0.2" +version = "0.0.3" dependencies = [ "alto-client", "alto-types", @@ -70,7 +70,7 @@ dependencies = [ [[package]] name = "alto-client" -version = "0.0.2" +version = "0.0.3" dependencies = [ "alto-types", "bytes", @@ -86,7 +86,7 @@ dependencies = [ [[package]] name = "alto-inspector" -version = "0.0.2" +version = "0.0.3" dependencies = [ "alto-client", "alto-types", @@ -104,7 +104,7 @@ dependencies = [ [[package]] name = "alto-types" -version = "0.0.2" +version = "0.0.3" dependencies = [ "bytes", "commonware-cryptography", diff --git a/Cargo.toml b/Cargo.toml index 2dcc16a0..cb0e0f0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ members = [ resolver = "2" [workspace.dependencies] -alto-client = { version = "0.0.2", path = "client" } -alto-types = { version = "0.0.2", path = "types" } +alto-client = { version = "0.0.3", path = "client" } +alto-types = { version = "0.0.3", path = "types" } commonware-consensus = { version = "0.0.40" } commonware-cryptography = { version = "0.0.40" } commonware-deployer = { version = "0.0.40" } diff --git a/chain/Cargo.toml b/chain/Cargo.toml index a7b0ba35..3542575d 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-chain" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0" diff --git a/client/Cargo.toml b/client/Cargo.toml index 40bbb016..a1ce81c6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-client" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0" diff --git a/inspector/Cargo.toml b/inspector/Cargo.toml index 216981e5..12508706 100644 --- a/inspector/Cargo.toml +++ b/inspector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-inspector" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0" diff --git a/scripts/bump_versions.sh b/scripts/bump_versions.sh new file mode 100644 index 00000000..ba3f4719 --- /dev/null +++ b/scripts/bump_versions.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +# +# Increment patch versions for crates/packages whose name starts with "alto-" +# in both package declarations and [workspace.dependencies]. + +set -euo pipefail + +# Function: bump the patch number in e.g., 0.0.14 -> 0.0.15 +bump_version() { + local old="$1" + local major minor patch + IFS='.' read -r major minor patch <<< "${old}" + patch=$((patch + 1)) + echo "${major}.${minor}.${patch}" +} + +# Recursively find all Cargo.toml files +find . -name "Cargo.toml" | while read -r cargo_file; do + # We'll store updated file content in an array + content=() + changed=false + + # Read the file line by line + name="" + while IFS= read -r line; do + # 1) Match workspace deps like: alto-foo = { version = "0.0.3", path = "foo" } + if [[ "${line}" =~ ^[[:space:]]*(alto-[^[:space:]]+)[[:space:]]*=\ {[[:space:]]*version[[:space:]]*=[[:space:]]*\"([0-9]+\.[0-9]+\.[0-9]+)\" ]]; then + old="${BASH_REMATCH[2]}" + new="$(bump_version "${old}")" + line="${line/${old}/${new}}" + changed=true + fi + + # 2) Check for package name lines like: name = "alto-foo" + if [[ "${line}" =~ ^[[:space:]]*name[[:space:]]*=[[:space:]]*\"(alto-[^\"]+)\" ]]; then + name="${BASH_REMATCH[1]}" + else + # 3) If name is set, we may be on a version line + if [[ -n "${name}" && "${line}" =~ ^[[:space:]]*version[[:space:]]*=[[:space:]]*\"([0-9]+\.[0-9]+\.[0-9]+)\" ]]; then + old="${BASH_REMATCH[1]}" + new="$(bump_version "${old}")" + line="${line/${old}/${new}}" + changed=true + name="" + fi + fi + + content+=("${line}") + done < "${cargo_file}" + + # If we changed anything, overwrite the file + if ${changed}; then + for line in "${content[@]}"; do + printf "%s\n" "${line}" + done > "${cargo_file}" + echo "Updated ${cargo_file}" + fi +done \ No newline at end of file diff --git a/types/Cargo.toml b/types/Cargo.toml index 0bfae034..d93ea756 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alto-types" -version = "0.0.2" +version = "0.0.3" publish = true edition = "2021" license = "MIT OR Apache-2.0" From cb1562e7799041a6bb3ee658a4f6107300d6511b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 10 Mar 2025 18:05:46 -0700 Subject: [PATCH 19/19] make bump_versions executable --- scripts/bump_versions.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 scripts/bump_versions.sh diff --git a/scripts/bump_versions.sh b/scripts/bump_versions.sh old mode 100644 new mode 100755