diff --git a/Cargo.lock b/Cargo.lock
index 2c0c9da..81aaa85 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3638,6 +3638,7 @@ dependencies = [
  "solana-client",
  "solana-pubkey",
  "solana-sdk",
+ "solana-transaction-status",
  "thiserror 1.0.69",
  "tokio",
  "tokio-retry",
@@ -6888,7 +6889,7 @@ dependencies = [
  "serde",
  "serde_spanned",
  "toml_datetime",
- "winnow 0.7.6",
+ "winnow 0.7.7",
 ]
 
 [[package]]
@@ -7734,9 +7735,9 @@ dependencies = [
 
 [[package]]
 name = "winnow"
-version = "0.7.6"
+version = "0.7.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10"
+checksum = "6cb8234a863ea0e8cd7284fcdd4f145233eb00fee02bbdd9861aec44e6477bc5"
 dependencies = [
  "memchr",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 09a130d..7fa2647 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ solana-account-decoder = "1.18.8"
 solana-client = "1.18.8"
 solana-pubkey = "2.3.0"
 solana-sdk = "1.18.8"
+solana-transaction-status = "1.18.26"
 bincode = { version = "2.0.1", features = ["serde"] }
 rand = "0.8.5"
 config = "0.14.0"
diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml
index 6a1ea73..d5ffbca 100644
--- a/config/config.sample.pythnet.toml
+++ b/config/config.sample.pythnet.toml
@@ -3,9 +3,10 @@ listen_address = "127.0.0.1:8910"
 
 [primary_network]
-# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually
+# HTTP(S) endpoints of the RPC node. Public Pythnet RPC endpoints are usually
 # rate-limited, so a private endpoint should be used in most cases.
-rpc_url = "https://api2.pythnet.pyth.network"
+# API calls will cycle through each on failure.
+rpc_urls = ["https://api2.pythnet.pyth.network"]
 
 # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network.
 # This can be omitted when oracle.subscriber_enabled is set to false.
diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml
index fc01b80..6890660 100644
--- a/config/config.sample.pythtest.toml
+++ b/config/config.sample.pythtest.toml
@@ -3,8 +3,9 @@ listen_address = "127.0.0.1:8910"
 
 [primary_network]
-# HTTP(S) endpoint of the RPC node.
-rpc_url = "https://api.pythtest.pyth.network"
+# HTTP(S) endpoints of the RPC node.
+# API calls will cycle through each on failure.
+rpc_urls = ["https://api.pythtest.pyth.network"]
 
 # WS(S) endpoint of the RRC node. This is used to subscribe to account changes
 # on the network. This can be omitted when oracle.subscriber_enabled is set to
diff --git a/config/config.toml b/config/config.toml
index 39369b0..6cf270d 100644
--- a/config/config.toml
+++ b/config/config.toml
@@ -29,10 +29,11 @@ listen_address = "127.0.0.1:8910"
 [primary_network]
 ### Required fields ###
 
-# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually
+# HTTP(S) endpoints of the RPC node. Public RPC endpoints are usually
 # rate-limited for Pythnet, and so a private endpoint should be used in most
 # cases. For Pythtest, the public endpoint can be used.
-rpc_url = "https://api.pythtest.pyth.network"
+# API calls will cycle through each on failure.
+rpc_urls = ["https://api.pythtest.pyth.network"]
 
 # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network.
 # This can be omitted when oracle.subscriber_enabled is set to false.
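Aside (not part of the patch): with `rpc_url` replaced by `rpc_urls`, a deployment that wants failover can list several endpoints in priority order. A sketch of such a config — the second URL is a hypothetical private fallback, not a real service:

```toml
[primary_network]
# Tried in order; later entries are only used when earlier ones fail.
rpc_urls = [
    "https://api2.pythnet.pyth.network",
    "https://pythnet.rpc.example.com",  # hypothetical private fallback
]
```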
diff --git a/src/agent.rs b/src/agent.rs
index 659458d..2023db7 100644
--- a/src/agent.rs
+++ b/src/agent.rs
@@ -83,6 +83,7 @@ pub mod pyth;
 pub mod services;
 pub mod solana;
 pub mod state;
+pub mod utils;
 
 lazy_static! {
     /// A static exit flag to indicate to running threads that we're shutting down. This is used to
@@ -183,11 +184,11 @@ impl Agent {
         // Spawn the remote keypair loader endpoint for both networks
         handles.extend(
             services::keypairs(
-                self.config.primary_network.rpc_url.clone(),
+                self.config.primary_network.rpc_urls.clone(),
                 self.config
                     .secondary_network
                     .as_ref()
-                    .map(|c| c.rpc_url.clone()),
+                    .map(|c| c.rpc_urls.clone()),
                 self.config.remote_keypair_loader.clone(),
                 state,
             )
diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs
index 2500b9d..7cbed94 100644
--- a/src/agent/services/exporter.rs
+++ b/src/agent/services/exporter.rs
@@ -8,6 +8,7 @@ use {
             exporter::Exporter,
             transactions::Transactions,
         },
+        utils::rpc_multi_client::RpcMultiClient,
     },
     anyhow::Result,
     futures_util::future,
@@ -15,7 +16,6 @@ use {
         Deserialize,
         Serialize,
     },
-    solana_client::nonblocking::rpc_client::RpcClient,
     solana_sdk::commitment_config::CommitmentConfig,
     std::{
         sync::Arc,
@@ -27,6 +27,7 @@ use {
         time::Interval,
     },
     tracing::instrument,
+    url::Url,
 };
 
 #[derive(Clone, Serialize, Deserialize, Debug)]
@@ -111,7 +112,7 @@ pub struct NetworkState {
 /// fetching the blockhash and slot number.
 struct NetworkStateQuerier {
     /// The RPC client
-    rpc_client: RpcClient,
+    rpc_multi_client: RpcMultiClient,
 
     /// The interval with which to query the network state
     query_interval: Interval,
@@ -122,20 +123,21 @@ impl NetworkStateQuerier {
     #[instrument(
-        skip(rpc_endpoint, rpc_timeout, query_interval),
+        skip(rpc_urls, rpc_timeout, query_interval),
         fields(
             rpc_timeout = rpc_timeout.as_millis(),
             query_interval = query_interval.period().as_millis(),
         )
     )]
     pub fn new(
-        rpc_endpoint: &str,
+        rpc_urls: &[Url],
         rpc_timeout: Duration,
         query_interval: Interval,
         network_state_tx: watch::Sender<NetworkState>,
     ) -> Self {
+        let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.to_vec(), rpc_timeout);
         NetworkStateQuerier {
-            rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
+            rpc_multi_client,
             query_interval,
             network_state_tx,
         }
@@ -154,9 +156,9 @@ impl NetworkStateQuerier {
     async fn query_network_state(&mut self) -> Result<()> {
         // Fetch the blockhash and current slot in parallel
         let current_slot_future = self
-            .rpc_client
+            .rpc_multi_client
             .get_slot_with_commitment(CommitmentConfig::confirmed());
-        let latest_blockhash_future = self.rpc_client.get_latest_blockhash();
+        let latest_blockhash_future = self.rpc_multi_client.get_latest_blockhash();
 
         let (current_slot_result, latest_blockhash_result) =
             future::join(current_slot_future, latest_blockhash_future).await;
@@ -183,7 +185,7 @@ where
     // Create and spawn the network state querier
     let (network_state_tx, network_state_rx) = watch::channel(Default::default());
     let mut network_state_querier = NetworkStateQuerier::new(
-        &config.rpc_url,
+        &config.rpc_urls,
         config.rpc_timeout,
         tokio::time::interval(config.exporter.refresh_network_state_interval_duration),
         network_state_tx,
@@ -224,8 +226,9 @@ mod exporter {
             publish_batches,
             Exporter,
         },
+        utils::rpc_multi_client::RpcMultiClient,
     },
-    solana_client::nonblocking::rpc_client::RpcClient,
+    solana_sdk::commitment_config::CommitmentConfig,
     std::sync::Arc,
     tokio::sync::watch,
 };
@@ -243,10 +246,14 @@
     let mut dynamic_compute_unit_price_update_interval =
         tokio::time::interval(config.exporter.publish_interval_duration);
-    let client = Arc::new(RpcClient::new_with_timeout(
-        config.rpc_url.to_string(),
-        config.rpc_timeout,
-    ));
+    let rpc_multi_client: Arc<RpcMultiClient> =
+        Arc::new(RpcMultiClient::new_with_timeout_and_commitment(
+            config.rpc_urls.clone(),
+            config.rpc_timeout,
+            CommitmentConfig {
+                commitment: config.oracle.commitment,
+            },
+        ));
     let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
         tracing::warn!("Key store not available, Exporter won't start.");
         return;
     };
@@ -265,7 +272,7 @@ mod exporter {
             let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
             if let Err(err) = publish_batches(
                 state.clone(),
-                client.clone(),
+                rpc_multi_client.clone(),
                 network,
                 &network_state_rx,
                 key_store.accumulator_key,
@@ -293,7 +300,7 @@ mod exporter {
             if let Err(err) = Exporter::update_recent_compute_unit_price(
                 &*state,
                 &publish_keypair,
-                &client,
+                &rpc_multi_client,
                 config.exporter.staleness_threshold,
                 config.exporter.unchanged_publish_threshold,
             ).await {
@@ -312,12 +319,12 @@ mod transaction_monitor {
     crate::agent::{
         solana::network,
         state::transactions::Transactions,
+        utils::rpc_multi_client::RpcMultiClient,
     },
     serde::{
         Deserialize,
         Serialize,
     },
-    solana_client::nonblocking::rpc_client::RpcClient,
     std::{
         sync::Arc,
         time::Duration,
@@ -352,13 +359,16 @@ mod transaction_monitor {
     where
         S: Transactions,
     {
-        let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout);
+        let rpc_multi_client =
+            RpcMultiClient::new_with_timeout(config.rpc_urls.clone(), config.rpc_timeout);
         let mut poll_interval =
             tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);
 
         loop {
             poll_interval.tick().await;
-            if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await {
+            if let Err(err) =
+                Transactions::poll_transactions_status(&*state, &rpc_multi_client).await
+            {
                 tracing::error!(err = ?err, "Transaction monitor failed.");
             }
         }
diff --git a/src/agent/services/keypairs.rs b/src/agent/services/keypairs.rs
index 0d2863d..ae0023f 100644
--- a/src/agent/services/keypairs.rs
+++ b/src/agent/services/keypairs.rs
@@ -6,13 +6,13 @@ use {
     crate::agent::{
         solana::network::Network,
         state::keypairs::Keypairs,
+        utils::rpc_multi_client::RpcMultiClient,
     },
     anyhow::{
-        Context,
+        bail,
         Result,
     },
     serde::Deserialize,
-    solana_client::nonblocking::rpc_client::RpcClient,
     solana_sdk::{
         commitment_config::CommitmentConfig,
         signature::Keypair,
@@ -23,6 +23,7 @@ use {
         sync::Arc,
     },
     tokio::task::JoinHandle,
+    url::Url,
     warp::{
         hyper::StatusCode,
         reject::Rejection,
@@ -61,8 +62,8 @@ impl Default for Config {
 }
 
 pub async fn keypairs<S>(
-    primary_rpc_url: String,
-    secondary_rpc_url: Option<String>,
+    primary_rpc_urls: Vec<Url>,
+    secondary_rpc_urls: Option<Vec<Url>>,
     config: Config,
     state: Arc<S>,
 ) -> Vec<JoinHandle<()>>
 where
@@ -81,7 +82,7 @@
     let primary_upload_route = {
         let state = state.clone();
-        let rpc_url = primary_rpc_url.clone();
+        let rpc_urls = primary_rpc_urls.clone();
         let min_balance = config.primary_min_keypair_balance_sol;
         warp::path!("primary" / "load_keypair")
             .and(warp::post())
@@ -90,14 +91,14 @@
             .and(warp::path::end())
             .and_then(move |kp: Vec<u8>| {
                 let state = state.clone();
-                let rpc_url = rpc_url.clone();
+                let rpc_urls = rpc_urls.clone();
                 async move {
                     let response = handle_new_keypair(
                         state,
                         Network::Primary,
                         kp,
                         min_balance,
-                        rpc_url,
+                        rpc_urls,
                         "primary",
                     )
                     .await;
@@ -113,16 +114,16 @@
             .and(warp::path::end())
             .and_then(move |kp: Vec<u8>| {
                 let state = state.clone();
-                let rpc_url = secondary_rpc_url.clone();
+                let rpc_urls = secondary_rpc_urls.clone();
                 async move {
-                    if let Some(rpc_url) = rpc_url {
+                    if let Some(rpc_urls) = rpc_urls {
                         let min_balance = config.secondary_min_keypair_balance_sol;
                         let response = handle_new_keypair(
                             state,
                             Network::Secondary,
                             kp,
                             min_balance,
-                            rpc_url,
+                            rpc_urls,
                             "secondary",
                         )
                         .await;
@@ -160,7 +161,7 @@ async fn handle_new_keypair<'a, 'b: 'a, S>(
     network: Network,
     new_keypair_bytes: Vec<u8>,
     min_keypair_balance_sol: u64,
-    rpc_url: String,
+    rpc_urls: Vec<Url>,
     network_name: &'b str,
 ) -> WithStatus<&'static str>
 where
@@ -168,7 +169,7 @@
 {
     let mut upload_ok = true;
     match Keypair::from_bytes(&new_keypair_bytes) {
-        Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await {
+        Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_urls.clone()).await {
             Ok(()) => {
                 Keypairs::update_keypair(&*state, network, kp).await;
             }
@@ -205,14 +206,14 @@
 pub async fn validate_keypair(
     kp: &Keypair,
     min_keypair_balance_sol: u64,
-    rpc_url: String,
+    rpc_urls: Vec<Url>,
 ) -> Result<()> {
-    let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
-
-    let balance_lamports = c
-        .get_balance(&kp.pubkey())
-        .await
-        .context("Could not check keypair's balance")?;
+    let rpc_multi_client =
+        RpcMultiClient::new_with_commitment(rpc_urls, CommitmentConfig::confirmed());
+    let balance_lamports = match rpc_multi_client.get_balance(kp).await {
+        Ok(balance_lamports) => balance_lamports,
+        Err(_) => bail!("Could not check keypair's balance"),
+    };
 
     let lamports_in_sol = 1_000_000_000;
diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index e89ccbe..d131f30 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -12,14 +12,12 @@ use {
             },
         },
         state::oracle::Oracle,
+        utils::rpc_multi_client::RpcMultiClient,
     },
     anyhow::Result,
     solana_account_decoder::UiAccountEncoding,
     solana_client::{
-        nonblocking::{
-            pubsub_client::PubsubClient,
-            rpc_client::RpcClient,
-        },
+        nonblocking::pubsub_client::PubsubClient,
         rpc_config::{
             RpcAccountInfoConfig,
             RpcProgramAccountsConfig,
@@ -38,6 +36,7 @@ use {
     tokio::task::JoinHandle,
     tokio_stream::StreamExt,
     tracing::instrument,
+    url::Url,
 };
 
 #[instrument(skip(config, state))]
@@ -66,23 +65,35 @@ where
     if config.oracle.subscriber_enabled {
         let min_elapsed_time = config.oracle.subscriber_finished_min_time;
         let sleep_time = config.oracle.subscriber_finished_sleep_time;
+        let mut wss_url_index: usize = 0;
 
         handles.push(tokio::spawn(async move {
             loop {
                 let current_time = Instant::now();
                 if let Err(ref err) = subscriber(
                     config.clone(),
+                    &config.wss_urls[wss_url_index],
                     network,
                     state.clone(),
                     key_store.pyth_oracle_program_key,
                 )
                 .await
                 {
-                    tracing::error!(?err, "Subscriber exited unexpectedly");
+                    tracing::error!(
+                        ?err,
+                        "Subscriber url: {} exited unexpectedly",
+                        config.wss_urls[wss_url_index]
+                    );
                     if current_time.elapsed() < min_elapsed_time {
                         tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
                         tokio::time::sleep(sleep_time).await;
                     }
+
+                    // Round robin to the next WSS provider
+                    wss_url_index += 1;
+                    if wss_url_index >= config.wss_urls.len() {
+                        wss_url_index = 0;
+                    }
                 }
             }
         }));
@@ -91,14 +102,15 @@ where
     handles
 }
 
-/// When an account RPC Subscription update is receiveed.
+/// When an account RPC Subscription update is received.
 ///
 /// We check if the account is one we're aware of and tracking, and if so, spawn
 /// a small background task that handles that update. We only do this for price
 /// accounts, all other accounts are handled below in the poller.
-#[instrument(skip(config, state))]
+#[instrument(skip(config, wss_url, state))]
 async fn subscriber<S>(
     config: Config,
+    wss_url: &Url,
     network: Network,
     state: Arc<S>,
     program_key: Pubkey,
@@ -108,7 +120,7 @@ async fn subscriber<S>(
 ) -> Result<()>
 where
     S: Send + Sync + 'static,
 {
     // Setup PubsubClient to listen for account changes on the Oracle program.
-    let client = PubsubClient::new(config.wss_url.as_str()).await?;
+    let client = PubsubClient::new(wss_url.as_str()).await?;
 
     let (mut notifier, _unsub) = {
         let commitment = config.oracle.commitment;
@@ -168,8 +180,8 @@ async fn poller<S>(
 {
     // Setup an RpcClient for manual polling.
     let mut tick = tokio::time::interval(config.oracle.poll_interval_duration);
-    let client = Arc::new(RpcClient::new_with_timeout_and_commitment(
-        config.rpc_url,
+    let rpc_multi_client = Arc::new(RpcMultiClient::new_with_timeout_and_commitment(
+        config.rpc_urls.clone(),
         config.rpc_timeout,
         CommitmentConfig {
             commitment: config.oracle.commitment,
         },
@@ -186,7 +198,7 @@
             oracle_program_key,
             publish_keypair.as_ref(),
             pyth_price_store_program_key,
-            &client,
+            &rpc_multi_client,
             max_lookup_batch_size,
         )
         .await?;
diff --git a/src/agent/solana.rs b/src/agent/solana.rs
index 8336d8b..cd31710 100644
--- a/src/agent/solana.rs
+++ b/src/agent/solana.rs
@@ -17,6 +17,7 @@ pub mod network {
         Serialize,
     },
     std::time::Duration,
+    url::Url,
 };
 
 #[derive(Clone, Copy, Serialize, Deserialize, Debug)]
@@ -25,12 +26,12 @@ pub mod network {
     Secondary,
 }
 
-pub fn default_rpc_url() -> String {
-    "http://localhost:8899".to_string()
+pub fn default_rpc_urls() -> Vec<Url> {
+    vec![Url::parse("http://localhost:8899").unwrap()]
 }
 
-pub fn default_wss_url() -> String {
-    "http://localhost:8900".to_string()
+pub fn default_wss_urls() -> Vec<Url> {
+    vec![Url::parse("http://localhost:8900").unwrap()]
 }
 
 pub fn default_rpc_timeout() -> Duration {
@@ -40,12 +41,12 @@ pub mod network {
     /// Configuration for a network
     #[derive(Clone, Serialize, Deserialize, Debug)]
     pub struct Config {
-        /// HTTP RPC endpoint
-        #[serde(default = "default_rpc_url")]
-        pub rpc_url: String,
+        /// HTTP RPC endpoint list
+        #[serde(default = "default_rpc_urls")]
+        pub rpc_urls: Vec<Url>,
         /// WSS RPC endpoint
-        #[serde(default = "default_wss_url")]
-        pub wss_url: String,
+        #[serde(default = "default_wss_urls")]
+        pub wss_urls: Vec<Url>,
         /// Timeout for the requests to the RPC
         #[serde(with = "humantime_serde", default = "default_rpc_timeout")]
         pub rpc_timeout: Duration,
diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs
index ac9f0b8..e188d64 100644
--- a/src/agent/state/exporter.rs
+++ b/src/agent/state/exporter.rs
@@ -13,6 +13,7 @@ use {
         keypairs::Keypairs,
         local::LocalStore,
     },
+    utils::rpc_multi_client::RpcMultiClient,
 },
 anyhow::{
     anyhow,
@@ -30,10 +31,6 @@ use {
     pyth_sdk::Identifier,
     pyth_sdk_solana::state::PriceStatus,
     serde::Serialize,
-    solana_client::{
-        nonblocking::rpc_client::RpcClient,
-        rpc_config::RpcSendTransactionConfig,
-    },
     solana_pubkey,
     solana_sdk::{
         compute_budget::ComputeBudgetInstruction,
@@ -119,7 +116,7 @@ where
     async fn update_recent_compute_unit_price(
         &self,
         publish_keypair: &Keypair,
-        rpc_client: &RpcClient,
+        rpc_clients: &RpcMultiClient,
         staleness_threshold: Duration,
         unchanged_publish_threshold: Duration,
     ) -> Result<()>;
@@ -300,7 +297,7 @@ where
     async fn update_recent_compute_unit_price(
         &self,
         publish_keypair: &Keypair,
-        rpc_client: &RpcClient,
+        rpc_multi_client: &RpcMultiClient,
         staleness_threshold: Duration,
         unchanged_publish_threshold: Duration,
     ) -> Result<()> {
@@ -321,7 +318,7 @@
             .recent_compute_unit_price_micro_lamports
             .write()
             .await =
-            estimate_compute_unit_price_micro_lamports(rpc_client, &price_accounts).await?;
+            estimate_compute_unit_price_micro_lamports(rpc_multi_client, &price_accounts).await?;
 
         Ok(())
     }
@@ -388,7 +385,7 @@ where
 }
 
 async fn estimate_compute_unit_price_micro_lamports(
-    rpc_client: &RpcClient,
+    rpc_multi_client: &RpcMultiClient,
     price_accounts: &[Pubkey],
 ) -> Result<Option<u64>> {
     let mut slot_compute_fee: BTreeMap<Slot, u64> = BTreeMap::new();
@@ -397,7 +394,7 @@ async fn estimate_compute_unit_price_micro_lamports(
     let prioritization_fees_batches = futures_util::future::join_all(
         price_accounts
             .chunks(128)
-            .map(|price_accounts| rpc_client.get_recent_prioritization_fees(price_accounts)),
+            .map(|price_accounts| rpc_multi_client.get_recent_prioritization_fees(price_accounts)),
     )
     .await
     .into_iter()
@@ -450,7 +447,7 @@
 /// time to respond, no internal queues grow unboundedly. At any single point in time there are at most
 /// (n / batch_size) requests in flight.
 #[instrument(
-    skip(state, client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates),
+    skip(state, rpc_multi_client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates),
     fields(
         publish_keypair = publish_keypair.pubkey().to_string(),
         staleness_threshold = staleness_threshold.as_millis(),
     )
@@ -458,7 +455,7 @@
 )]
 pub async fn publish_batches<S>(
     state: Arc<S>,
-    client: Arc<RpcClient>,
+    rpc_multi_client: Arc<RpcMultiClient>,
     network: Network,
     network_state_rx: &watch::Receiver<NetworkState>,
     accumulator_key: Option<Pubkey>,
@@ -497,7 +494,7 @@
     for batch in batches {
         batch_futures.push(publish_batch(
             state.clone(),
-            client.clone(),
+            rpc_multi_client.clone(),
             network,
             network_state,
             accumulator_key,
@@ -531,7 +528,7 @@
 }
 
 #[instrument(
-    skip(state, client, network_state, publish_keypair, batch, staleness_threshold),
+    skip(state, rpc_multi_client, network_state, publish_keypair, batch, staleness_threshold),
     fields(
         publish_keypair = publish_keypair.pubkey().to_string(),
         blockhash = network_state.blockhash.to_string(),
@@ -542,7 +539,7 @@
 )]
 async fn publish_batch<S>(
     state: Arc<S>,
-    client: Arc<RpcClient>,
+    rpc_multi_client: Arc<RpcMultiClient>,
     network: Network,
     network_state: NetworkState,
     accumulator_key: Option<Pubkey>,
@@ -744,14 +741,8 @@
     );
 
     tokio::spawn(async move {
-        let signature = match client
-            .send_transaction_with_config(
-                &transaction,
-                RpcSendTransactionConfig {
-                    skip_preflight: true,
-                    ..RpcSendTransactionConfig::default()
-                },
-            )
+        let signature = match rpc_multi_client
+            .send_transaction_with_config(&transaction)
             .await
         {
             Ok(signature) => signature,
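Aside (not part of the patch): the exporter hunks above keep the existing fee-estimation shape — price accounts are chunked 128 at a time (the address limit for `getRecentPrioritizationFees`) and queried concurrently. A minimal runnable sketch of that chunk-and-join pattern, with a hypothetical `fetch_fees_for` standing in for the RPC call and a simplified `Pubkey`:

```rust
use futures_util::future::join_all;

// Illustrative stand-in so the pattern compiles on its own.
type Pubkey = [u8; 32];

async fn fetch_fees_for(chunk: &[Pubkey]) -> anyhow::Result<Vec<u64>> {
    // Placeholder for rpc_multi_client.get_recent_prioritization_fees(chunk).
    Ok(vec![chunk.len() as u64])
}

async fn fetch_all_fees(price_accounts: &[Pubkey]) -> anyhow::Result<Vec<u64>> {
    // One concurrent request per 128-account chunk, mirroring the hunk above.
    let batches = join_all(price_accounts.chunks(128).map(fetch_fees_for)).await;
    // Fail on the first error, otherwise flatten the per-chunk results.
    let mut fees = Vec::new();
    for batch in batches {
        fees.extend(batch?);
    }
    Ok(fees)
}
```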
diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index ae97fb8..7325d57 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -12,6 +12,7 @@ use {
         Prices,
         State,
     },
+    utils::rpc_multi_client::RpcMultiClient,
 },
 anyhow::{
     anyhow,
@@ -205,7 +206,7 @@ pub trait Oracle {
         oracle_program_key: Pubkey,
         publish_keypair: Option<&Keypair>,
         pyth_price_store_program_key: Option<Pubkey>,
-        rpc_client: &RpcClient,
+        rpc_multi_client: &RpcMultiClient,
         max_lookup_batch_size: usize,
     ) -> Result<()>;
     async fn handle_price_account_update(
@@ -274,20 +275,23 @@ where
     }
 
     /// Poll target Solana based chain for Pyth related accounts.
-    #[instrument(skip(self, publish_keypair, rpc_client))]
+    #[instrument(skip(self, publish_keypair, rpc_multi_client))]
     async fn poll_updates(
         &self,
         network: Network,
         oracle_program_key: Pubkey,
         publish_keypair: Option<&Keypair>,
         pyth_price_store_program_key: Option<Pubkey>,
-        rpc_client: &RpcClient,
+        rpc_multi_client: &RpcMultiClient,
         max_lookup_batch_size: usize,
     ) -> Result<()> {
         let mut publisher_permissions = HashMap::new();
-        let (product_accounts, price_accounts) =
-            fetch_product_and_price_accounts(rpc_client, oracle_program_key, max_lookup_batch_size)
-                .await?;
+        let (product_accounts, price_accounts) = fetch_product_and_price_accounts(
+            rpc_multi_client,
+            oracle_program_key,
+            max_lookup_batch_size,
+        )
+        .await?;
 
         for (price_key, price_entry) in price_accounts.iter() {
             for component in price_entry.comp {
@@ -326,7 +330,7 @@ where
             (pyth_price_store_program_key, publish_keypair)
         {
             match fetch_publisher_buffer_key(
-                rpc_client,
+                rpc_multi_client,
                 pyth_price_store_program_key,
                 publish_keypair.pubkey(),
             )
@@ -402,7 +406,7 @@ where
 }
 
 async fn fetch_publisher_buffer_key(
-    rpc_client: &RpcClient,
+    rpc_multi_client: &RpcMultiClient,
     pyth_price_store_program_key: Pubkey,
     publisher_pubkey: Pubkey,
 ) -> Result<Pubkey> {
@@ -413,7 +417,9 @@ async fn fetch_publisher_buffer_key(
         ],
         &pyth_price_store_program_key,
     );
-    let data = rpc_client.get_account_data(&publisher_config_key).await?;
+    let data = rpc_multi_client
+        .get_account_data(&publisher_config_key)
+        .await?;
     let config = pyth_price_store::accounts::publisher_config::read(&data)?;
     Ok(config.buffer_account.into())
 }
@@ -423,16 +429,18 @@
 type ProductAndPriceAccounts = (
     HashMap<Pubkey, ProductEntry>,
     HashMap<Pubkey, PriceEntry>,
 );
 
-#[instrument(skip(rpc_client))]
+#[instrument(skip(rpc_multi_client))]
 async fn fetch_product_and_price_accounts(
-    rpc_client: &RpcClient,
+    rpc_multi_client: &RpcMultiClient,
     oracle_program_key: Pubkey,
-    max_lookup_batch_size: usize,
+    _max_lookup_batch_size: usize,
 ) -> Result<ProductAndPriceAccounts> {
     let mut product_entries = HashMap::new();
     let mut price_entries = HashMap::new();
 
-    let oracle_accounts = rpc_client.get_program_accounts(&oracle_program_key).await?;
+    let oracle_accounts = rpc_multi_client
+        .get_program_accounts(oracle_program_key)
+        .await?;
 
     // Go over all the product accounts and partially fill the product entires. The product
     // entires need to have prices inside them which gets filled by going over all the
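Aside (not part of the patch): two failover styles coexist after this change. The HTTP multi-client retries from the top of the list on every call, while the WSS subscriber in src/agent/services/oracle.rs above rotates to the next endpoint only after a failure and then stays there. The rotation bookkeeping reduces to the wrap-around step below — a standalone sketch with plain strings in place of `Vec<Url>`:

```rust
/// Advance a round-robin cursor over WSS endpoints, wrapping at the end;
/// mirrors the `wss_url_index` arithmetic added in services/oracle.rs.
fn next_wss_index(current: usize, total: usize) -> usize {
    if current + 1 >= total {
        0
    } else {
        current + 1
    }
}

fn main() {
    let wss_urls = ["wss://a.example", "wss://b.example", "wss://c.example"];
    let mut index = 0;
    for _ in 0..4 {
        println!("subscribing via {}", wss_urls[index]);
        // In the agent this advance only happens after a subscriber error.
        index = next_wss_index(index, wss_urls.len()); // 0 -> 1 -> 2 -> 0
    }
}
```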
diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs
index c0e8921..641fd02 100644
--- a/src/agent/state/transactions.rs
+++ b/src/agent/state/transactions.rs
@@ -1,7 +1,7 @@
 use {
     super::State,
+    crate::agent::utils::rpc_multi_client::RpcMultiClient,
     anyhow::Result,
-    solana_client::nonblocking::rpc_client::RpcClient,
     solana_sdk::{
         commitment_config::CommitmentConfig,
         signature::Signature,
@@ -29,7 +29,7 @@ impl TransactionsState {
 #[async_trait::async_trait]
 pub trait Transactions {
     async fn add_transaction(&self, signature: Signature);
-    async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()>;
+    async fn poll_transactions_status(&self, rpc_multi_client: &RpcMultiClient) -> Result<()>;
 }
 
 /// Allow downcasting State into TransactionsState for functions that depend on the `Transactions` service.
@@ -62,8 +62,8 @@ where
         }
     }
 
-    #[instrument(skip(self, rpc))]
-    async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> {
+    #[instrument(skip(self, rpc_multi_client))]
+    async fn poll_transactions_status(&self, rpc_multi_client: &RpcMultiClient) -> Result<()> {
         let mut txs = self.into().sent_transactions.write().await;
         if txs.is_empty() {
             return Ok(());
         }
@@ -72,10 +72,9 @@
         let signatures_contiguous = txs.make_contiguous();
 
         // Poll the status of each transaction, in a single RPC request
-        let statuses = rpc
+        let statuses = rpc_multi_client
             .get_signature_statuses(signatures_contiguous)
-            .await?
-            .value;
+            .await?;
 
         tracing::debug!(
             statuses = ?statuses,
diff --git a/src/agent/utils.rs b/src/agent/utils.rs
new file mode 100644
index 0000000..6fd8afe
--- /dev/null
+++ b/src/agent/utils.rs
@@ -0,0 +1 @@
+pub mod rpc_multi_client;
diff --git a/src/agent/utils/rpc_multi_client.rs b/src/agent/utils/rpc_multi_client.rs
new file mode 100644
index 0000000..7cc5fbc
--- /dev/null
+++ b/src/agent/utils/rpc_multi_client.rs
@@ -0,0 +1,208 @@
+use {
+    anyhow::bail,
+    solana_client::{
+        nonblocking::rpc_client::RpcClient,
+        rpc_config::RpcSendTransactionConfig,
+        rpc_response::RpcPrioritizationFee,
+    },
+    solana_sdk::{
+        account::Account,
+        commitment_config::CommitmentConfig,
+        pubkey::Pubkey,
+        signature::{
+            Keypair,
+            Signature,
+            Signer,
+        },
+        transaction::Transaction,
+    },
+    solana_transaction_status::TransactionStatus,
+    std::time::Duration,
+    url::Url,
+};
+
+pub struct RpcMultiClient {
+    rpc_clients: Vec<RpcClient>,
+}
+
+impl RpcMultiClient {
+    pub fn new_with_timeout(rpc_urls: Vec<Url>, timeout: Duration) -> Self {
+        let clients = rpc_urls
+            .iter()
+            .map(|rpc_url| RpcClient::new_with_timeout(rpc_url.to_string(), timeout))
+            .collect();
+        Self {
+            rpc_clients: clients,
+        }
+    }
+
+    pub fn new_with_commitment(rpc_urls: Vec<Url>, commitment_config: CommitmentConfig) -> Self {
+        let clients = rpc_urls
+            .iter()
+            .map(|rpc_url| RpcClient::new_with_commitment(rpc_url.to_string(), commitment_config))
+            .collect();
+        Self {
+            rpc_clients: clients,
+        }
+    }
+
+    pub fn new_with_timeout_and_commitment(
+        rpc_urls: Vec<Url>,
+        timeout: Duration,
+        commitment_config: CommitmentConfig,
+    ) -> Self {
+        let clients = rpc_urls
+            .iter()
+            .map(|rpc_url| {
+                RpcClient::new_with_timeout_and_commitment(
+                    rpc_url.to_string(),
+                    timeout,
+                    commitment_config,
+                )
+            })
+            .collect();
+        Self {
+            rpc_clients: clients,
+        }
+    }
+
+    pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result<u64> {
+        for client in self.rpc_clients.iter() {
+            match client.get_balance(&kp.pubkey()).await {
+                Ok(balance) => return Ok(balance),
+                Err(e) => {
+                    tracing::warn!("getBalance error for rpc endpoint {}: {}", client.url(), e)
+                }
+            }
+        }
+        bail!("getBalance failed for all RPC endpoints")
+    }
+
+    pub async fn send_transaction_with_config(
+        &self,
+        transaction: &Transaction,
+    ) -> anyhow::Result<Signature> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client
+                .send_transaction_with_config(
+                    transaction,
+                    RpcSendTransactionConfig {
+                        skip_preflight: true,
+                        ..RpcSendTransactionConfig::default()
+                    },
+                )
+                .await
+            {
+                Ok(signature) => return Ok(signature),
+                Err(e) => tracing::warn!(
+                    "sendTransactionWithConfig failed for rpc endpoint {}: {:?}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("sendTransactionWithConfig failed for all rpc endpoints")
+    }
+
+    pub async fn get_signature_statuses(
+        &self,
+        signatures_contiguous: &mut [Signature],
+    ) -> anyhow::Result<Vec<Option<TransactionStatus>>> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client
+                .get_signature_statuses(signatures_contiguous)
+                .await
+            {
+                Ok(statuses) => return Ok(statuses.value),
+                Err(e) => tracing::warn!(
+                    "getSignatureStatus failed for rpc endpoint {}: {:?}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("getSignatureStatuses failed for all rpc endpoints")
+    }
+
+    pub async fn get_recent_prioritization_fees(
+        &self,
+        price_accounts: &[Pubkey],
+    ) -> anyhow::Result<Vec<RpcPrioritizationFee>> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client
+                .get_recent_prioritization_fees(price_accounts)
+                .await
+            {
+                Ok(fees) => return Ok(fees),
+                Err(e) => tracing::warn!(
+                    "getRecentPrioritizationFee failed for rpc endpoint {}: {:?}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("getRecentPrioritizationFees failed for every rpc endpoint")
+    }
+
+    pub async fn get_program_accounts(
+        &self,
+        oracle_program_key: Pubkey,
+    ) -> anyhow::Result<Vec<(Pubkey, Account)>> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client.get_program_accounts(&oracle_program_key).await {
+                Ok(accounts) => return Ok(accounts),
+                Err(e) => tracing::warn!(
+                    "getProgramAccounts failed for rpc endpoint {}: {}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("getProgramAccounts failed for all rpc endpoints")
+    }
+
+    pub async fn get_account_data(&self, publisher_config_key: &Pubkey) -> anyhow::Result<Vec<u8>> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client.get_account_data(publisher_config_key).await {
+                Ok(data) => return Ok(data),
+                Err(e) => tracing::warn!(
+                    "getAccountData failed for rpc endpoint {}: {:?}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("getAccountData failed for all rpc endpoints")
+    }
+
+    pub async fn get_slot_with_commitment(
+        &self,
+        commitment_config: CommitmentConfig,
+    ) -> anyhow::Result<u64> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client.get_slot_with_commitment(commitment_config).await {
+                Ok(slot) => return Ok(slot),
+                Err(e) => tracing::warn!(
+                    "getSlotWithCommitment failed for rpc endpoint {}: {:?}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("getSlotWithCommitment failed for all rpc endpoints")
+    }
+
+    pub async fn get_latest_blockhash(&self) -> anyhow::Result<solana_sdk::hash::Hash> {
+        for rpc_client in self.rpc_clients.iter() {
+            match rpc_client.get_latest_blockhash().await {
+                Ok(hash) => return Ok(hash),
+                Err(e) => tracing::warn!(
+                    "getLatestBlockhash failed for rpc endpoint {}: {:?}",
+                    rpc_client.url(),
+                    e
+                ),
+            }
+        }
+        bail!("getLatestBlockhash failed for all rpc endpoints")
+    }
+}
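Aside (not part of the patch): a sketch of how a caller might construct and use the new client; the fallback URL is a placeholder. The contract to note: endpoints are tried strictly in list order on every call, so a dead primary costs one failed request of latency per call until it recovers, and the methods return an error only after every endpoint has failed.

```rust
use {
    crate::agent::utils::rpc_multi_client::RpcMultiClient,
    solana_sdk::commitment_config::CommitmentConfig,
    std::time::Duration,
    url::Url,
};

// Hypothetical caller, not agent code: list the primary endpoint first.
async fn current_slot_with_failover() -> anyhow::Result<u64> {
    let urls = vec![
        Url::parse("https://api2.pythnet.pyth.network")?,
        Url::parse("https://pythnet.rpc.example.com")?, // placeholder fallback
    ];
    let client = RpcMultiClient::new_with_timeout_and_commitment(
        urls,
        Duration::from_secs(10),
        CommitmentConfig::confirmed(),
    );
    // Falls through to the next endpoint only when the previous one errors.
    client
        .get_slot_with_commitment(CommitmentConfig::confirmed())
        .await
}
```

Unlike the WSS subscriber's round-robin, the HTTP client keeps no cursor between calls; each method iterates `rpc_clients` from the start, which keeps the behavior stateless at the cost of re-probing a failed primary on every request.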