From 39850f32cb77dc521c87a9d06509ff46bf45f893 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Fri, 18 Apr 2025 13:07:23 -0500 Subject: [PATCH 1/6] feat: RPC redundancy --- Cargo.lock | 12 ++--- Cargo.toml | 1 + config/config.sample.pythnet.toml | 5 +- config/config.sample.pythtest.toml | 5 +- config/config.toml | 5 +- src/agent.rs | 4 +- src/agent/services/exporter.rs | 55 ++++++++++++++------ src/agent/services/keypairs.rs | 45 +++++++++------- src/agent/services/oracle.rs | 24 ++++++--- src/agent/solana.rs | 8 +-- src/agent/state/exporter.rs | 83 ++++++++++++++++++++++-------- src/agent/state/oracle.rs | 64 ++++++++++++++++++----- src/agent/state/transactions.rs | 37 ++++++++++--- 13 files changed, 245 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6dedc9..7f1dbcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,9 +808,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "4.0.2" +version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74fa05ad7d803d413eb8380983b092cbbaf9a85f151b871360e7b00cd7060b37" +checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -3662,6 +3662,7 @@ dependencies = [ "solana-client", "solana-pubkey", "solana-sdk", + "solana-transaction-status", "thiserror 1.0.69", "tokio", "tokio-retry", @@ -3855,13 +3856,12 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", - "zerocopy 0.8.24", ] [[package]] @@ -7197,7 +7197,7 @@ dependencies = [ "httparse", "log", "native-tls", - "rand 0.9.0", + "rand 0.9.1", "sha1", "thiserror 2.0.12", "url", diff --git a/Cargo.toml b/Cargo.toml index 9fb38d8..a1cd85e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ solana-account-decoder = "1.18.8" solana-client = "1.18.8" solana-pubkey = "2.3.0" solana-sdk = "1.18.8" +solana-transaction-status = "1.18.26" bincode = { version = "2.0.1", features = ["serde"] } rand = "0.8.5" config = "0.14.0" diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index 15dff3c..70e9d2e 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -3,9 +3,10 @@ listen_address = "127.0.0.1:8910" [primary_network] -# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually +# HTTP(S) endpoints of the RPC node. Public Pythnet RPC endpoints are usually # rate-limited, so a private endpoint should be used in most cases. -rpc_url = "https://api2.pythnet.pyth.network" +# API calls will cycle through each on failure. +rpc_urls = ["https://api2.pythnet.pyth.network"] # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. # This can be omitted when oracle.subscriber_enabled is set to false. diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml index b832573..3894d8d 100644 --- a/config/config.sample.pythtest.toml +++ b/config/config.sample.pythtest.toml @@ -3,8 +3,9 @@ listen_address = "127.0.0.1:8910" [primary_network] -# HTTP(S) endpoint of the RPC node. -rpc_url = "https://api.pythtest.pyth.network" +# HTTP(S) endpoints of the RPC node. +# API calls will cycle through each on failure. +rpc_urls = ["https://api.pythtest.pyth.network"] # WS(S) endpoint of the RRC node. This is used to subscribe to account changes # on the network. This can be omitted when oracle.subscriber_enabled is set to diff --git a/config/config.toml b/config/config.toml index d38b5bb..a43e026 100644 --- a/config/config.toml +++ b/config/config.toml @@ -29,10 +29,11 @@ listen_address = "127.0.0.1:8910" [primary_network] ### Required fields ### -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually +# HTTP(S) endpoints of the RPC node. Public RPC endpoints are usually # rate-limited for Pythnet, and so a private endpoint should be used in most # cases. For Pythtest, the public endpoint can be used. -rpc_url = "https://api.pythtest.pyth.network" +# API calls will cycle through each on failure. +rpc_urls = ["https://api.pythtest.pyth.network"] # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. # This can be omitted when oracle.subscriber_enabled is set to false. diff --git a/src/agent.rs b/src/agent.rs index 659458d..302842b 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -183,11 +183,11 @@ impl Agent { // Spawn the remote keypair loader endpoint for both networks handles.extend( services::keypairs( - self.config.primary_network.rpc_url.clone(), + self.config.primary_network.rpc_urls.clone(), self.config .secondary_network .as_ref() - .map(|c| c.rpc_url.clone()), + .map(|c| c.rpc_urls.clone()), self.config.remote_keypair_loader.clone(), state, ) diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 2500b9d..0e20810 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -111,7 +111,7 @@ pub struct NetworkState { /// fetching the blockhash and slot number. struct NetworkStateQuerier { /// The RPC client - rpc_client: RpcClient, + rpc_clients: Vec, /// The interval with which to query the network state query_interval: Interval, @@ -122,20 +122,24 @@ struct NetworkStateQuerier { impl NetworkStateQuerier { #[instrument( - skip(rpc_endpoint, rpc_timeout, query_interval), + skip(rpc_urls, rpc_timeout, query_interval), fields( rpc_timeout = rpc_timeout.as_millis(), query_interval = query_interval.period().as_millis(), ) )] pub fn new( - rpc_endpoint: &str, + rpc_urls: &Vec, rpc_timeout: Duration, query_interval: Interval, network_state_tx: watch::Sender, ) -> Self { + let rpc_clients = rpc_urls + .iter() + .map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), rpc_timeout)) + .collect(); NetworkStateQuerier { - rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout), + rpc_clients, query_interval, network_state_tx, } @@ -152,11 +156,12 @@ impl NetworkStateQuerier { #[instrument(skip(self))] async fn query_network_state(&mut self) -> Result<()> { + // TODO: These are polled every 200ms and errors are simply logged. + // TODO: Should we retry/fallback on failure? // Fetch the blockhash and current slot in parallel - let current_slot_future = self - .rpc_client - .get_slot_with_commitment(CommitmentConfig::confirmed()); - let latest_blockhash_future = self.rpc_client.get_latest_blockhash(); + let current_slot_future = + self.rpc_clients[0].get_slot_with_commitment(CommitmentConfig::confirmed()); + let latest_blockhash_future = self.rpc_clients[0].get_latest_blockhash(); let (current_slot_result, latest_blockhash_result) = future::join(current_slot_future, latest_blockhash_future).await; @@ -183,7 +188,7 @@ where // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); let mut network_state_querier = NetworkStateQuerier::new( - &config.rpc_url, + &config.rpc_urls, config.rpc_timeout, tokio::time::interval(config.exporter.refresh_network_state_interval_duration), network_state_tx, @@ -226,6 +231,7 @@ mod exporter { }, }, solana_client::nonblocking::rpc_client::RpcClient, + solana_sdk::commitment_config::CommitmentConfig, std::sync::Arc, tokio::sync::watch, }; @@ -243,10 +249,21 @@ mod exporter { let mut dynamic_compute_unit_price_update_interval = tokio::time::interval(config.exporter.publish_interval_duration); - let client = Arc::new(RpcClient::new_with_timeout( - config.rpc_url.to_string(), - config.rpc_timeout, - )); + let clients: Arc> = Arc::new( + config + .rpc_urls + .iter() + .map(|rpc_url| { + RpcClient::new_with_timeout_and_commitment( + rpc_url.clone(), + config.rpc_timeout, + CommitmentConfig { + commitment: config.oracle.commitment, + }, + ) + }) + .collect(), + ); let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { tracing::warn!("Key store not available, Exporter won't start."); return; @@ -265,7 +282,7 @@ mod exporter { let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await; if let Err(err) = publish_batches( state.clone(), - client.clone(), + clients.clone(), network, &network_state_rx, key_store.accumulator_key, @@ -293,7 +310,7 @@ mod exporter { if let Err(err) = Exporter::update_recent_compute_unit_price( &*state, &publish_keypair, - &client, + &clients, config.exporter.staleness_threshold, config.exporter.unchanged_publish_threshold, ).await { @@ -352,13 +369,17 @@ mod transaction_monitor { where S: Transactions, { - let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout); + let rpc_clients = config + .rpc_urls + .iter() + .map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), config.rpc_timeout)) + .collect(); let mut poll_interval = tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration); loop { poll_interval.tick().await; - if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await { + if let Err(err) = Transactions::poll_transactions_status(&*state, &rpc_clients).await { tracing::error!(err = ?err, "Transaction monitor failed."); } } diff --git a/src/agent/services/keypairs.rs b/src/agent/services/keypairs.rs index 0d2863d..1911b27 100644 --- a/src/agent/services/keypairs.rs +++ b/src/agent/services/keypairs.rs @@ -8,7 +8,7 @@ use { state::keypairs::Keypairs, }, anyhow::{ - Context, + bail, Result, }, serde::Deserialize, @@ -61,8 +61,8 @@ impl Default for Config { } pub async fn keypairs( - primary_rpc_url: String, - secondary_rpc_url: Option, + primary_rpc_urls: Vec, + secondary_rpc_urls: Option>, config: Config, state: Arc, ) -> Vec> @@ -81,7 +81,7 @@ where let primary_upload_route = { let state = state.clone(); - let rpc_url = primary_rpc_url.clone(); + let rpc_urls = primary_rpc_urls.clone(); let min_balance = config.primary_min_keypair_balance_sol; warp::path!("primary" / "load_keypair") .and(warp::post()) @@ -90,14 +90,14 @@ where .and(warp::path::end()) .and_then(move |kp: Vec| { let state = state.clone(); - let rpc_url = rpc_url.clone(); + let rpc_urls = rpc_urls.clone(); async move { let response = handle_new_keypair( state, Network::Primary, kp, min_balance, - rpc_url, + rpc_urls, "primary", ) .await; @@ -113,16 +113,16 @@ where .and(warp::path::end()) .and_then(move |kp: Vec| { let state = state.clone(); - let rpc_url = secondary_rpc_url.clone(); + let rpc_urls = secondary_rpc_urls.clone(); async move { - if let Some(rpc_url) = rpc_url { + if let Some(rpc_urls) = rpc_urls { let min_balance = config.secondary_min_keypair_balance_sol; let response = handle_new_keypair( state, Network::Secondary, kp, min_balance, - rpc_url, + rpc_urls, "secondary", ) .await; @@ -160,7 +160,7 @@ async fn handle_new_keypair<'a, 'b: 'a, S>( network: Network, new_keypair_bytes: Vec, min_keypair_balance_sol: u64, - rpc_url: String, + rpc_urls: Vec, network_name: &'b str, ) -> WithStatus<&'static str> where @@ -168,7 +168,7 @@ where { let mut upload_ok = true; match Keypair::from_bytes(&new_keypair_bytes) { - Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await { + Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_urls.clone()).await { Ok(()) => { Keypairs::update_keypair(&*state, network, kp).await; } @@ -205,14 +205,12 @@ where pub async fn validate_keypair( kp: &Keypair, min_keypair_balance_sol: u64, - rpc_url: String, + rpc_urls: Vec, ) -> Result<()> { - let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - - let balance_lamports = c - .get_balance(&kp.pubkey()) - .await - .context("Could not check keypair's balance")?; + let balance_lamports = match get_balance(kp, rpc_urls).await { + Ok(balance_lamports) => balance_lamports, + Err(_) => bail!("Could not check keypair's balance"), + }; let lamports_in_sol = 1_000_000_000; @@ -227,3 +225,14 @@ pub async fn validate_keypair( ))) } } + +async fn get_balance(kp: &Keypair, rpc_urls: Vec) -> Result { + for rpc_url in rpc_urls { + let c = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); + match c.get_balance(&kp.pubkey()).await { + Ok(balance) => return Ok(balance), + Err(e) => tracing::warn!("getBalance error for rpc endpoint {}: {}", rpc_url, e), + } + } + bail!("getBalance failed for all RPC endpoints") +} diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index e89ccbe..eaebb08 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -168,13 +168,21 @@ async fn poller( { // Setup an RpcClient for manual polling. let mut tick = tokio::time::interval(config.oracle.poll_interval_duration); - let client = Arc::new(RpcClient::new_with_timeout_and_commitment( - config.rpc_url, - config.rpc_timeout, - CommitmentConfig { - commitment: config.oracle.commitment, - }, - )); + let clients: Arc> = Arc::new( + config + .rpc_urls + .iter() + .map(|rpc_url| { + RpcClient::new_with_timeout_and_commitment( + rpc_url.clone(), + config.rpc_timeout, + CommitmentConfig { + commitment: config.oracle.commitment, + }, + ) + }) + .collect(), + ); loop { if let Err(err) = async { @@ -186,7 +194,7 @@ async fn poller( oracle_program_key, publish_keypair.as_ref(), pyth_price_store_program_key, - &client, + &clients, max_lookup_batch_size, ) .await?; diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 8336d8b..0cb744d 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -25,8 +25,8 @@ pub mod network { Secondary, } - pub fn default_rpc_url() -> String { - "http://localhost:8899".to_string() + pub fn default_rpc_url() -> Vec { + vec!["http://localhost:8899".to_string()] } pub fn default_wss_url() -> String { @@ -40,9 +40,9 @@ pub mod network { /// Configuration for a network #[derive(Clone, Serialize, Deserialize, Debug)] pub struct Config { - /// HTTP RPC endpoint + /// HTTP RPC endpoint list #[serde(default = "default_rpc_url")] - pub rpc_url: String, + pub rpc_urls: Vec, /// WSS RPC endpoint #[serde(default = "default_wss_url")] pub wss_url: String, diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index ac9f0b8..aaa1ec5 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -33,6 +33,7 @@ use { solana_client::{ nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig, + rpc_response::RpcPrioritizationFee, }, solana_pubkey, solana_sdk::{ @@ -42,7 +43,10 @@ use { Instruction, }, pubkey::Pubkey, - signature::Keypair, + signature::{ + Keypair, + Signature, + }, signer::Signer, sysvar::clock, transaction::Transaction, @@ -119,7 +123,7 @@ where async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, - rpc_client: &RpcClient, + rpc_clients: &Vec, staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result<()>; @@ -300,7 +304,7 @@ where async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, - rpc_client: &RpcClient, + rpc_clients: &Vec, staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result<()> { @@ -321,7 +325,7 @@ where .recent_compute_unit_price_micro_lamports .write() .await = - estimate_compute_unit_price_micro_lamports(rpc_client, &price_accounts).await?; + estimate_compute_unit_price_micro_lamports(rpc_clients, &price_accounts).await?; Ok(()) } @@ -388,7 +392,7 @@ where } async fn estimate_compute_unit_price_micro_lamports( - rpc_client: &RpcClient, + rpc_clients: &Vec, price_accounts: &[Pubkey], ) -> Result> { let mut slot_compute_fee: BTreeMap = BTreeMap::new(); @@ -397,7 +401,7 @@ async fn estimate_compute_unit_price_micro_lamports( let prioritization_fees_batches = futures_util::future::join_all( price_accounts .chunks(128) - .map(|price_accounts| rpc_client.get_recent_prioritization_fees(price_accounts)), + .map(|price_accounts| get_recent_prioritization_fees(rpc_clients, price_accounts)), ) .await .into_iter() @@ -434,6 +438,26 @@ async fn estimate_compute_unit_price_micro_lamports( Ok(median_priority_fee) } +async fn get_recent_prioritization_fees( + rpc_clients: &Vec, + price_accounts: &[Pubkey], +) -> Result> { + for rpc_client in rpc_clients { + match rpc_client + .get_recent_prioritization_fees(price_accounts) + .await + { + Ok(fees) => return Ok(fees), + Err(e) => tracing::warn!( + "getRecentPrioritizationFee failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getRecentPrioritizationFees failed for every rpc endpoint") +} + /// Publishes any price updates in the local store that we haven't sent to this network. /// /// The strategy used to do this is as follows: @@ -450,7 +474,7 @@ async fn estimate_compute_unit_price_micro_lamports( /// time to respond, no internal queues grow unboundedly. At any single point in time there are at most /// (n / batch_size) requests in flight. #[instrument( - skip(state, client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), + skip(state, clients, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), fields( publish_keypair = publish_keypair.pubkey().to_string(), staleness_threshold = staleness_threshold.as_millis(), @@ -458,7 +482,7 @@ async fn estimate_compute_unit_price_micro_lamports( )] pub async fn publish_batches( state: Arc, - client: Arc, + clients: Arc>, network: Network, network_state_rx: &watch::Receiver, accumulator_key: Option, @@ -497,7 +521,7 @@ where for batch in batches { batch_futures.push(publish_batch( state.clone(), - client.clone(), + clients.clone(), network, network_state, accumulator_key, @@ -531,7 +555,7 @@ where } #[instrument( - skip(state, client, network_state, publish_keypair, batch, staleness_threshold), + skip(state, rpc_clients, network_state, publish_keypair, batch, staleness_threshold), fields( publish_keypair = publish_keypair.pubkey().to_string(), blockhash = network_state.blockhash.to_string(), @@ -542,7 +566,7 @@ where )] async fn publish_batch( state: Arc, - client: Arc, + rpc_clients: Arc>, network: Network, network_state: NetworkState, accumulator_key: Option, @@ -744,16 +768,7 @@ where ); tokio::spawn(async move { - let signature = match client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await - { + let signature = match send_transaction_with_config(rpc_clients, &transaction).await { Ok(signature) => signature, Err(err) => { tracing::error!(err = ?err, "Exporter: failed to send transaction."); @@ -774,6 +789,32 @@ where Ok(()) } +async fn send_transaction_with_config( + rpc_clients: Arc>, + transaction: &Transaction, +) -> Result { + for rpc_client in rpc_clients.iter() { + match rpc_client + .send_transaction_with_config( + transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => return Ok(signature), + Err(e) => tracing::warn!( + "sendTransactionWithConfig failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("sendTransactionWithConfig failed for all rpc endpoints") +} + fn create_instruction_without_accumulator( publish_pubkey: Pubkey, pyth_oracle_program_key: Pubkey, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index ae97fb8..16f050e 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -15,6 +15,7 @@ use { }, anyhow::{ anyhow, + bail, Context, Result, }, @@ -205,7 +206,7 @@ pub trait Oracle { oracle_program_key: Pubkey, publish_keypair: Option<&Keypair>, pyth_price_store_program_key: Option, - rpc_client: &RpcClient, + rpc_clients: &Vec, max_lookup_batch_size: usize, ) -> Result<()>; async fn handle_price_account_update( @@ -274,20 +275,23 @@ where } /// Poll target Solana based chain for Pyth related accounts. - #[instrument(skip(self, publish_keypair, rpc_client))] + #[instrument(skip(self, publish_keypair, rpc_clients))] async fn poll_updates( &self, network: Network, oracle_program_key: Pubkey, publish_keypair: Option<&Keypair>, pyth_price_store_program_key: Option, - rpc_client: &RpcClient, + rpc_clients: &Vec, max_lookup_batch_size: usize, ) -> Result<()> { let mut publisher_permissions = HashMap::new(); - let (product_accounts, price_accounts) = - fetch_product_and_price_accounts(rpc_client, oracle_program_key, max_lookup_batch_size) - .await?; + let (product_accounts, price_accounts) = fetch_product_and_price_accounts( + rpc_clients, + oracle_program_key, + max_lookup_batch_size, + ) + .await?; for (price_key, price_entry) in price_accounts.iter() { for component in price_entry.comp { @@ -326,7 +330,7 @@ where (pyth_price_store_program_key, publish_keypair) { match fetch_publisher_buffer_key( - rpc_client, + rpc_clients, pyth_price_store_program_key, publish_keypair.pubkey(), ) @@ -402,7 +406,7 @@ where } async fn fetch_publisher_buffer_key( - rpc_client: &RpcClient, + rpc_clients: &Vec, pyth_price_store_program_key: Pubkey, publisher_pubkey: Pubkey, ) -> Result { @@ -413,26 +417,43 @@ async fn fetch_publisher_buffer_key( ], &pyth_price_store_program_key, ); - let data = rpc_client.get_account_data(&publisher_config_key).await?; + let data = get_account_data(rpc_clients, &publisher_config_key).await?; let config = pyth_price_store::accounts::publisher_config::read(&data)?; Ok(config.buffer_account.into()) } +async fn get_account_data( + rpc_clients: &Vec, + publisher_config_key: &Pubkey, +) -> Result> { + for rpc_client in rpc_clients { + match rpc_client.get_account_data(publisher_config_key).await { + Ok(data) => return Ok(data), + Err(e) => tracing::warn!( + "getAccountData failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getAccountData failed for all rpc endpoints") +} + type ProductAndPriceAccounts = ( HashMap>, HashMap>, ); -#[instrument(skip(rpc_client))] +#[instrument(skip(rpc_clients))] async fn fetch_product_and_price_accounts( - rpc_client: &RpcClient, + rpc_clients: &Vec, oracle_program_key: Pubkey, - max_lookup_batch_size: usize, + _max_lookup_batch_size: usize, ) -> Result { let mut product_entries = HashMap::new(); let mut price_entries = HashMap::new(); - let oracle_accounts = rpc_client.get_program_accounts(&oracle_program_key).await?; + let oracle_accounts = get_program_accounts(rpc_clients, oracle_program_key).await?; // Go over all the product accounts and partially fill the product entires. The product // entires need to have prices inside them which gets filled by going over all the @@ -541,6 +562,23 @@ async fn fetch_product_and_price_accounts( )) } +async fn get_program_accounts( + rpc_clients: &Vec, + oracle_program_key: Pubkey, +) -> Result> { + for rpc_client in rpc_clients { + match rpc_client.get_program_accounts(&oracle_program_key).await { + Ok(accounts) => return Ok(accounts), + Err(e) => tracing::warn!( + "getProgramAccounts failed for rpc endpoint {}: {}", + rpc_client.url(), + e + ), + } + } + bail!("getProgramAccounts failed for all rpc endpoints") +} + #[instrument(skip(rpc_client, product_key_batch))] async fn fetch_batch_of_product_and_price_accounts( rpc_client: &RpcClient, diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs index c0e8921..5313b8c 100644 --- a/src/agent/state/transactions.rs +++ b/src/agent/state/transactions.rs @@ -1,11 +1,15 @@ use { super::State, - anyhow::Result, + anyhow::{ + bail, + Result, + }, solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ commitment_config::CommitmentConfig, signature::Signature, }, + solana_transaction_status::TransactionStatus, std::collections::VecDeque, tokio::sync::RwLock, tracing::instrument, @@ -29,7 +33,7 @@ impl TransactionsState { #[async_trait::async_trait] pub trait Transactions { async fn add_transaction(&self, signature: Signature); - async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()>; + async fn poll_transactions_status(&self, rpc_clients: &Vec) -> Result<()>; } /// Allow downcasting State into TransactionsState for functions that depend on the `Transactions` service. @@ -62,8 +66,8 @@ where } } - #[instrument(skip(self, rpc))] - async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> { + #[instrument(skip(self, rpc_clients))] + async fn poll_transactions_status(&self, rpc_clients: &Vec) -> Result<()> { let mut txs = self.into().sent_transactions.write().await; if txs.is_empty() { return Ok(()); @@ -72,10 +76,7 @@ where let signatures_contiguous = txs.make_contiguous(); // Poll the status of each transaction, in a single RPC request - let statuses = rpc - .get_signature_statuses(signatures_contiguous) - .await? - .value; + let statuses = get_signature_statuses(rpc_clients, signatures_contiguous).await?; tracing::debug!( statuses = ?statuses, @@ -111,3 +112,23 @@ where Ok(()) } } + +async fn get_signature_statuses( + rpc_clients: &Vec, + signatures_contiguous: &mut [Signature], +) -> Result>> { + for rpc_client in rpc_clients { + match rpc_client + .get_signature_statuses(signatures_contiguous) + .await + { + Ok(statuses) => return Ok(statuses.value), + Err(e) => tracing::warn!( + "getSignatureStatus failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getSignatureStatuses failed for all rpc endpoints") +} From 7ed93c6e80a1e919dcee2853c7fbf6dea59a072e Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 24 Apr 2025 01:34:32 -0500 Subject: [PATCH 2/6] Refactor into rpc_multi_client module --- src/agent.rs | 1 + src/agent/services/exporter.rs | 65 ++++----- src/agent/services/keypairs.rs | 26 ++-- src/agent/services/oracle.rs | 30 ++-- src/agent/solana.rs | 9 +- src/agent/state/exporter.rs | 82 +++-------- src/agent/state/oracle.rs | 60 ++------ src/agent/state/transactions.rs | 38 ++--- src/agent/utils.rs | 1 + src/agent/utils/rpc_multi_client.rs | 208 ++++++++++++++++++++++++++++ 10 files changed, 300 insertions(+), 220 deletions(-) create mode 100644 src/agent/utils.rs create mode 100644 src/agent/utils/rpc_multi_client.rs diff --git a/src/agent.rs b/src/agent.rs index 302842b..2023db7 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -83,6 +83,7 @@ pub mod pyth; pub mod services; pub mod solana; pub mod state; +pub mod utils; lazy_static! { /// A static exit flag to indicate to running threads that we're shutting down. This is used to diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 0e20810..27635c8 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -8,6 +8,7 @@ use { exporter::Exporter, transactions::Transactions, }, + utils::rpc_multi_client::RpcMultiClient, }, anyhow::Result, futures_util::future, @@ -15,7 +16,6 @@ use { Deserialize, Serialize, }, - solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::commitment_config::CommitmentConfig, std::{ sync::Arc, @@ -27,6 +27,7 @@ use { time::Interval, }, tracing::instrument, + url::Url, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -111,7 +112,7 @@ pub struct NetworkState { /// fetching the blockhash and slot number. struct NetworkStateQuerier { /// The RPC client - rpc_clients: Vec, + rpc_multi_client: RpcMultiClient, /// The interval with which to query the network state query_interval: Interval, @@ -129,17 +130,14 @@ impl NetworkStateQuerier { ) )] pub fn new( - rpc_urls: &Vec, + rpc_urls: &Vec, rpc_timeout: Duration, query_interval: Interval, network_state_tx: watch::Sender, ) -> Self { - let rpc_clients = rpc_urls - .iter() - .map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), rpc_timeout)) - .collect(); + let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.clone(), rpc_timeout); NetworkStateQuerier { - rpc_clients, + rpc_multi_client, query_interval, network_state_tx, } @@ -156,12 +154,11 @@ impl NetworkStateQuerier { #[instrument(skip(self))] async fn query_network_state(&mut self) -> Result<()> { - // TODO: These are polled every 200ms and errors are simply logged. - // TODO: Should we retry/fallback on failure? // Fetch the blockhash and current slot in parallel - let current_slot_future = - self.rpc_clients[0].get_slot_with_commitment(CommitmentConfig::confirmed()); - let latest_blockhash_future = self.rpc_clients[0].get_latest_blockhash(); + let current_slot_future = self + .rpc_multi_client + .get_slot_with_commitment(CommitmentConfig::confirmed()); + let latest_blockhash_future = self.rpc_multi_client.get_latest_blockhash(); let (current_slot_result, latest_blockhash_result) = future::join(current_slot_future, latest_blockhash_future).await; @@ -229,8 +226,8 @@ mod exporter { publish_batches, Exporter, }, + utils::rpc_multi_client::RpcMultiClient, }, - solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::commitment_config::CommitmentConfig, std::sync::Arc, tokio::sync::watch, @@ -249,21 +246,14 @@ mod exporter { let mut dynamic_compute_unit_price_update_interval = tokio::time::interval(config.exporter.publish_interval_duration); - let clients: Arc> = Arc::new( - config - .rpc_urls - .iter() - .map(|rpc_url| { - RpcClient::new_with_timeout_and_commitment( - rpc_url.clone(), - config.rpc_timeout, - CommitmentConfig { - commitment: config.oracle.commitment, - }, - ) - }) - .collect(), - ); + let rpc_multi_client: Arc = + Arc::new(RpcMultiClient::new_with_timeout_and_commitment( + config.rpc_urls.clone(), + config.rpc_timeout, + CommitmentConfig { + commitment: config.oracle.commitment, + }, + )); let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { tracing::warn!("Key store not available, Exporter won't start."); return; @@ -282,7 +272,7 @@ mod exporter { let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await; if let Err(err) = publish_batches( state.clone(), - clients.clone(), + rpc_multi_client.clone(), network, &network_state_rx, key_store.accumulator_key, @@ -310,7 +300,7 @@ mod exporter { if let Err(err) = Exporter::update_recent_compute_unit_price( &*state, &publish_keypair, - &clients, + &rpc_multi_client, config.exporter.staleness_threshold, config.exporter.unchanged_publish_threshold, ).await { @@ -329,12 +319,12 @@ mod transaction_monitor { crate::agent::{ solana::network, state::transactions::Transactions, + utils::rpc_multi_client::RpcMultiClient, }, serde::{ Deserialize, Serialize, }, - solana_client::nonblocking::rpc_client::RpcClient, std::{ sync::Arc, time::Duration, @@ -369,17 +359,16 @@ mod transaction_monitor { where S: Transactions, { - let rpc_clients = config - .rpc_urls - .iter() - .map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), config.rpc_timeout)) - .collect(); + let rpc_multi_client = + RpcMultiClient::new_with_timeout(config.rpc_urls.clone(), config.rpc_timeout); let mut poll_interval = tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration); loop { poll_interval.tick().await; - if let Err(err) = Transactions::poll_transactions_status(&*state, &rpc_clients).await { + if let Err(err) = + Transactions::poll_transactions_status(&*state, &rpc_multi_client).await + { tracing::error!(err = ?err, "Transaction monitor failed."); } } diff --git a/src/agent/services/keypairs.rs b/src/agent/services/keypairs.rs index 1911b27..ae0023f 100644 --- a/src/agent/services/keypairs.rs +++ b/src/agent/services/keypairs.rs @@ -6,13 +6,13 @@ use { crate::agent::{ solana::network::Network, state::keypairs::Keypairs, + utils::rpc_multi_client::RpcMultiClient, }, anyhow::{ bail, Result, }, serde::Deserialize, - solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ commitment_config::CommitmentConfig, signature::Keypair, @@ -23,6 +23,7 @@ use { sync::Arc, }, tokio::task::JoinHandle, + url::Url, warp::{ hyper::StatusCode, reject::Rejection, @@ -61,8 +62,8 @@ impl Default for Config { } pub async fn keypairs( - primary_rpc_urls: Vec, - secondary_rpc_urls: Option>, + primary_rpc_urls: Vec, + secondary_rpc_urls: Option>, config: Config, state: Arc, ) -> Vec> @@ -160,7 +161,7 @@ async fn handle_new_keypair<'a, 'b: 'a, S>( network: Network, new_keypair_bytes: Vec, min_keypair_balance_sol: u64, - rpc_urls: Vec, + rpc_urls: Vec, network_name: &'b str, ) -> WithStatus<&'static str> where @@ -205,9 +206,11 @@ where pub async fn validate_keypair( kp: &Keypair, min_keypair_balance_sol: u64, - rpc_urls: Vec, + rpc_urls: Vec, ) -> Result<()> { - let balance_lamports = match get_balance(kp, rpc_urls).await { + let rpc_multi_client = + RpcMultiClient::new_with_commitment(rpc_urls, CommitmentConfig::confirmed()); + let balance_lamports = match rpc_multi_client.get_balance(kp).await { Ok(balance_lamports) => balance_lamports, Err(_) => bail!("Could not check keypair's balance"), }; @@ -225,14 +228,3 @@ pub async fn validate_keypair( ))) } } - -async fn get_balance(kp: &Keypair, rpc_urls: Vec) -> Result { - for rpc_url in rpc_urls { - let c = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - match c.get_balance(&kp.pubkey()).await { - Ok(balance) => return Ok(balance), - Err(e) => tracing::warn!("getBalance error for rpc endpoint {}: {}", rpc_url, e), - } - } - bail!("getBalance failed for all RPC endpoints") -} diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index eaebb08..614679b 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -12,14 +12,12 @@ use { }, }, state::oracle::Oracle, + utils::rpc_multi_client::RpcMultiClient, }, anyhow::Result, solana_account_decoder::UiAccountEncoding, solana_client::{ - nonblocking::{ - pubsub_client::PubsubClient, - rpc_client::RpcClient, - }, + nonblocking::pubsub_client::PubsubClient, rpc_config::{ RpcAccountInfoConfig, RpcProgramAccountsConfig, @@ -168,21 +166,13 @@ async fn poller( { // Setup an RpcClient for manual polling. let mut tick = tokio::time::interval(config.oracle.poll_interval_duration); - let clients: Arc> = Arc::new( - config - .rpc_urls - .iter() - .map(|rpc_url| { - RpcClient::new_with_timeout_and_commitment( - rpc_url.clone(), - config.rpc_timeout, - CommitmentConfig { - commitment: config.oracle.commitment, - }, - ) - }) - .collect(), - ); + let rpc_multi_client = Arc::new(RpcMultiClient::new_with_timeout_and_commitment( + config.rpc_urls.clone(), + config.rpc_timeout, + CommitmentConfig { + commitment: config.oracle.commitment, + }, + )); loop { if let Err(err) = async { @@ -194,7 +184,7 @@ async fn poller( oracle_program_key, publish_keypair.as_ref(), pyth_price_store_program_key, - &clients, + &rpc_multi_client, max_lookup_batch_size, ) .await?; diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 0cb744d..f5abda2 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -17,6 +17,7 @@ pub mod network { Serialize, }, std::time::Duration, + url::Url, }; #[derive(Clone, Copy, Serialize, Deserialize, Debug)] @@ -25,8 +26,8 @@ pub mod network { Secondary, } - pub fn default_rpc_url() -> Vec { - vec!["http://localhost:8899".to_string()] + pub fn default_rpc_urls() -> Vec { + vec![Url::parse("http://localhost:8899").unwrap()] } pub fn default_wss_url() -> String { @@ -41,8 +42,8 @@ pub mod network { #[derive(Clone, Serialize, Deserialize, Debug)] pub struct Config { /// HTTP RPC endpoint list - #[serde(default = "default_rpc_url")] - pub rpc_urls: Vec, + #[serde(default = "default_rpc_urls")] + pub rpc_urls: Vec, /// WSS RPC endpoint #[serde(default = "default_wss_url")] pub wss_url: String, diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index aaa1ec5..e188d64 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -13,6 +13,7 @@ use { keypairs::Keypairs, local::LocalStore, }, + utils::rpc_multi_client::RpcMultiClient, }, anyhow::{ anyhow, @@ -30,11 +31,6 @@ use { pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, - solana_client::{ - nonblocking::rpc_client::RpcClient, - rpc_config::RpcSendTransactionConfig, - rpc_response::RpcPrioritizationFee, - }, solana_pubkey, solana_sdk::{ compute_budget::ComputeBudgetInstruction, @@ -43,10 +39,7 @@ use { Instruction, }, pubkey::Pubkey, - signature::{ - Keypair, - Signature, - }, + signature::Keypair, signer::Signer, sysvar::clock, transaction::Transaction, @@ -123,7 +116,7 @@ where async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, - rpc_clients: &Vec, + rpc_clients: &RpcMultiClient, staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result<()>; @@ -304,7 +297,7 @@ where async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, - rpc_clients: &Vec, + rpc_multi_client: &RpcMultiClient, staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result<()> { @@ -325,7 +318,7 @@ where .recent_compute_unit_price_micro_lamports .write() .await = - estimate_compute_unit_price_micro_lamports(rpc_clients, &price_accounts).await?; + estimate_compute_unit_price_micro_lamports(rpc_multi_client, &price_accounts).await?; Ok(()) } @@ -392,7 +385,7 @@ where } async fn estimate_compute_unit_price_micro_lamports( - rpc_clients: &Vec, + rpc_multi_client: &RpcMultiClient, price_accounts: &[Pubkey], ) -> Result> { let mut slot_compute_fee: BTreeMap = BTreeMap::new(); @@ -401,7 +394,7 @@ async fn estimate_compute_unit_price_micro_lamports( let prioritization_fees_batches = futures_util::future::join_all( price_accounts .chunks(128) - .map(|price_accounts| get_recent_prioritization_fees(rpc_clients, price_accounts)), + .map(|price_accounts| rpc_multi_client.get_recent_prioritization_fees(price_accounts)), ) .await .into_iter() @@ -438,26 +431,6 @@ async fn estimate_compute_unit_price_micro_lamports( Ok(median_priority_fee) } -async fn get_recent_prioritization_fees( - rpc_clients: &Vec, - price_accounts: &[Pubkey], -) -> Result> { - for rpc_client in rpc_clients { - match rpc_client - .get_recent_prioritization_fees(price_accounts) - .await - { - Ok(fees) => return Ok(fees), - Err(e) => tracing::warn!( - "getRecentPrioritizationFee failed for rpc endpoint {}: {:?}", - rpc_client.url(), - e - ), - } - } - bail!("getRecentPrioritizationFees failed for every rpc endpoint") -} - /// Publishes any price updates in the local store that we haven't sent to this network. /// /// The strategy used to do this is as follows: @@ -474,7 +447,7 @@ async fn get_recent_prioritization_fees( /// time to respond, no internal queues grow unboundedly. At any single point in time there are at most /// (n / batch_size) requests in flight. #[instrument( - skip(state, clients, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), + skip(state, rpc_multi_client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), fields( publish_keypair = publish_keypair.pubkey().to_string(), staleness_threshold = staleness_threshold.as_millis(), @@ -482,7 +455,7 @@ async fn get_recent_prioritization_fees( )] pub async fn publish_batches( state: Arc, - clients: Arc>, + rpc_multi_client: Arc, network: Network, network_state_rx: &watch::Receiver, accumulator_key: Option, @@ -521,7 +494,7 @@ where for batch in batches { batch_futures.push(publish_batch( state.clone(), - clients.clone(), + rpc_multi_client.clone(), network, network_state, accumulator_key, @@ -555,7 +528,7 @@ where } #[instrument( - skip(state, rpc_clients, network_state, publish_keypair, batch, staleness_threshold), + skip(state, rpc_multi_client, network_state, publish_keypair, batch, staleness_threshold), fields( publish_keypair = publish_keypair.pubkey().to_string(), blockhash = network_state.blockhash.to_string(), @@ -566,7 +539,7 @@ where )] async fn publish_batch( state: Arc, - rpc_clients: Arc>, + rpc_multi_client: Arc, network: Network, network_state: NetworkState, accumulator_key: Option, @@ -768,7 +741,10 @@ where ); tokio::spawn(async move { - let signature = match send_transaction_with_config(rpc_clients, &transaction).await { + let signature = match rpc_multi_client + .send_transaction_with_config(&transaction) + .await + { Ok(signature) => signature, Err(err) => { tracing::error!(err = ?err, "Exporter: failed to send transaction."); @@ -789,32 +765,6 @@ where Ok(()) } -async fn send_transaction_with_config( - rpc_clients: Arc>, - transaction: &Transaction, -) -> Result { - for rpc_client in rpc_clients.iter() { - match rpc_client - .send_transaction_with_config( - transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await - { - Ok(signature) => return Ok(signature), - Err(e) => tracing::warn!( - "sendTransactionWithConfig failed for rpc endpoint {}: {:?}", - rpc_client.url(), - e - ), - } - } - bail!("sendTransactionWithConfig failed for all rpc endpoints") -} - fn create_instruction_without_accumulator( publish_pubkey: Pubkey, pyth_oracle_program_key: Pubkey, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 16f050e..7325d57 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -12,10 +12,10 @@ use { Prices, State, }, + utils::rpc_multi_client::RpcMultiClient, }, anyhow::{ anyhow, - bail, Context, Result, }, @@ -206,7 +206,7 @@ pub trait Oracle { oracle_program_key: Pubkey, publish_keypair: Option<&Keypair>, pyth_price_store_program_key: Option, - rpc_clients: &Vec, + rpc_multi_client: &RpcMultiClient, max_lookup_batch_size: usize, ) -> Result<()>; async fn handle_price_account_update( @@ -275,19 +275,19 @@ where } /// Poll target Solana based chain for Pyth related accounts. - #[instrument(skip(self, publish_keypair, rpc_clients))] + #[instrument(skip(self, publish_keypair, rpc_multi_client))] async fn poll_updates( &self, network: Network, oracle_program_key: Pubkey, publish_keypair: Option<&Keypair>, pyth_price_store_program_key: Option, - rpc_clients: &Vec, + rpc_multi_client: &RpcMultiClient, max_lookup_batch_size: usize, ) -> Result<()> { let mut publisher_permissions = HashMap::new(); let (product_accounts, price_accounts) = fetch_product_and_price_accounts( - rpc_clients, + rpc_multi_client, oracle_program_key, max_lookup_batch_size, ) @@ -330,7 +330,7 @@ where (pyth_price_store_program_key, publish_keypair) { match fetch_publisher_buffer_key( - rpc_clients, + rpc_multi_client, pyth_price_store_program_key, publish_keypair.pubkey(), ) @@ -406,7 +406,7 @@ where } async fn fetch_publisher_buffer_key( - rpc_clients: &Vec, + rpc_multi_client: &RpcMultiClient, pyth_price_store_program_key: Pubkey, publisher_pubkey: Pubkey, ) -> Result { @@ -417,43 +417,30 @@ async fn fetch_publisher_buffer_key( ], &pyth_price_store_program_key, ); - let data = get_account_data(rpc_clients, &publisher_config_key).await?; + let data = rpc_multi_client + .get_account_data(&publisher_config_key) + .await?; let config = pyth_price_store::accounts::publisher_config::read(&data)?; Ok(config.buffer_account.into()) } -async fn get_account_data( - rpc_clients: &Vec, - publisher_config_key: &Pubkey, -) -> Result> { - for rpc_client in rpc_clients { - match rpc_client.get_account_data(publisher_config_key).await { - Ok(data) => return Ok(data), - Err(e) => tracing::warn!( - "getAccountData failed for rpc endpoint {}: {:?}", - rpc_client.url(), - e - ), - } - } - bail!("getAccountData failed for all rpc endpoints") -} - type ProductAndPriceAccounts = ( HashMap>, HashMap>, ); -#[instrument(skip(rpc_clients))] +#[instrument(skip(rpc_multi_client))] async fn fetch_product_and_price_accounts( - rpc_clients: &Vec, + rpc_multi_client: &RpcMultiClient, oracle_program_key: Pubkey, _max_lookup_batch_size: usize, ) -> Result { let mut product_entries = HashMap::new(); let mut price_entries = HashMap::new(); - let oracle_accounts = get_program_accounts(rpc_clients, oracle_program_key).await?; + let oracle_accounts = rpc_multi_client + .get_program_accounts(oracle_program_key) + .await?; // Go over all the product accounts and partially fill the product entires. The product // entires need to have prices inside them which gets filled by going over all the @@ -562,23 +549,6 @@ async fn fetch_product_and_price_accounts( )) } -async fn get_program_accounts( - rpc_clients: &Vec, - oracle_program_key: Pubkey, -) -> Result> { - for rpc_client in rpc_clients { - match rpc_client.get_program_accounts(&oracle_program_key).await { - Ok(accounts) => return Ok(accounts), - Err(e) => tracing::warn!( - "getProgramAccounts failed for rpc endpoint {}: {}", - rpc_client.url(), - e - ), - } - } - bail!("getProgramAccounts failed for all rpc endpoints") -} - #[instrument(skip(rpc_client, product_key_batch))] async fn fetch_batch_of_product_and_price_accounts( rpc_client: &RpcClient, diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs index 5313b8c..641fd02 100644 --- a/src/agent/state/transactions.rs +++ b/src/agent/state/transactions.rs @@ -1,15 +1,11 @@ use { super::State, - anyhow::{ - bail, - Result, - }, - solana_client::nonblocking::rpc_client::RpcClient, + crate::agent::utils::rpc_multi_client::RpcMultiClient, + anyhow::Result, solana_sdk::{ commitment_config::CommitmentConfig, signature::Signature, }, - solana_transaction_status::TransactionStatus, std::collections::VecDeque, tokio::sync::RwLock, tracing::instrument, @@ -33,7 +29,7 @@ impl TransactionsState { #[async_trait::async_trait] pub trait Transactions { async fn add_transaction(&self, signature: Signature); - async fn poll_transactions_status(&self, rpc_clients: &Vec) -> Result<()>; + async fn poll_transactions_status(&self, rpc_multi_client: &RpcMultiClient) -> Result<()>; } /// Allow downcasting State into TransactionsState for functions that depend on the `Transactions` service. @@ -66,8 +62,8 @@ where } } - #[instrument(skip(self, rpc_clients))] - async fn poll_transactions_status(&self, rpc_clients: &Vec) -> Result<()> { + #[instrument(skip(self, rpc_multi_client))] + async fn poll_transactions_status(&self, rpc_multi_client: &RpcMultiClient) -> Result<()> { let mut txs = self.into().sent_transactions.write().await; if txs.is_empty() { return Ok(()); @@ -76,7 +72,9 @@ where let signatures_contiguous = txs.make_contiguous(); // Poll the status of each transaction, in a single RPC request - let statuses = get_signature_statuses(rpc_clients, signatures_contiguous).await?; + let statuses = rpc_multi_client + .get_signature_statuses(signatures_contiguous) + .await?; tracing::debug!( statuses = ?statuses, @@ -112,23 +110,3 @@ where Ok(()) } } - -async fn get_signature_statuses( - rpc_clients: &Vec, - signatures_contiguous: &mut [Signature], -) -> Result>> { - for rpc_client in rpc_clients { - match rpc_client - .get_signature_statuses(signatures_contiguous) - .await - { - Ok(statuses) => return Ok(statuses.value), - Err(e) => tracing::warn!( - "getSignatureStatus failed for rpc endpoint {}: {:?}", - rpc_client.url(), - e - ), - } - } - bail!("getSignatureStatuses failed for all rpc endpoints") -} diff --git a/src/agent/utils.rs b/src/agent/utils.rs new file mode 100644 index 0000000..6fd8afe --- /dev/null +++ b/src/agent/utils.rs @@ -0,0 +1 @@ +pub mod rpc_multi_client; diff --git a/src/agent/utils/rpc_multi_client.rs b/src/agent/utils/rpc_multi_client.rs new file mode 100644 index 0000000..7cc5fbc --- /dev/null +++ b/src/agent/utils/rpc_multi_client.rs @@ -0,0 +1,208 @@ +use { + anyhow::bail, + solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_config::RpcSendTransactionConfig, + rpc_response::RpcPrioritizationFee, + }, + solana_sdk::{ + account::Account, + commitment_config::CommitmentConfig, + pubkey::Pubkey, + signature::{ + Keypair, + Signature, + Signer, + }, + transaction::Transaction, + }, + solana_transaction_status::TransactionStatus, + std::time::Duration, + url::Url, +}; + +pub struct RpcMultiClient { + rpc_clients: Vec, +} + +impl RpcMultiClient { + pub fn new_with_timeout(rpc_urls: Vec, timeout: Duration) -> Self { + let clients = rpc_urls + .iter() + .map(|rpc_url| RpcClient::new_with_timeout(rpc_url.to_string(), timeout)) + .collect(); + Self { + rpc_clients: clients, + } + } + + pub fn new_with_commitment(rpc_urls: Vec, commitment_config: CommitmentConfig) -> Self { + let clients = rpc_urls + .iter() + .map(|rpc_url| RpcClient::new_with_commitment(rpc_url.to_string(), commitment_config)) + .collect(); + Self { + rpc_clients: clients, + } + } + + pub fn new_with_timeout_and_commitment( + rpc_urls: Vec, + timeout: Duration, + commitment_config: CommitmentConfig, + ) -> Self { + let clients = rpc_urls + .iter() + .map(|rpc_url| { + RpcClient::new_with_timeout_and_commitment( + rpc_url.to_string(), + timeout, + commitment_config, + ) + }) + .collect(); + Self { + rpc_clients: clients, + } + } + + pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result { + for client in self.rpc_clients.iter() { + match client.get_balance(&kp.pubkey()).await { + Ok(balance) => return Ok(balance), + Err(e) => { + tracing::warn!("getBalance error for rpc endpoint {}: {}", client.url(), e) + } + } + } + bail!("getBalance failed for all RPC endpoints") + } + + pub async fn send_transaction_with_config( + &self, + transaction: &Transaction, + ) -> anyhow::Result { + for rpc_client in self.rpc_clients.iter() { + match rpc_client + .send_transaction_with_config( + transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => return Ok(signature), + Err(e) => tracing::warn!( + "sendTransactionWithConfig failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("sendTransactionWithConfig failed for all rpc endpoints") + } + + pub async fn get_signature_statuses( + &self, + signatures_contiguous: &mut [Signature], + ) -> anyhow::Result>> { + for rpc_client in self.rpc_clients.iter() { + match rpc_client + .get_signature_statuses(signatures_contiguous) + .await + { + Ok(statuses) => return Ok(statuses.value), + Err(e) => tracing::warn!( + "getSignatureStatus failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getSignatureStatuses failed for all rpc endpoints") + } + + pub async fn get_recent_prioritization_fees( + &self, + price_accounts: &[Pubkey], + ) -> anyhow::Result> { + for rpc_client in self.rpc_clients.iter() { + match rpc_client + .get_recent_prioritization_fees(price_accounts) + .await + { + Ok(fees) => return Ok(fees), + Err(e) => tracing::warn!( + "getRecentPrioritizationFee failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getRecentPrioritizationFees failed for every rpc endpoint") + } + + pub async fn get_program_accounts( + &self, + oracle_program_key: Pubkey, + ) -> anyhow::Result> { + for rpc_client in self.rpc_clients.iter() { + match rpc_client.get_program_accounts(&oracle_program_key).await { + Ok(accounts) => return Ok(accounts), + Err(e) => tracing::warn!( + "getProgramAccounts failed for rpc endpoint {}: {}", + rpc_client.url(), + e + ), + } + } + bail!("getProgramAccounts failed for all rpc endpoints") + } + + pub async fn get_account_data(&self, publisher_config_key: &Pubkey) -> anyhow::Result> { + for rpc_client in self.rpc_clients.iter() { + match rpc_client.get_account_data(publisher_config_key).await { + Ok(data) => return Ok(data), + Err(e) => tracing::warn!( + "getAccountData failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getAccountData failed for all rpc endpoints") + } + + pub async fn get_slot_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> anyhow::Result { + for rpc_client in self.rpc_clients.iter() { + match rpc_client.get_slot_with_commitment(commitment_config).await { + Ok(slot) => return Ok(slot), + Err(e) => tracing::warn!( + "getSlotWithCommitment failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getSlotWithCommitment failed for all rpc endpoints") + } + + pub async fn get_latest_blockhash(&self) -> anyhow::Result { + for rpc_client in self.rpc_clients.iter() { + match rpc_client.get_latest_blockhash().await { + Ok(hash) => return Ok(hash), + Err(e) => tracing::warn!( + "getLatestBlockhash failed for rpc endpoint {}: {:?}", + rpc_client.url(), + e + ), + } + } + bail!("getLatestBlockhash failed for all rpc endpoints") + } +} From f4e871e2218fde4e1125410515acca9ad2fe0a1d Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 24 Apr 2025 01:37:23 -0500 Subject: [PATCH 3/6] clippy --- src/agent/services/exporter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 27635c8..7cbed94 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -130,12 +130,12 @@ impl NetworkStateQuerier { ) )] pub fn new( - rpc_urls: &Vec, + rpc_urls: &[Url], rpc_timeout: Duration, query_interval: Interval, network_state_tx: watch::Sender, ) -> Self { - let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.clone(), rpc_timeout); + let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.to_vec(), rpc_timeout); NetworkStateQuerier { rpc_multi_client, query_interval, From 062bc3bfa8df92bafeaafca4ee479c89d8120029 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 24 Apr 2025 02:37:23 -0500 Subject: [PATCH 4/6] Round robin through wss_urls --- src/agent/services/oracle.rs | 16 +++++++++++++--- src/agent/solana.rs | 8 ++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 614679b..7f99cb3 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -36,6 +36,7 @@ use { tokio::task::JoinHandle, tokio_stream::StreamExt, tracing::instrument, + url::Url, }; #[instrument(skip(config, state))] @@ -64,12 +65,14 @@ where if config.oracle.subscriber_enabled { let min_elapsed_time = config.oracle.subscriber_finished_min_time; let sleep_time = config.oracle.subscriber_finished_sleep_time; + let mut wss_url_index: usize = 0; handles.push(tokio::spawn(async move { loop { let current_time = Instant::now(); if let Err(ref err) = subscriber( config.clone(), + &config.wss_urls[wss_url_index], network, state.clone(), key_store.pyth_oracle_program_key, @@ -81,6 +84,12 @@ where tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping"); tokio::time::sleep(sleep_time).await; } + + // Round robin to the next WSS provider + wss_url_index += 1; + if wss_url_index >= config.wss_urls.len() { + wss_url_index = 0; + } } } })); @@ -89,14 +98,15 @@ where handles } -/// When an account RPC Subscription update is receiveed. +/// When an account RPC Subscription update is received. /// /// We check if the account is one we're aware of and tracking, and if so, spawn /// a small background task that handles that update. We only do this for price /// accounts, all other accounts are handled below in the poller. -#[instrument(skip(config, state))] +#[instrument(skip(config, wss_url, state))] async fn subscriber( config: Config, + wss_url: &Url, network: Network, state: Arc, program_key: Pubkey, @@ -106,7 +116,7 @@ where S: Send + Sync + 'static, { // Setup PubsubClient to listen for account changes on the Oracle program. - let client = PubsubClient::new(config.wss_url.as_str()).await?; + let client = PubsubClient::new(wss_url.as_str()).await?; let (mut notifier, _unsub) = { let commitment = config.oracle.commitment; diff --git a/src/agent/solana.rs b/src/agent/solana.rs index f5abda2..cd31710 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -30,8 +30,8 @@ pub mod network { vec![Url::parse("http://localhost:8899").unwrap()] } - pub fn default_wss_url() -> String { - "http://localhost:8900".to_string() + pub fn default_wss_urls() -> Vec { + vec![Url::parse("http://localhost:8900").unwrap()] } pub fn default_rpc_timeout() -> Duration { @@ -45,8 +45,8 @@ pub mod network { #[serde(default = "default_rpc_urls")] pub rpc_urls: Vec, /// WSS RPC endpoint - #[serde(default = "default_wss_url")] - pub wss_url: String, + #[serde(default = "default_wss_urls")] + pub wss_urls: Vec, /// Timeout for the requests to the RPC #[serde(with = "humantime_serde", default = "default_rpc_timeout")] pub rpc_timeout: Duration, From 07b45697dc32defe32b263cfcaba2a2a84f5c789 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 24 Apr 2025 11:48:12 -0500 Subject: [PATCH 5/6] Log wss url on disconnect --- src/agent/services/oracle.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 7f99cb3..d131f30 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -79,7 +79,11 @@ where ) .await { - tracing::error!(?err, "Subscriber exited unexpectedly"); + tracing::error!( + ?err, + "Subscriber url: {} exited unexpectedly", + config.wss_urls[wss_url_index] + ); if current_time.elapsed() < min_elapsed_time { tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping"); tokio::time::sleep(sleep_time).await; From 6da01880c5ec3deb74e5a04f24a8d64e3caa281a Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 24 Apr 2025 11:53:46 -0500 Subject: [PATCH 6/6] cargo update --- Cargo.lock | 103 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f1dbcf..408e873 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,7 +69,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", "once_cell", "version_check", ] @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom 0.2.15", + "getrandom 0.2.16", "once_cell", "version_check", "zerocopy 0.7.35", @@ -396,9 +396,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.22" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64" +checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07" dependencies = [ "brotli", "flate2", @@ -645,9 +645,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389a099b34312839e16420d499a9cad9650541715937ffbdd40d36f49e77eeb3" +checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0" dependencies = [ "arrayref", "arrayvec", @@ -797,9 +797,9 @@ dependencies = [ [[package]] name = "brotli" -version = "7.0.0" +version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" +checksum = "cf19e729cdbd51af9a397fb9ef8ac8378007b797f8273cfbfdf45dcaa316167b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -808,9 +808,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "4.0.3" +version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -1013,9 +1013,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.36" +version = "4.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2df961d8c8a0d08aa9945718ccf584145eee3f3aa06cddbeac12933781102e04" +checksum = "eccb054f56cbd38340b380d4a8e69ef1f02f1af43db2f0cc817a4774d80ae071" dependencies = [ "clap_builder", "clap_derive", @@ -1023,9 +1023,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.36" +version = "4.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "132dbda40fb6753878316a489d5a1242a8ef2f0d9e47ba01c951ea8aa7d013a5" +checksum = "efd9466fac8543255d3b1fcad4762c5e116ffe808c8a3043d4263cd4fd4862a2" dependencies = [ "anstream", "anstyle", @@ -1161,7 +1161,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", "once_cell", "tiny-keccak", ] @@ -1857,9 +1857,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "js-sys", @@ -3600,6 +3600,26 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -3630,7 +3650,7 @@ dependencies = [ "bytemuck", "chrono", "chrono-tz", - "clap 4.5.36", + "clap 4.5.37", "config", "futures", "futures-util", @@ -3680,15 +3700,16 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584ecb22ba83ffa922cd635184515607df135dfe454cbf18ce2cba8c09ec476a" +checksum = "a9bdf4e2ba853a8b437309487542e742c7d094d8db189db194cb538f2be02ecd" dependencies = [ "anyhow", "base64 0.22.1", "byteorder", "derive_more", "itertools 0.13.0", + "protobuf", "rust_decimal", "serde", "serde_json", @@ -3712,7 +3733,7 @@ checksum = "1e7aeef4d5f0a9c98ff5af2ddd84a8b89919c512188305b497a9eb9afa97a949" dependencies = [ "borsh 0.10.4", "borsh-derive 0.10.4", - "getrandom 0.2.15", + "getrandom 0.2.16", "hex", "schemars", "serde", @@ -3909,7 +3930,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", ] [[package]] @@ -4150,7 +4171,7 @@ checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", - "getrandom 0.2.15", + "getrandom 0.2.16", "libc", "untrusted 0.9.0", "windows-sys 0.52.0", @@ -4199,23 +4220,23 @@ dependencies = [ [[package]] name = "rpassword" -version = "7.3.1" +version = "7.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80472be3c897911d0137b2d2b9055faf6eeac5b14e324073d83bc17b191d7e3f" +checksum = "66d4c8b64f049c6721ec8ccec37ddfc3d641c4a7fca57e8f2a89de509c73df39" dependencies = [ "libc", "rtoolbox", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] name = "rtoolbox" -version = "0.0.2" +version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c247d24e63230cdb56463ae328478bd5eac8b8faa8c69461a77e8e323afac90e" +checksum = "a7cc970b249fbe527d6e02e0a227762c9108b2f49d81094fe357ffc6d14d7f6f" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4663,9 +4684,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.2" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" dependencies = [ "libc", ] @@ -5144,7 +5165,7 @@ checksum = "9ce496a475e5062ba5de97215ab39d9c358f9c9df4bb7f3a45a1f1a8bd9065ed" dependencies = [ "bincode 1.3.3", "borsh 1.5.7", - "getrandom 0.2.15", + "getrandom 0.2.16", "js-sys", "num-traits", "serde", @@ -5403,7 +5424,7 @@ dependencies = [ "console_error_panic_hook", "console_log", "curve25519-dalek 3.2.1", - "getrandom 0.2.15", + "getrandom 0.2.16", "itertools 0.10.5", "js-sys", "lazy_static", @@ -5448,7 +5469,7 @@ dependencies = [ "bytemuck", "console_error_panic_hook", "console_log", - "getrandom 0.2.15", + "getrandom 0.2.16", "lazy_static", "log", "memoffset 0.9.1", @@ -5608,7 +5629,7 @@ dependencies = [ "bytemuck_derive", "curve25519-dalek 4.1.3", "five8_const", - "getrandom 0.2.15", + "getrandom 0.2.16", "js-sys", "num-traits", "serde", @@ -6918,16 +6939,16 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-io", "futures-sink", "futures-util", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "pin-project-lite", "slab", "tokio", @@ -6984,7 +7005,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.7.6", + "winnow 0.7.7", ] [[package]] @@ -7824,9 +7845,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10" +checksum = "6cb8234a863ea0e8cd7284fcdd4f145233eb00fee02bbdd9861aec44e6477bc5" dependencies = [ "memchr", ]