From 0c747ec4af4faf27f96921fc64c161e0565f17d6 Mon Sep 17 00:00:00 2001 From: Reisen Date: Mon, 1 Jul 2024 11:56:57 +0000 Subject: [PATCH] refactor(agent): add tracing to expensive calls --- src/agent.rs | 32 +++++++++++++------------- src/agent/pyth/rpc.rs | 2 ++ src/agent/pyth/rpc/update_price.rs | 4 ++++ src/agent/services/exporter.rs | 34 ++++++++++++++++++++++++---- src/agent/services/notifier.rs | 2 ++ src/agent/services/oracle.rs | 21 ++++++++++++----- src/agent/state/api.rs | 5 +++++ src/agent/state/exporter.rs | 36 ++++++++++++++++++++++++++++++ src/agent/state/keypairs.rs | 3 +++ src/agent/state/oracle.rs | 7 ++++++ src/agent/state/transactions.rs | 3 +++ 11 files changed, 124 insertions(+), 25 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index 0395401..7096423 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -72,6 +72,7 @@ use { lazy_static::lazy_static, std::sync::Arc, tokio::sync::watch, + tracing::instrument, }; pub mod config; @@ -121,58 +122,59 @@ impl Agent { }; } + #[instrument(skip(self))] async fn spawn(&self) -> Result<()> { // job handles - let mut jhs = vec![]; + let mut handles = vec![]; // Create the Application State. let state = Arc::new(state::State::new(&self.config).await); // Spawn the primary network Oracle. - jhs.push(tokio::spawn(services::oracle( + handles.extend(services::oracle( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - ))); + )); - jhs.push(tokio::spawn(services::exporter( + handles.extend(services::exporter( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - ))); + )); // Spawn the secondary network Oracle, if needed. if let Some(config) = &self.config.secondary_network { - jhs.push(tokio::spawn(services::oracle( + handles.extend(services::oracle( config.clone(), network::Network::Secondary, state.clone(), - ))); + )); - jhs.push(tokio::spawn(services::exporter( + handles.extend(services::exporter( config.clone(), network::Network::Secondary, state.clone(), - ))); + )); } // Create the Notifier task for the Pythd RPC. - jhs.push(tokio::spawn(services::notifier(state.clone()))); + handles.push(tokio::spawn(services::notifier(state.clone()))); // Spawn the Pythd API Server - jhs.push(tokio::spawn(rpc::run( + handles.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), state.clone(), ))); // Spawn the metrics server - jhs.push(tokio::spawn(metrics::spawn( + handles.push(tokio::spawn(metrics::spawn( self.config.metrics_server.bind_address, ))); // Spawn the remote keypair loader endpoint for both networks - jhs.append( - &mut services::keypairs( + handles.extend( + services::keypairs( self.config.primary_network.rpc_url.clone(), self.config .secondary_network @@ -185,7 +187,7 @@ impl Agent { ); // Wait for all tasks to complete - join_all(jhs).await; + join_all(handles).await; Ok(()) } diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index 8d705a4..ea8b7a7 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -50,6 +50,7 @@ use { sync::Arc, }, tokio::sync::mpsc, + tracing::instrument, warp::{ ws::{ Message, @@ -411,6 +412,7 @@ impl Default for Config { } } +#[instrument(skip_all)] pub async fn run(config: Config, state: Arc) where S: state::Prices, diff --git a/src/agent/pyth/rpc/update_price.rs b/src/agent/pyth/rpc/update_price.rs index c5748af..06b11f9 100644 --- a/src/agent/pyth/rpc/update_price.rs +++ b/src/agent/pyth/rpc/update_price.rs @@ -12,8 +12,10 @@ use { Request, Value, }, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn update_price( state: &S, request: &Request, @@ -28,6 +30,8 @@ where .ok_or_else(|| anyhow!("Missing request parameters"))?, )?; + tracing::Span::current().record("account", params.account.to_string()); + state .update_local_price( ¶ms.account.parse::()?, diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index d7fe47a..14c0ac9 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -23,8 +23,10 @@ use { }, tokio::{ sync::watch, + task::JoinHandle, time::Interval, }, + tracing::instrument, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -118,6 +120,13 @@ struct NetworkStateQuerier { } impl NetworkStateQuerier { + #[instrument( + skip(rpc_endpoint, rpc_timeout, query_interval), + fields( + rpc_timeout = rpc_timeout.as_millis(), + query_interval = query_interval.period().as_millis(), + ) + )] pub fn new( rpc_endpoint: &str, rpc_timeout: Duration, @@ -140,6 +149,7 @@ impl NetworkStateQuerier { } } + #[instrument(skip(self))] async fn query_network_state(&mut self) -> Result<()> { // Fetch the blockhash and current slot in parallel let current_slot_future = self @@ -160,12 +170,15 @@ impl NetworkStateQuerier { } } -pub async fn exporter(config: network::Config, network: Network, state: Arc) +#[instrument(skip(config, state))] +pub fn exporter(config: network::Config, network: Network, state: Arc) -> Vec> where S: Exporter, S: Transactions, S: Send + Sync + 'static, { + let mut handles = Vec::new(); + // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); let mut network_state_querier = NetworkStateQuerier::new( @@ -175,12 +188,23 @@ where network_state_tx, ); - tokio::spawn(transaction_monitor::transaction_monitor( + handles.push(tokio::spawn(transaction_monitor::transaction_monitor( config.clone(), state.clone(), + ))); + + handles.push(tokio::spawn(exporter::exporter( + config, + network, + state, + network_state_rx, + ))); + + handles.push(tokio::spawn( + async move { network_state_querier.run().await }, )); - tokio::spawn(exporter::exporter(config, network, state, network_state_rx)); - tokio::spawn(async move { network_state_querier.run().await }); + + handles } mod exporter { @@ -294,6 +318,7 @@ mod transaction_monitor { sync::Arc, time::Duration, }, + tracing::instrument, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -318,6 +343,7 @@ mod transaction_monitor { } } + #[instrument(skip(config, state))] pub async fn transaction_monitor(config: network::Config, state: Arc) where S: Transactions, diff --git a/src/agent/services/notifier.rs b/src/agent/services/notifier.rs index aecc682..0f4e4a2 100644 --- a/src/agent/services/notifier.rs +++ b/src/agent/services/notifier.rs @@ -6,8 +6,10 @@ use { crate::agent::state::Prices, std::sync::Arc, + tracing::instrument, }; +#[instrument(skip(state))] pub async fn notifier(state: Arc) where S: Prices, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 9cfb523..67a274a 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -38,30 +38,35 @@ use { Instant, }, }, + tokio::task::JoinHandle, tokio_stream::StreamExt, + tracing::instrument, }; -pub async fn oracle(config: Config, network: Network, state: Arc) +#[instrument(skip(config, state))] +pub fn oracle(config: Config, network: Network, state: Arc) -> Vec> where S: Oracle, S: Send + Sync + 'static, { + let mut handles = Vec::new(); + let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { tracing::warn!("Key store not available, Oracle won't start."); - return; + return handles; }; - tokio::spawn(poller( + handles.push(tokio::spawn(poller( config.clone(), network, state.clone(), key_store.mapping_key, key_store.publish_keypair, config.oracle.max_lookup_batch_size, - )); + ))); if config.oracle.subscriber_enabled { - tokio::spawn(async move { + handles.push(tokio::spawn(async move { loop { let current_time = Instant::now(); if let Err(ref err) = subscriber( @@ -79,8 +84,10 @@ where } } } - }); + })); } + + handles } /// When an account RPC Subscription update is receiveed. @@ -88,6 +95,7 @@ where /// We check if the account is one we're aware of and tracking, and if so, spawn /// a small background task that handles that update. We only do this for price /// accounts, all other accounts are handled below in the poller. +#[instrument(skip(config, state))] async fn subscriber( config: Config, network: Network, @@ -144,6 +152,7 @@ where } /// On poll lookup all Pyth Mapping/Product/Price accounts and sync. +#[instrument(skip(config, state))] async fn poller( config: Config, network: Network, diff --git a/src/agent/state/api.rs b/src/agent/state/api.rs index 3ea1308..dc92f5d 100644 --- a/src/agent/state/api.rs +++ b/src/agent/state/api.rs @@ -53,6 +53,7 @@ use { mpsc, RwLock, }, + tracing::instrument, }; // TODO: implement Display on PriceStatus and then just call PriceStatus::to_string @@ -382,6 +383,10 @@ where .map_err(|_| anyhow!("failed to send update to local store")) } + #[instrument(skip(self, update), fields(update = match update { + Update::ProductAccountUpdate { account_key, .. } => account_key, + Update::PriceAccountUpdate { account_key, .. } => account_key, + }.to_string()))] async fn update_global_price(&self, network: Network, update: &Update) -> Result<()> { GlobalStore::update(self, network, update) .await diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 3e8585f..be0aac0 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -53,6 +53,7 @@ use { watch, RwLock, }, + tracing::instrument, }; const PYTH_ORACLE_VERSION: u32 = 2; @@ -146,6 +147,14 @@ where .extend(batch_state); } + #[instrument( + skip_all, + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + unchanged_publish_threshold = unchanged_publish_threshold.as_millis(), + ) + )] async fn get_permissioned_updates( &self, publish_keypair: &Keypair, @@ -254,6 +263,14 @@ where .await } + #[instrument( + skip_all, + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + unchanged_publish_threshold = unchanged_publish_threshold.as_millis(), + ) + )] async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, @@ -283,6 +300,7 @@ where Ok(()) } + #[instrument(skip(self, publisher_permissions))] async fn update_permissions( &self, network: Network, @@ -305,6 +323,7 @@ where } } +#[instrument(skip(state))] pub async fn get_publish_keypair( state: &S, network: Network, @@ -402,6 +421,13 @@ async fn estimate_compute_unit_price_micro_lamports( /// - Degrade gracefully if the blockchain RPC node exhibits poor performance. If the RPC node takes a long /// time to respond, no internal queues grow unboundedly. At any single point in time there are at most /// (n / batch_size) requests in flight. +#[instrument( + skip(state, client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + ) +)] pub async fn publish_batches( state: &S, client: Arc, @@ -472,6 +498,16 @@ where Ok(()) } +#[instrument( + skip(state, client, network_state, publish_keypair, batch, staleness_threshold), + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + blockhash = network_state.blockhash.to_string(), + current_slot = network_state.current_slot, + staleness_threshold = staleness_threshold.as_millis(), + batch = ?batch.iter().map(|(identifier, _)| identifier.to_string()).collect::>(), + ) +)] async fn publish_batch( state: &S, client: Arc, diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index 5b57962..53565cc 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -8,6 +8,7 @@ use { anyhow::Result, solana_sdk::signature::Keypair, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Default)] @@ -35,6 +36,7 @@ where for<'a> &'a T: Into<&'a KeypairState>, T: Sync, { + #[instrument(skip(self))] async fn request_keypair(&self, network: Network) -> Result { let keypair = match network { Network::Primary => &self.into().primary_current_keypair, @@ -51,6 +53,7 @@ where )?) } + #[instrument(skip(self))] async fn update_keypair(&self, network: Network, new_keypair: Keypair) { *match network { Network::Primary => self.into().primary_current_keypair.write().await, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 0f4182a..899565d 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -46,6 +46,7 @@ use { time::Duration, }, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Debug, Clone)] @@ -96,6 +97,7 @@ impl From for PriceEntry { impl PriceEntry { /// Construct the right underlying GenericPriceAccount based on the account size. + #[instrument(skip(acc))] pub fn load_from_account(acc: &[u8]) -> Option { unsafe { let size = match acc.len() { @@ -216,6 +218,7 @@ where T: Prices, T: Exporter, { + #[instrument(skip(self, account_key))] async fn handle_price_account_update( &self, network: Network, @@ -259,6 +262,7 @@ where } /// Poll target Solana based chain for Pyth related accounts. + #[instrument(skip(self, rpc_client))] async fn poll_updates( &self, network: Network, @@ -329,6 +333,7 @@ where } /// Sync Product/Price Accounts found by polling to the Global Store. + #[instrument(skip(self))] async fn sync_global_store(&self, network: Network) -> Result<()> { for (product_account_key, product_account) in &self.into().data.read().await.product_accounts @@ -362,6 +367,7 @@ where } } +#[instrument(skip(rpc_client))] async fn fetch_mapping_accounts( rpc_client: &RpcClient, mapping_account_key: Pubkey, @@ -381,6 +387,7 @@ async fn fetch_mapping_accounts( Ok(accounts) } +#[instrument(skip(rpc_client, mapping_accounts))] async fn fetch_product_and_price_accounts<'a, A>( rpc_client: &RpcClient, max_lookup_batch_size: usize, diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs index f69ca1e..c0e8921 100644 --- a/src/agent/state/transactions.rs +++ b/src/agent/state/transactions.rs @@ -8,6 +8,7 @@ use { }, std::collections::VecDeque, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Default)] @@ -44,6 +45,7 @@ where for<'a> &'a T: Into<&'a TransactionsState>, T: Sync + Send + 'static, { + #[instrument(skip(self))] async fn add_transaction(&self, signature: Signature) { tracing::debug!( signature = signature.to_string(), @@ -60,6 +62,7 @@ where } } + #[instrument(skip(self, rpc))] async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> { let mut txs = self.into().sent_transactions.write().await; if txs.is_empty() {