diff --git a/Cargo.lock b/Cargo.lock index ed26520..249e3c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3266,6 +3266,7 @@ dependencies = [ "base64 0.22.1", "borsh 0.9.3", "clap", + "futures", "hex", "prost", "reqwest 0.12.19", diff --git a/Cargo.toml b/Cargo.toml index c484f18..a7bc275 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" anyhow = "1.0.98" borsh = "0.9.3" clap = { version = "4.5.39", features = ["derive", "env"] } +futures = "0.3.31" hex = { version = "0.4.3", features = ["serde"] } prost = "0.14.1" reqwest = { version = "0.12.19", features = ["json"] } diff --git a/README.md b/README.md index 3a349fe..bb05df7 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,16 @@ Make sure to set `RUST_LOG=INFO` to enable logs from tracing: ```bash RUST_LOG=INFO cargo run -- run \ --pythnet-url wss://api2.pythnet.pyth.network \ - --server-url https://quorum.pyth.network \ + --server-url https://quorum-1.pyth.network \ + --server-url https://quorum-2.pyth.network \ + --server-url https://quorum-3.pyth.network \ --secret-key /path/to/secret.key \ --wormhole-pid H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU ``` +You can specify multiple `--server-url` flags to broadcast observations to more than one server. + + --- ### 🌱 Environment Variables (Optional) @@ -41,7 +46,7 @@ Instead of CLI flags, you can also set environment variables: ```bash export PYTHNET_URL=wss://api2.pythnet.pyth.network -export SERVER_URL=https://quorum.pyth.network +export SERVER_URL=https://quorum-1.pyth.network,https://quorum-2.pyth.network,https://quorum-3.pyth.network export SECRET_KEY=/path/to/secret.key export WORMHOLE_PID=H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU export RUST_LOG=INFO @@ -49,6 +54,8 @@ export RUST_LOG=INFO cargo run ``` +You can provide multiple server URLs in the `SERVER_URL` environment variable by separating them with commas. + --- ### 🔑 Generate a Secret Key diff --git a/src/api_client.rs b/src/api_client.rs index c47b231..2ae5d25 100644 --- a/src/api_client.rs +++ b/src/api_client.rs @@ -55,6 +55,10 @@ impl Observation

{ } impl ApiClient { + pub fn get_base_url(&self) -> &Url { + &self.inner.base_url + } + pub fn try_new( base_url: String, config: Option, diff --git a/src/config.rs b/src/config.rs index de0785c..427a596 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,8 +21,8 @@ pub struct RunOptions { default_value = "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU" )] pub wormhole_pid: String, - #[arg(long = "server-url", env = "SERVER_URL")] - pub server_url: String, + #[arg(long = "server-url", env = "SERVER_URL", value_delimiter = ',')] + pub server_urls: Vec, #[arg(long = "mode", env = "MODE", default_value = "production")] pub mode: Mode, } diff --git a/src/main.rs b/src/main.rs index 8aa231b..f72b29f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use { api_client::{ApiClient, Observation}, borsh::BorshDeserialize, clap::Parser, + futures::future::join_all, posted_message::PostedMessageUnreliableData, prost::Message, secp256k1::{rand::rngs::OsRng, Secp256k1}, @@ -41,7 +42,7 @@ struct RunListenerInput { signer: T, wormhole_pid: Pubkey, accumulator_address: Pubkey, - api_client: ApiClient, + api_clients: Vec, } fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey { @@ -148,16 +149,22 @@ async fn run_listener( }; tokio::spawn({ - let (api_client, signer) = (input.api_client.clone(), input.signer.clone()); + let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone()); async move { let body = message_data_to_body(&unreliable_data); match Observation::try_new(body.clone(), signer.clone()) { Ok(observation) => { - if let Err(e) = api_client.post_observation(observation).await { - tracing::error!(error = ?e, "Failed to post observation"); - } else { - tracing::info!("Observation posted successfully"); - }; + join_all(api_clients.iter().map(|api_client| { + let observation = observation.clone(); + let api_client = api_client.clone(); + async move { + if let Err(e) = api_client.post_observation(observation).await { + tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); + } else { + tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); + } + } + })).await; } Err(e) => tracing::error!(error = ?e, "Failed to create observation"), } @@ -182,8 +189,13 @@ async fn run(run_options: config::RunOptions) { .expect("Invalid accumulator address"); let wormhole_pid = Pubkey::from_str(&run_options.wormhole_pid).expect("Invalid Wormhole program ID"); - let api_client = - ApiClient::try_new(run_options.server_url, None).expect("Failed to create API client"); + let api_clients: Vec = run_options + .server_urls + .into_iter() + .map(|server_url| { + ApiClient::try_new(server_url, None).expect("Failed to create API client") + }) + .collect(); let (pubkey, pubkey_evm) = signer.get_public_key().expect("Failed to get public key"); let evm_encded_public_key = format!("0x{}", hex::encode(pubkey_evm)); @@ -199,7 +211,7 @@ async fn run(run_options: config::RunOptions) { signer: signer.clone(), wormhole_pid, accumulator_address, - api_client: api_client.clone(), + api_clients: api_clients.clone(), }) .await {