From bb3a535b3e2e687cff6566911d55d82616acb5f0 Mon Sep 17 00:00:00 2001 From: Danial Mehrjerdi Date: Mon, 30 Jun 2025 10:13:29 +0200 Subject: [PATCH 1/4] Add support for multiple server urls --- README.md | 11 +++++++++-- src/api_client.rs | 4 ++++ src/config.rs | 4 ++-- src/main.rs | 45 ++++++++++++++++++++++++++++----------------- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 3a349fe..e77d7d6 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,16 @@ Make sure to set `RUST_LOG=INFO` to enable logs from tracing: ```bash RUST_LOG=INFO cargo run -- run \ --pythnet-url wss://api2.pythnet.pyth.network \ - --server-url https://quorum.pyth.network \ + --server-url https://quorum-green.pyth.network \ + --server-url https://quorum-yellow.pyth.network \ + --server-url https://quorum-cyan.pyth.network \ --secret-key /path/to/secret.key \ --wormhole-pid H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU ``` +You can specify multiple `--server-url` flags to broadcast observations to more than one server. + + --- ### 🌱 Environment Variables (Optional) @@ -41,7 +46,7 @@ Instead of CLI flags, you can also set environment variables: ```bash export PYTHNET_URL=wss://api2.pythnet.pyth.network -export SERVER_URL=https://quorum.pyth.network +export SERVER_URL=https://quorum-green.pyth.network,https://quorum-yellow.pyth.network,https://quorum-cyan.pyth.network export SECRET_KEY=/path/to/secret.key export WORMHOLE_PID=H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU export RUST_LOG=INFO @@ -49,6 +54,8 @@ export RUST_LOG=INFO cargo run ``` +You can provide multiple server URLs in the `SERVER_URL` environment variable by separating them with commas. + --- ### 🔑 Generate a Secret Key diff --git a/src/api_client.rs b/src/api_client.rs index c47b231..2ae5d25 100644 --- a/src/api_client.rs +++ b/src/api_client.rs @@ -55,6 +55,10 @@ impl Observation

{ } impl ApiClient { + pub fn get_base_url(&self) -> &Url { + &self.inner.base_url + } + pub fn try_new( base_url: String, config: Option, diff --git a/src/config.rs b/src/config.rs index de0785c..427a596 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,8 +21,8 @@ pub struct RunOptions { default_value = "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU" )] pub wormhole_pid: String, - #[arg(long = "server-url", env = "SERVER_URL")] - pub server_url: String, + #[arg(long = "server-url", env = "SERVER_URL", value_delimiter = ',')] + pub server_urls: Vec, #[arg(long = "mode", env = "MODE", default_value = "production")] pub mode: Mode, } diff --git a/src/main.rs b/src/main.rs index 8aa231b..e503dbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,7 +41,7 @@ struct RunListenerInput { signer: T, wormhole_pid: Pubkey, accumulator_address: Pubkey, - api_client: ApiClient, + api_clients: Vec, } fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey { @@ -147,21 +147,27 @@ async fn run_listener( Err(_) => continue, }; - tokio::spawn({ - let (api_client, signer) = (input.api_client.clone(), input.signer.clone()); - async move { - let body = message_data_to_body(&unreliable_data); - match Observation::try_new(body.clone(), signer.clone()) { - Ok(observation) => { - if let Err(e) = api_client.post_observation(observation).await { - tracing::error!(error = ?e, "Failed to post observation"); - } else { - tracing::info!("Observation posted successfully"); - }; + input.api_clients.iter().for_each(|api_client| { + tokio::spawn({ + let (unreliable_data, api_client, signer) = ( + unreliable_data.clone(), + api_client.clone(), + input.signer.clone() + ); + async move { + let body = message_data_to_body(&unreliable_data); + match Observation::try_new(body.clone(), signer.clone()) { + Ok(observation) => { + if let Err(e) = api_client.post_observation(observation).await { + tracing::error!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); + } else { + tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); + }; + } + Err(e) => tracing::error!(error = ?e, "Failed to create observation"), } - Err(e) => tracing::error!(error = ?e, "Failed to create observation"), } - } + }); }); } @@ -182,8 +188,13 @@ async fn run(run_options: config::RunOptions) { .expect("Invalid accumulator address"); let wormhole_pid = Pubkey::from_str(&run_options.wormhole_pid).expect("Invalid Wormhole program ID"); - let api_client = - ApiClient::try_new(run_options.server_url, None).expect("Failed to create API client"); + let api_clients: Vec = run_options + .server_urls + .into_iter() + .map(|server_url| { + ApiClient::try_new(server_url, None).expect("Failed to create API client") + }) + .collect(); let (pubkey, pubkey_evm) = signer.get_public_key().expect("Failed to get public key"); let evm_encded_public_key = format!("0x{}", hex::encode(pubkey_evm)); @@ -199,7 +210,7 @@ async fn run(run_options: config::RunOptions) { signer: signer.clone(), wormhole_pid, accumulator_address, - api_client: api_client.clone(), + api_clients: api_clients.clone(), }) .await { From babda3ee81b3e46ad06b771845f446e7b4e34e6e Mon Sep 17 00:00:00 2001 From: Danial Mehrjerdi Date: Mon, 30 Jun 2025 10:39:11 +0200 Subject: [PATCH 2/4] Refactor --- Cargo.lock | 1 + Cargo.toml | 1 + src/main.rs | 39 ++++++++++++++++++++------------------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed26520..249e3c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3266,6 +3266,7 @@ dependencies = [ "base64 0.22.1", "borsh 0.9.3", "clap", + "futures", "hex", "prost", "reqwest 0.12.19", diff --git a/Cargo.toml b/Cargo.toml index c484f18..a7bc275 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" anyhow = "1.0.98" borsh = "0.9.3" clap = { version = "4.5.39", features = ["derive", "env"] } +futures = "0.3.31" hex = { version = "0.4.3", features = ["serde"] } prost = "0.14.1" reqwest = { version = "0.12.19", features = ["json"] } diff --git a/src/main.rs b/src/main.rs index e503dbd..56c828f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use { api_client::{ApiClient, Observation}, borsh::BorshDeserialize, clap::Parser, + futures::future::join_all, posted_message::PostedMessageUnreliableData, prost::Message, secp256k1::{rand::rngs::OsRng, Secp256k1}, @@ -147,27 +148,27 @@ async fn run_listener( Err(_) => continue, }; - input.api_clients.iter().for_each(|api_client| { - tokio::spawn({ - let (unreliable_data, api_client, signer) = ( - unreliable_data.clone(), - api_client.clone(), - input.signer.clone() - ); - async move { - let body = message_data_to_body(&unreliable_data); - match Observation::try_new(body.clone(), signer.clone()) { - Ok(observation) => { - if let Err(e) = api_client.post_observation(observation).await { - tracing::error!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); - } else { - tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); - }; - } - Err(e) => tracing::error!(error = ?e, "Failed to create observation"), + tokio::spawn({ + let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone()); + async move { + let body = message_data_to_body(&unreliable_data); + match Observation::try_new(body.clone(), signer.clone()) { + Ok(observation) => { + join_all(api_clients.iter().map(|api_client| { + let observation = observation.clone(); + let api_client = api_client.clone(); + async move { + if let Err(e) = api_client.post_observation(observation).await { + tracing::error!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); + } else { + tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); + } + } + })).await; } + Err(e) => tracing::error!(error = ?e, "Failed to create observation"), } - }); + } }); } From 6c6f443bacb0fad5ff009607fbd5d2d790237f4b Mon Sep 17 00:00:00 2001 From: Danial Mehrjerdi Date: Mon, 30 Jun 2025 14:21:24 +0200 Subject: [PATCH 3/4] Update readme --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e77d7d6..bb05df7 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,9 @@ Make sure to set `RUST_LOG=INFO` to enable logs from tracing: ```bash RUST_LOG=INFO cargo run -- run \ --pythnet-url wss://api2.pythnet.pyth.network \ - --server-url https://quorum-green.pyth.network \ - --server-url https://quorum-yellow.pyth.network \ - --server-url https://quorum-cyan.pyth.network \ + --server-url https://quorum-1.pyth.network \ + --server-url https://quorum-2.pyth.network \ + --server-url https://quorum-3.pyth.network \ --secret-key /path/to/secret.key \ --wormhole-pid H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU ``` @@ -46,7 +46,7 @@ Instead of CLI flags, you can also set environment variables: ```bash export PYTHNET_URL=wss://api2.pythnet.pyth.network -export SERVER_URL=https://quorum-green.pyth.network,https://quorum-yellow.pyth.network,https://quorum-cyan.pyth.network +export SERVER_URL=https://quorum-1.pyth.network,https://quorum-2.pyth.network,https://quorum-3.pyth.network export SECRET_KEY=/path/to/secret.key export WORMHOLE_PID=H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU export RUST_LOG=INFO From 2d048b9a7e99702e0699dce37d318f6c03f64b10 Mon Sep 17 00:00:00 2001 From: Danial Mehrjerdi Date: Mon, 30 Jun 2025 14:39:27 +0200 Subject: [PATCH 4/4] Address comments --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 56c828f..f72b29f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -159,7 +159,7 @@ async fn run_listener( let api_client = api_client.clone(); async move { if let Err(e) = api_client.post_observation(observation).await { - tracing::error!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); + tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); } else { tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); }