From 0c747ec4af4faf27f96921fc64c161e0565f17d6 Mon Sep 17 00:00:00 2001 From: Reisen Date: Mon, 1 Jul 2024 11:56:57 +0000 Subject: [PATCH 01/13] refactor(agent): add tracing to expensive calls --- src/agent.rs | 32 +++++++++++++------------- src/agent/pyth/rpc.rs | 2 ++ src/agent/pyth/rpc/update_price.rs | 4 ++++ src/agent/services/exporter.rs | 34 ++++++++++++++++++++++++---- src/agent/services/notifier.rs | 2 ++ src/agent/services/oracle.rs | 21 ++++++++++++----- src/agent/state/api.rs | 5 +++++ src/agent/state/exporter.rs | 36 ++++++++++++++++++++++++++++++ src/agent/state/keypairs.rs | 3 +++ src/agent/state/oracle.rs | 7 ++++++ src/agent/state/transactions.rs | 3 +++ 11 files changed, 124 insertions(+), 25 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index 0395401..7096423 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -72,6 +72,7 @@ use { lazy_static::lazy_static, std::sync::Arc, tokio::sync::watch, + tracing::instrument, }; pub mod config; @@ -121,58 +122,59 @@ impl Agent { }; } + #[instrument(skip(self))] async fn spawn(&self) -> Result<()> { // job handles - let mut jhs = vec![]; + let mut handles = vec![]; // Create the Application State. let state = Arc::new(state::State::new(&self.config).await); // Spawn the primary network Oracle. - jhs.push(tokio::spawn(services::oracle( + handles.extend(services::oracle( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - ))); + )); - jhs.push(tokio::spawn(services::exporter( + handles.extend(services::exporter( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - ))); + )); // Spawn the secondary network Oracle, if needed. if let Some(config) = &self.config.secondary_network { - jhs.push(tokio::spawn(services::oracle( + handles.extend(services::oracle( config.clone(), network::Network::Secondary, state.clone(), - ))); + )); - jhs.push(tokio::spawn(services::exporter( + handles.extend(services::exporter( config.clone(), network::Network::Secondary, state.clone(), - ))); + )); } // Create the Notifier task for the Pythd RPC. - jhs.push(tokio::spawn(services::notifier(state.clone()))); + handles.push(tokio::spawn(services::notifier(state.clone()))); // Spawn the Pythd API Server - jhs.push(tokio::spawn(rpc::run( + handles.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), state.clone(), ))); // Spawn the metrics server - jhs.push(tokio::spawn(metrics::spawn( + handles.push(tokio::spawn(metrics::spawn( self.config.metrics_server.bind_address, ))); // Spawn the remote keypair loader endpoint for both networks - jhs.append( - &mut services::keypairs( + handles.extend( + services::keypairs( self.config.primary_network.rpc_url.clone(), self.config .secondary_network @@ -185,7 +187,7 @@ impl Agent { ); // Wait for all tasks to complete - join_all(jhs).await; + join_all(handles).await; Ok(()) } diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index 8d705a4..ea8b7a7 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -50,6 +50,7 @@ use { sync::Arc, }, tokio::sync::mpsc, + tracing::instrument, warp::{ ws::{ Message, @@ -411,6 +412,7 @@ impl Default for Config { } } +#[instrument(skip_all)] pub async fn run(config: Config, state: Arc) where S: state::Prices, diff --git a/src/agent/pyth/rpc/update_price.rs b/src/agent/pyth/rpc/update_price.rs index c5748af..06b11f9 100644 --- a/src/agent/pyth/rpc/update_price.rs +++ b/src/agent/pyth/rpc/update_price.rs @@ -12,8 +12,10 @@ use { Request, Value, }, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn update_price( state: &S, request: &Request, @@ -28,6 +30,8 @@ where .ok_or_else(|| anyhow!("Missing request parameters"))?, )?; + tracing::Span::current().record("account", params.account.to_string()); + state .update_local_price( ¶ms.account.parse::()?, diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index d7fe47a..14c0ac9 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -23,8 +23,10 @@ use { }, tokio::{ sync::watch, + task::JoinHandle, time::Interval, }, + tracing::instrument, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -118,6 +120,13 @@ struct NetworkStateQuerier { } impl NetworkStateQuerier { + #[instrument( + skip(rpc_endpoint, rpc_timeout, query_interval), + fields( + rpc_timeout = rpc_timeout.as_millis(), + query_interval = query_interval.period().as_millis(), + ) + )] pub fn new( rpc_endpoint: &str, rpc_timeout: Duration, @@ -140,6 +149,7 @@ impl NetworkStateQuerier { } } + #[instrument(skip(self))] async fn query_network_state(&mut self) -> Result<()> { // Fetch the blockhash and current slot in parallel let current_slot_future = self @@ -160,12 +170,15 @@ impl NetworkStateQuerier { } } -pub async fn exporter(config: network::Config, network: Network, state: Arc) +#[instrument(skip(config, state))] +pub fn exporter(config: network::Config, network: Network, state: Arc) -> Vec> where S: Exporter, S: Transactions, S: Send + Sync + 'static, { + let mut handles = Vec::new(); + // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); let mut network_state_querier = NetworkStateQuerier::new( @@ -175,12 +188,23 @@ where network_state_tx, ); - tokio::spawn(transaction_monitor::transaction_monitor( + handles.push(tokio::spawn(transaction_monitor::transaction_monitor( config.clone(), state.clone(), + ))); + + handles.push(tokio::spawn(exporter::exporter( + config, + network, + state, + network_state_rx, + ))); + + handles.push(tokio::spawn( + async move { network_state_querier.run().await }, )); - tokio::spawn(exporter::exporter(config, network, state, network_state_rx)); - tokio::spawn(async move { network_state_querier.run().await }); + + handles } mod exporter { @@ -294,6 +318,7 @@ mod transaction_monitor { sync::Arc, time::Duration, }, + tracing::instrument, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -318,6 +343,7 @@ mod transaction_monitor { } } + #[instrument(skip(config, state))] pub async fn transaction_monitor(config: network::Config, state: Arc) where S: Transactions, diff --git a/src/agent/services/notifier.rs b/src/agent/services/notifier.rs index aecc682..0f4e4a2 100644 --- a/src/agent/services/notifier.rs +++ b/src/agent/services/notifier.rs @@ -6,8 +6,10 @@ use { crate::agent::state::Prices, std::sync::Arc, + tracing::instrument, }; +#[instrument(skip(state))] pub async fn notifier(state: Arc) where S: Prices, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 9cfb523..67a274a 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -38,30 +38,35 @@ use { Instant, }, }, + tokio::task::JoinHandle, tokio_stream::StreamExt, + tracing::instrument, }; -pub async fn oracle(config: Config, network: Network, state: Arc) +#[instrument(skip(config, state))] +pub fn oracle(config: Config, network: Network, state: Arc) -> Vec> where S: Oracle, S: Send + Sync + 'static, { + let mut handles = Vec::new(); + let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { tracing::warn!("Key store not available, Oracle won't start."); - return; + return handles; }; - tokio::spawn(poller( + handles.push(tokio::spawn(poller( config.clone(), network, state.clone(), key_store.mapping_key, key_store.publish_keypair, config.oracle.max_lookup_batch_size, - )); + ))); if config.oracle.subscriber_enabled { - tokio::spawn(async move { + handles.push(tokio::spawn(async move { loop { let current_time = Instant::now(); if let Err(ref err) = subscriber( @@ -79,8 +84,10 @@ where } } } - }); + })); } + + handles } /// When an account RPC Subscription update is receiveed. @@ -88,6 +95,7 @@ where /// We check if the account is one we're aware of and tracking, and if so, spawn /// a small background task that handles that update. We only do this for price /// accounts, all other accounts are handled below in the poller. +#[instrument(skip(config, state))] async fn subscriber( config: Config, network: Network, @@ -144,6 +152,7 @@ where } /// On poll lookup all Pyth Mapping/Product/Price accounts and sync. +#[instrument(skip(config, state))] async fn poller( config: Config, network: Network, diff --git a/src/agent/state/api.rs b/src/agent/state/api.rs index 3ea1308..dc92f5d 100644 --- a/src/agent/state/api.rs +++ b/src/agent/state/api.rs @@ -53,6 +53,7 @@ use { mpsc, RwLock, }, + tracing::instrument, }; // TODO: implement Display on PriceStatus and then just call PriceStatus::to_string @@ -382,6 +383,10 @@ where .map_err(|_| anyhow!("failed to send update to local store")) } + #[instrument(skip(self, update), fields(update = match update { + Update::ProductAccountUpdate { account_key, .. } => account_key, + Update::PriceAccountUpdate { account_key, .. } => account_key, + }.to_string()))] async fn update_global_price(&self, network: Network, update: &Update) -> Result<()> { GlobalStore::update(self, network, update) .await diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 3e8585f..be0aac0 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -53,6 +53,7 @@ use { watch, RwLock, }, + tracing::instrument, }; const PYTH_ORACLE_VERSION: u32 = 2; @@ -146,6 +147,14 @@ where .extend(batch_state); } + #[instrument( + skip_all, + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + unchanged_publish_threshold = unchanged_publish_threshold.as_millis(), + ) + )] async fn get_permissioned_updates( &self, publish_keypair: &Keypair, @@ -254,6 +263,14 @@ where .await } + #[instrument( + skip_all, + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + unchanged_publish_threshold = unchanged_publish_threshold.as_millis(), + ) + )] async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, @@ -283,6 +300,7 @@ where Ok(()) } + #[instrument(skip(self, publisher_permissions))] async fn update_permissions( &self, network: Network, @@ -305,6 +323,7 @@ where } } +#[instrument(skip(state))] pub async fn get_publish_keypair( state: &S, network: Network, @@ -402,6 +421,13 @@ async fn estimate_compute_unit_price_micro_lamports( /// - Degrade gracefully if the blockchain RPC node exhibits poor performance. If the RPC node takes a long /// time to respond, no internal queues grow unboundedly. At any single point in time there are at most /// (n / batch_size) requests in flight. +#[instrument( + skip(state, client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + ) +)] pub async fn publish_batches( state: &S, client: Arc, @@ -472,6 +498,16 @@ where Ok(()) } +#[instrument( + skip(state, client, network_state, publish_keypair, batch, staleness_threshold), + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + blockhash = network_state.blockhash.to_string(), + current_slot = network_state.current_slot, + staleness_threshold = staleness_threshold.as_millis(), + batch = ?batch.iter().map(|(identifier, _)| identifier.to_string()).collect::>(), + ) +)] async fn publish_batch( state: &S, client: Arc, diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index 5b57962..53565cc 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -8,6 +8,7 @@ use { anyhow::Result, solana_sdk::signature::Keypair, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Default)] @@ -35,6 +36,7 @@ where for<'a> &'a T: Into<&'a KeypairState>, T: Sync, { + #[instrument(skip(self))] async fn request_keypair(&self, network: Network) -> Result { let keypair = match network { Network::Primary => &self.into().primary_current_keypair, @@ -51,6 +53,7 @@ where )?) } + #[instrument(skip(self))] async fn update_keypair(&self, network: Network, new_keypair: Keypair) { *match network { Network::Primary => self.into().primary_current_keypair.write().await, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 0f4182a..899565d 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -46,6 +46,7 @@ use { time::Duration, }, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Debug, Clone)] @@ -96,6 +97,7 @@ impl From for PriceEntry { impl PriceEntry { /// Construct the right underlying GenericPriceAccount based on the account size. + #[instrument(skip(acc))] pub fn load_from_account(acc: &[u8]) -> Option { unsafe { let size = match acc.len() { @@ -216,6 +218,7 @@ where T: Prices, T: Exporter, { + #[instrument(skip(self, account_key))] async fn handle_price_account_update( &self, network: Network, @@ -259,6 +262,7 @@ where } /// Poll target Solana based chain for Pyth related accounts. + #[instrument(skip(self, rpc_client))] async fn poll_updates( &self, network: Network, @@ -329,6 +333,7 @@ where } /// Sync Product/Price Accounts found by polling to the Global Store. + #[instrument(skip(self))] async fn sync_global_store(&self, network: Network) -> Result<()> { for (product_account_key, product_account) in &self.into().data.read().await.product_accounts @@ -362,6 +367,7 @@ where } } +#[instrument(skip(rpc_client))] async fn fetch_mapping_accounts( rpc_client: &RpcClient, mapping_account_key: Pubkey, @@ -381,6 +387,7 @@ async fn fetch_mapping_accounts( Ok(accounts) } +#[instrument(skip(rpc_client, mapping_accounts))] async fn fetch_product_and_price_accounts<'a, A>( rpc_client: &RpcClient, max_lookup_batch_size: usize, diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs index f69ca1e..c0e8921 100644 --- a/src/agent/state/transactions.rs +++ b/src/agent/state/transactions.rs @@ -8,6 +8,7 @@ use { }, std::collections::VecDeque, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Default)] @@ -44,6 +45,7 @@ where for<'a> &'a T: Into<&'a TransactionsState>, T: Sync + Send + 'static, { + #[instrument(skip(self))] async fn add_transaction(&self, signature: Signature) { tracing::debug!( signature = signature.to_string(), @@ -60,6 +62,7 @@ where } } + #[instrument(skip(self, rpc))] async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> { let mut txs = self.into().sent_transactions.write().await; if txs.is_empty() { From a62a623923c6bc09957f9b519e346bd10fc70a32 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Fri, 12 Jul 2024 14:12:15 +0900 Subject: [PATCH 02/13] skip keypair for tracing --- src/agent/services/oracle.rs | 2 +- src/agent/state/exporter.rs | 4 ++-- src/agent/state/keypairs.rs | 2 +- src/agent/state/oracle.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 67a274a..a9d5237 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -152,7 +152,7 @@ where } /// On poll lookup all Pyth Mapping/Product/Price accounts and sync. -#[instrument(skip(config, state))] +#[instrument(skip(config, publish_keypair, state))] async fn poller( config: Config, network: Network, diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index be0aac0..eb7c74a 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -300,7 +300,7 @@ where Ok(()) } - #[instrument(skip(self, publisher_permissions))] + #[instrument(skip(self, publish_keypair, publisher_permissions))] async fn update_permissions( &self, network: Network, @@ -323,7 +323,7 @@ where } } -#[instrument(skip(state))] +#[instrument(skip(state, publish_keypair))] pub async fn get_publish_keypair( state: &S, network: Network, diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index 53565cc..095f6b2 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -53,7 +53,7 @@ where )?) } - #[instrument(skip(self))] + #[instrument(skip(self, new_keypair))] async fn update_keypair(&self, network: Network, new_keypair: Keypair) { *match network { Network::Primary => self.into().primary_current_keypair.write().await, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 899565d..f30c4d1 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -262,7 +262,7 @@ where } /// Poll target Solana based chain for Pyth related accounts. - #[instrument(skip(self, rpc_client))] + #[instrument(skip(self, publish_keypair, rpc_client))] async fn poll_updates( &self, network: Network, From cb0fb12626314f920cd1f9b93c385ba63897a2c3 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Fri, 12 Jul 2024 14:29:16 +0900 Subject: [PATCH 03/13] add more traces to oracle state --- src/agent/state/oracle.rs | 105 ++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 54 deletions(-) diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index f30c4d1..cb58b6f 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -1,48 +1,23 @@ #[allow(deprecated)] use crate::agent::legacy_schedule::LegacySchedule; use { - super::{ - super::solana::network::Network, - exporter::Exporter, - }, + super::{super::solana::network::Network, exporter::Exporter}, crate::agent::{ market_schedule::MarketSchedule, - state::{ - global::Update, - Prices, - State, - }, - }, - anyhow::{ - anyhow, - Context, - Result, + state::{global::Update, Prices, State}, }, + anyhow::{anyhow, Context, Result}, pyth_sdk_solana::state::{ - load_mapping_account, - load_product_account, - GenericPriceAccount, - MappingAccount, - PriceComp, - PythnetPriceAccount, - SolanaPriceAccount, - }, - serde::{ - Deserialize, - Serialize, + load_mapping_account, load_product_account, GenericPriceAccount, MappingAccount, PriceComp, + PythnetPriceAccount, SolanaPriceAccount, }, + serde::{Deserialize, Serialize}, solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ - account::Account, - commitment_config::CommitmentLevel, - pubkey::Pubkey, - signature::Keypair, + account::Account, commitment_config::CommitmentLevel, pubkey::Pubkey, signature::Keypair, }, std::{ - collections::{ - HashMap, - HashSet, - }, + collections::{HashMap, HashSet}, time::Duration, }, tokio::sync::RwLock, @@ -51,15 +26,15 @@ use { #[derive(Debug, Clone)] pub struct ProductEntry { - pub account_data: pyth_sdk_solana::state::ProductAccount, - pub schedule: MarketSchedule, - pub price_accounts: Vec, + pub account_data: pyth_sdk_solana::state::ProductAccount, + pub schedule: MarketSchedule, + pub price_accounts: Vec, pub publish_interval: Option, } #[derive(Default, Debug, Clone)] pub struct PricePublishingMetadata { - pub schedule: MarketSchedule, + pub schedule: MarketSchedule, pub publish_interval: Option, } @@ -77,7 +52,7 @@ pub struct PricePublishingMetadata { #[derive(Copy, Clone, Debug)] pub struct PriceEntry { // We intentionally act as if we have a truncated account where the underlying memory is unavailable. - account: GenericPriceAccount<0, ()>, + account: GenericPriceAccount<0, ()>, pub comp: [PriceComp; 64], } @@ -129,9 +104,9 @@ impl std::ops::Deref for PriceEntry { #[derive(Default, Debug, Clone)] pub struct Data { - pub mapping_accounts: HashMap, - pub product_accounts: HashMap, - pub price_accounts: HashMap, + pub mapping_accounts: HashMap, + pub product_accounts: HashMap, + pub price_accounts: HashMap, /// publisher => {their permissioned price accounts => price publishing metadata} pub publisher_permissions: HashMap>, } @@ -140,16 +115,16 @@ pub struct Data { #[serde(default)] pub struct Config { /// The commitment level to use when reading data from the RPC node. - pub commitment: CommitmentLevel, + pub commitment: CommitmentLevel, /// The interval with which to poll account information. #[serde(with = "humantime_serde")] - pub poll_interval_duration: Duration, + pub poll_interval_duration: Duration, /// Whether subscribing to account updates over websocket is enabled - pub subscriber_enabled: bool, + pub subscriber_enabled: bool, /// Capacity of the channel over which the Subscriber sends updates to the Oracle pub updates_channel_capacity: usize, /// Capacity of the channel over which the Poller sends data to the Oracle - pub data_channel_capacity: usize, + pub data_channel_capacity: usize, /// Ask the RPC for up to this many product/price accounts in a /// single request. Tune this setting if you're experiencing @@ -162,12 +137,12 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { - commitment: CommitmentLevel::Confirmed, - poll_interval_duration: Duration::from_secs(5), - subscriber_enabled: true, + commitment: CommitmentLevel::Confirmed, + poll_interval_duration: Duration::from_secs(5), + subscriber_enabled: true, updates_channel_capacity: 10000, - data_channel_capacity: 10000, - max_lookup_batch_size: 100, + data_channel_capacity: 10000, + max_lookup_batch_size: 100, } } } @@ -178,6 +153,7 @@ pub struct OracleState { impl OracleState { pub fn new() -> Self { + tracing::info!("Initializing OracleState"); Self { data: Default::default(), } @@ -232,6 +208,7 @@ where // We are only interested in price account updates, all other types of updates // will be fetched using polling. if !data.price_accounts.contains_key(account_key) { + tracing::info!("Account key not found in price accounts, skipping update."); return Ok(()); } @@ -247,13 +224,14 @@ where ); data.price_accounts.insert(*account_key, price_entry); + tracing::info!("Updated price account for key: {}", account_key); Prices::update_global_price( self, network, &Update::PriceAccountUpdate { account_key: *account_key, - account: price_entry, + account: price_entry, }, ) .await?; @@ -271,14 +249,17 @@ where rpc_client: &RpcClient, max_lookup_batch_size: usize, ) -> Result<()> { + tracing::info!("Polling updates for network: {:?}", network); let mut publisher_permissions = HashMap::new(); let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?; + tracing::info!("Fetched mapping accounts."); let (product_accounts, price_accounts) = fetch_product_and_price_accounts( rpc_client, max_lookup_batch_size, mapping_accounts.values(), ) .await?; + tracing::info!("Fetched product and price accounts."); for (price_key, price_entry) in price_accounts.iter() { for component in price_entry.comp { @@ -294,7 +275,7 @@ where product_accounts.get(&price_entry.prod) { PricePublishingMetadata { - schedule: prod_entry.schedule.clone(), + schedule: prod_entry.schedule.clone(), publish_interval: prod_entry.publish_interval, } } else { @@ -320,6 +301,7 @@ where let mut data = self.into().data.write().await; log_data_diff(&data, &new_data); *data = new_data; + tracing::info!("Updated OracleState data."); Exporter::update_permissions( self, @@ -335,6 +317,7 @@ where /// Sync Product/Price Accounts found by polling to the Global Store. #[instrument(skip(self))] async fn sync_global_store(&self, network: Network) -> Result<()> { + tracing::info!("Syncing global store for network: {:?}", network); for (product_account_key, product_account) in &self.into().data.read().await.product_accounts { @@ -343,7 +326,7 @@ where network, &Update::ProductAccountUpdate { account_key: *product_account_key, - account: product_account.clone(), + account: product_account.clone(), }, ) .await @@ -356,13 +339,14 @@ where network, &Update::PriceAccountUpdate { account_key: *price_account_key, - account: *price_account, + account: *price_account, }, ) .await .map_err(|_| anyhow!("failed to notify price account update"))?; } + tracing::info!("Global store sync completed."); Ok(()) } } @@ -372,6 +356,10 @@ async fn fetch_mapping_accounts( rpc_client: &RpcClient, mapping_account_key: Pubkey, ) -> Result> { + tracing::info!( + "Fetching mapping accounts starting from key: {}", + mapping_account_key + ); let mut accounts = HashMap::new(); let mut account_key = mapping_account_key; while account_key != Pubkey::default() { @@ -384,6 +372,7 @@ async fn fetch_mapping_accounts( accounts.insert(account_key, account); account_key = account.next; } + tracing::info!("Fetched {} mapping accounts.", accounts.len()); Ok(accounts) } @@ -396,6 +385,7 @@ async fn fetch_product_and_price_accounts<'a, A>( where A: IntoIterator, { + tracing::info!("Fetching product and price accounts."); let mut product_keys = vec![]; // Get all product keys @@ -421,6 +411,11 @@ where price_entries.extend(batch_prices.drain()); } + tracing::info!( + "Fetched {} product entries and {} price entries.", + product_entries.len(), + price_entries.len() + ); Ok((product_entries, price_entries)) } @@ -428,6 +423,7 @@ async fn fetch_batch_of_product_and_price_accounts( rpc_client: &RpcClient, product_key_batch: &[Pubkey], ) -> Result<(HashMap, HashMap)> { + tracing::info!("Fetching batch of product and price accounts."); let mut product_entries = HashMap::new(); let product_keys = product_key_batch; @@ -568,6 +564,7 @@ async fn fetch_batch_of_product_and_price_accounts( todo = next_todo; } + tracing::info!("Fetched batch of product and price accounts."); Ok((product_entries, price_entries)) } From 8fc1ebf80e30c9b7dd0aefba88949dd6c2cb2038 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Fri, 12 Jul 2024 14:30:56 +0900 Subject: [PATCH 04/13] precommit --- src/agent/state/oracle.rs | 83 +++++++++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index cb58b6f..6e3186c 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -1,23 +1,48 @@ #[allow(deprecated)] use crate::agent::legacy_schedule::LegacySchedule; use { - super::{super::solana::network::Network, exporter::Exporter}, + super::{ + super::solana::network::Network, + exporter::Exporter, + }, crate::agent::{ market_schedule::MarketSchedule, - state::{global::Update, Prices, State}, + state::{ + global::Update, + Prices, + State, + }, + }, + anyhow::{ + anyhow, + Context, + Result, }, - anyhow::{anyhow, Context, Result}, pyth_sdk_solana::state::{ - load_mapping_account, load_product_account, GenericPriceAccount, MappingAccount, PriceComp, - PythnetPriceAccount, SolanaPriceAccount, + load_mapping_account, + load_product_account, + GenericPriceAccount, + MappingAccount, + PriceComp, + PythnetPriceAccount, + SolanaPriceAccount, + }, + serde::{ + Deserialize, + Serialize, }, - serde::{Deserialize, Serialize}, solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ - account::Account, commitment_config::CommitmentLevel, pubkey::Pubkey, signature::Keypair, + account::Account, + commitment_config::CommitmentLevel, + pubkey::Pubkey, + signature::Keypair, }, std::{ - collections::{HashMap, HashSet}, + collections::{ + HashMap, + HashSet, + }, time::Duration, }, tokio::sync::RwLock, @@ -26,15 +51,15 @@ use { #[derive(Debug, Clone)] pub struct ProductEntry { - pub account_data: pyth_sdk_solana::state::ProductAccount, - pub schedule: MarketSchedule, - pub price_accounts: Vec, + pub account_data: pyth_sdk_solana::state::ProductAccount, + pub schedule: MarketSchedule, + pub price_accounts: Vec, pub publish_interval: Option, } #[derive(Default, Debug, Clone)] pub struct PricePublishingMetadata { - pub schedule: MarketSchedule, + pub schedule: MarketSchedule, pub publish_interval: Option, } @@ -52,7 +77,7 @@ pub struct PricePublishingMetadata { #[derive(Copy, Clone, Debug)] pub struct PriceEntry { // We intentionally act as if we have a truncated account where the underlying memory is unavailable. - account: GenericPriceAccount<0, ()>, + account: GenericPriceAccount<0, ()>, pub comp: [PriceComp; 64], } @@ -104,9 +129,9 @@ impl std::ops::Deref for PriceEntry { #[derive(Default, Debug, Clone)] pub struct Data { - pub mapping_accounts: HashMap, - pub product_accounts: HashMap, - pub price_accounts: HashMap, + pub mapping_accounts: HashMap, + pub product_accounts: HashMap, + pub price_accounts: HashMap, /// publisher => {their permissioned price accounts => price publishing metadata} pub publisher_permissions: HashMap>, } @@ -115,16 +140,16 @@ pub struct Data { #[serde(default)] pub struct Config { /// The commitment level to use when reading data from the RPC node. - pub commitment: CommitmentLevel, + pub commitment: CommitmentLevel, /// The interval with which to poll account information. #[serde(with = "humantime_serde")] - pub poll_interval_duration: Duration, + pub poll_interval_duration: Duration, /// Whether subscribing to account updates over websocket is enabled - pub subscriber_enabled: bool, + pub subscriber_enabled: bool, /// Capacity of the channel over which the Subscriber sends updates to the Oracle pub updates_channel_capacity: usize, /// Capacity of the channel over which the Poller sends data to the Oracle - pub data_channel_capacity: usize, + pub data_channel_capacity: usize, /// Ask the RPC for up to this many product/price accounts in a /// single request. Tune this setting if you're experiencing @@ -137,12 +162,12 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { - commitment: CommitmentLevel::Confirmed, - poll_interval_duration: Duration::from_secs(5), - subscriber_enabled: true, + commitment: CommitmentLevel::Confirmed, + poll_interval_duration: Duration::from_secs(5), + subscriber_enabled: true, updates_channel_capacity: 10000, - data_channel_capacity: 10000, - max_lookup_batch_size: 100, + data_channel_capacity: 10000, + max_lookup_batch_size: 100, } } } @@ -231,7 +256,7 @@ where network, &Update::PriceAccountUpdate { account_key: *account_key, - account: price_entry, + account: price_entry, }, ) .await?; @@ -275,7 +300,7 @@ where product_accounts.get(&price_entry.prod) { PricePublishingMetadata { - schedule: prod_entry.schedule.clone(), + schedule: prod_entry.schedule.clone(), publish_interval: prod_entry.publish_interval, } } else { @@ -326,7 +351,7 @@ where network, &Update::ProductAccountUpdate { account_key: *product_account_key, - account: product_account.clone(), + account: product_account.clone(), }, ) .await @@ -339,7 +364,7 @@ where network, &Update::PriceAccountUpdate { account_key: *price_account_key, - account: *price_account, + account: *price_account, }, ) .await From 289d6bcdc6b2781573b38b154b06aae20ae5273d Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Fri, 12 Jul 2024 17:21:06 +0900 Subject: [PATCH 05/13] remove unnecessary traces --- src/agent/state/oracle.rs | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 6e3186c..35ad863 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -178,7 +178,6 @@ pub struct OracleState { impl OracleState { pub fn new() -> Self { - tracing::info!("Initializing OracleState"); Self { data: Default::default(), } @@ -226,14 +225,11 @@ where account_key: &Pubkey, account: &Account, ) -> Result<()> { - tracing::debug!("Handling account update."); - let mut data = self.into().data.write().await; // We are only interested in price account updates, all other types of updates // will be fetched using polling. if !data.price_accounts.contains_key(account_key) { - tracing::info!("Account key not found in price accounts, skipping update."); return Ok(()); } @@ -249,7 +245,6 @@ where ); data.price_accounts.insert(*account_key, price_entry); - tracing::info!("Updated price account for key: {}", account_key); Prices::update_global_price( self, @@ -274,17 +269,14 @@ where rpc_client: &RpcClient, max_lookup_batch_size: usize, ) -> Result<()> { - tracing::info!("Polling updates for network: {:?}", network); let mut publisher_permissions = HashMap::new(); let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?; - tracing::info!("Fetched mapping accounts."); let (product_accounts, price_accounts) = fetch_product_and_price_accounts( rpc_client, max_lookup_batch_size, mapping_accounts.values(), ) .await?; - tracing::info!("Fetched product and price accounts."); for (price_key, price_entry) in price_accounts.iter() { for component in price_entry.comp { @@ -326,7 +318,6 @@ where let mut data = self.into().data.write().await; log_data_diff(&data, &new_data); *data = new_data; - tracing::info!("Updated OracleState data."); Exporter::update_permissions( self, @@ -342,7 +333,6 @@ where /// Sync Product/Price Accounts found by polling to the Global Store. #[instrument(skip(self))] async fn sync_global_store(&self, network: Network) -> Result<()> { - tracing::info!("Syncing global store for network: {:?}", network); for (product_account_key, product_account) in &self.into().data.read().await.product_accounts { @@ -370,8 +360,6 @@ where .await .map_err(|_| anyhow!("failed to notify price account update"))?; } - - tracing::info!("Global store sync completed."); Ok(()) } } @@ -381,10 +369,6 @@ async fn fetch_mapping_accounts( rpc_client: &RpcClient, mapping_account_key: Pubkey, ) -> Result> { - tracing::info!( - "Fetching mapping accounts starting from key: {}", - mapping_account_key - ); let mut accounts = HashMap::new(); let mut account_key = mapping_account_key; while account_key != Pubkey::default() { @@ -397,7 +381,6 @@ async fn fetch_mapping_accounts( accounts.insert(account_key, account); account_key = account.next; } - tracing::info!("Fetched {} mapping accounts.", accounts.len()); Ok(accounts) } @@ -410,7 +393,6 @@ async fn fetch_product_and_price_accounts<'a, A>( where A: IntoIterator, { - tracing::info!("Fetching product and price accounts."); let mut product_keys = vec![]; // Get all product keys @@ -436,11 +418,6 @@ where price_entries.extend(batch_prices.drain()); } - tracing::info!( - "Fetched {} product entries and {} price entries.", - product_entries.len(), - price_entries.len() - ); Ok((product_entries, price_entries)) } @@ -448,7 +425,6 @@ async fn fetch_batch_of_product_and_price_accounts( rpc_client: &RpcClient, product_key_batch: &[Pubkey], ) -> Result<(HashMap, HashMap)> { - tracing::info!("Fetching batch of product and price accounts."); let mut product_entries = HashMap::new(); let product_keys = product_key_batch; @@ -589,7 +565,6 @@ async fn fetch_batch_of_product_and_price_accounts( todo = next_todo; } - tracing::info!("Fetched batch of product and price accounts."); Ok((product_entries, price_entries)) } From 873cb639e3d62d5632ae285e74ebcf86929e310f Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Fri, 12 Jul 2024 17:21:45 +0900 Subject: [PATCH 06/13] add opentelemetry exporter --- src/bin/agent.rs | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 3a515f7..3aafdb3 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -5,6 +5,8 @@ use { Result, }, clap::Parser, + opentelemetry::KeyValue, + opentelemetry_otlp::WithExportConfig, pyth_agent::agent::{ config::Config, Agent, @@ -12,7 +14,9 @@ use { std::{ io::IsTerminal, path::PathBuf, + time::Duration, }, + tracing_subscriber::prelude::*, }; #[derive(Parser, Debug)] @@ -31,18 +35,38 @@ struct Arguments { #[tokio::main] async fn main() -> Result<()> { // Initialize a Tracing Subscriber - let fmt_builder = tracing_subscriber::fmt() + let fmt_layer = tracing_subscriber::fmt::layer() .with_file(false) .with_line_number(true) .with_thread_ids(true) - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(true) .with_ansi(std::io::stderr().is_terminal()); + // Set up the OpenTelemetry exporter, defaults to 127.0.0.1:4317 + let otlp_exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_timeout(Duration::from_secs(3)); + + // Set up the OpenTelemetry tracer + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(otlp_exporter) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource( + opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pyth-agent")]), + )) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .map_err(|e| anyhow::anyhow!("Error initializing open telemetry: {}", e))?; + + // Set up the telemetry layer + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let registry = tracing_subscriber::registry().with(telemetry); + // Use the compact formatter if we're in a terminal, otherwise use the JSON formatter. if std::io::stderr().is_terminal() { - tracing::subscriber::set_global_default(fmt_builder.compact().finish())?; + registry.with(fmt_layer.compact()).init(); } else { - tracing::subscriber::set_global_default(fmt_builder.json().finish())?; + registry.with(fmt_layer.json()).init(); } let args = Arguments::parse(); From 220db0bc9bbb39da7a5cc5e256712b140cc34c27 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 15 Jul 2024 10:51:02 +0900 Subject: [PATCH 07/13] add more instruments --- Cargo.lock | 284 ++++++++++++++++++++++++++++++++++++++ Cargo.toml | 4 + src/agent/state/oracle.rs | 2 + 3 files changed, 290 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 7741ae1..145b8a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,28 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.55", +] + [[package]] name = "async-trait" version = "0.1.79" @@ -461,6 +483,51 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -1792,6 +1859,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "goblin" version = "0.5.4" @@ -2058,6 +2131,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2408,6 +2493,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.2" @@ -2768,6 +2859,81 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a94c69209c05319cdf7460c6d4c055ed102be242a0a6245835d7bc42c6ec7f54" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "lazy_static", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "4.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.6.0" @@ -3209,6 +3375,29 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 2.0.55", +] + [[package]] name = "pyth-agent" version = "2.9.0" @@ -3227,6 +3416,9 @@ dependencies = [ "iobuffer", "jrpc", "lazy_static", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "parking_lot", "portpicker", "prometheus-client", @@ -3248,6 +3440,7 @@ dependencies = [ "tokio-util", "toml_edit 0.22.9", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "typed-html", "warp", @@ -5565,6 +5758,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -5706,6 +5909,59 @@ dependencies = [ "winnow 0.6.5", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -5756,6 +6012,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -6141,6 +6415,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.24.0" diff --git a/Cargo.toml b/Cargo.toml index 489ec98..4fb0240 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,10 @@ winnow = "0.6.5" proptest = "1.4.0" tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } +tracing-opentelemetry = "0.24.0" +opentelemetry = "0.23.0" +opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} +opentelemetry-otlp = { version = "0.16.0" } [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 35ad863..eb4b0ae 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -421,6 +421,7 @@ where Ok((product_entries, price_entries)) } +#[instrument(skip(rpc_client, product_key_batch))] async fn fetch_batch_of_product_and_price_accounts( rpc_client: &RpcClient, product_key_batch: &[Pubkey], @@ -568,6 +569,7 @@ async fn fetch_batch_of_product_and_price_accounts( Ok((product_entries, price_entries)) } +#[instrument(skip(data, new_data))] fn log_data_diff(data: &Data, new_data: &Data) { // Log new accounts which have been found let previous_mapping_accounts = data From a1590998a303733752196d06712044c3846001ee Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 15 Jul 2024 14:50:20 +0900 Subject: [PATCH 08/13] add more instruments --- src/agent/pyth/rpc/get_all_products.rs | 2 ++ src/agent/pyth/rpc/get_product.rs | 4 ++++ src/agent/pyth/rpc/get_product_list.rs | 2 ++ src/agent/pyth/rpc/subscribe_price.rs | 4 ++++ src/agent/pyth/rpc/subscribe_price_sched.rs | 4 ++++ 5 files changed, 16 insertions(+) diff --git a/src/agent/pyth/rpc/get_all_products.rs b/src/agent/pyth/rpc/get_all_products.rs index 7a34278..47dbf31 100644 --- a/src/agent/pyth/rpc/get_all_products.rs +++ b/src/agent/pyth/rpc/get_all_products.rs @@ -1,8 +1,10 @@ use { crate::agent::state, anyhow::Result, + tracing::instrument, }; +#[instrument(skip_all)] pub async fn get_all_products(state: &S) -> Result where S: state::Prices, diff --git a/src/agent/pyth/rpc/get_product.rs b/src/agent/pyth/rpc/get_product.rs index 19c69a5..07fabf5 100644 --- a/src/agent/pyth/rpc/get_product.rs +++ b/src/agent/pyth/rpc/get_product.rs @@ -12,8 +12,10 @@ use { Request, Value, }, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn get_product( state: &S, request: &Request, @@ -27,6 +29,8 @@ where }?; let account = params.account.parse::()?; + tracing::Span::current().record("account", account.to_string()); + let product = state.get_product(&account).await?; Ok(serde_json::to_value(product)?) } diff --git a/src/agent/pyth/rpc/get_product_list.rs b/src/agent/pyth/rpc/get_product_list.rs index 30cde6e..44c57a3 100644 --- a/src/agent/pyth/rpc/get_product_list.rs +++ b/src/agent/pyth/rpc/get_product_list.rs @@ -1,8 +1,10 @@ use { crate::agent::state, anyhow::Result, + tracing::instrument, }; +#[instrument(skip_all)] pub async fn get_product_list(state: &S) -> Result where S: state::Prices, diff --git a/src/agent/pyth/rpc/subscribe_price.rs b/src/agent/pyth/rpc/subscribe_price.rs index 5936505..73a8389 100644 --- a/src/agent/pyth/rpc/subscribe_price.rs +++ b/src/agent/pyth/rpc/subscribe_price.rs @@ -15,8 +15,10 @@ use { Value, }, tokio::sync::mpsc, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn subscribe_price( state: &S, notify_price_tx: &mpsc::Sender, @@ -33,6 +35,8 @@ where )?; let account = params.account.parse::()?; + tracing::Span::current().record("account", account.to_string()); + let subscription = state .subscribe_price(&account, notify_price_tx.clone()) .await; diff --git a/src/agent/pyth/rpc/subscribe_price_sched.rs b/src/agent/pyth/rpc/subscribe_price_sched.rs index 608a489..e3be8c3 100644 --- a/src/agent/pyth/rpc/subscribe_price_sched.rs +++ b/src/agent/pyth/rpc/subscribe_price_sched.rs @@ -15,8 +15,10 @@ use { Value, }, tokio::sync::mpsc, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn subscribe_price_sched( state: &S, notify_price_sched_tx: &mpsc::Sender, @@ -33,6 +35,8 @@ where )?; let account = params.account.parse::()?; + tracing::Span::current().record("account", account.to_string()); + let subscription = state .subscribe_price_sched(&account, notify_price_sched_tx.clone()) .await; From a7ddefefc27466b34a2691700c74c52f5b895f4e Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Tue, 16 Jul 2024 12:49:06 +0900 Subject: [PATCH 09/13] address comments --- src/agent/config.rs | 18 ++++++++++++++++++ src/bin/agent.rs | 28 +++++++++++++++------------- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/agent/config.rs b/src/agent/config.rs index d43d6ba..08b154b 100644 --- a/src/agent/config.rs +++ b/src/agent/config.rs @@ -32,6 +32,8 @@ pub struct Config { pub metrics_server: metrics::Config, #[serde(default)] pub remote_keypair_loader: services::keypairs::Config, + #[serde(default)] + pub opentelemetry: OpenTelemetryConfig, } impl Config { @@ -83,3 +85,19 @@ impl Default for ChannelCapacities { } } } + + +#[derive(Deserialize, Debug)] +pub struct OpenTelemetryConfig { + pub exporter_timeout_secs: u64, + pub exporter_endpoint: String, +} + +impl Default for OpenTelemetryConfig { + fn default() -> Self { + Self { + exporter_timeout_secs: 3, + exporter_endpoint: "http://127.0.0.1:4317".to_string(), + } + } +} diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 3aafdb3..1d2eccc 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -34,18 +34,31 @@ struct Arguments { #[tokio::main] async fn main() -> Result<()> { + let args = Arguments::parse(); + + if !args.config.as_path().exists() { + return Err(anyhow!("No config found under {:?}", args.config.to_str())); + } + + println!("Loading config from {:?}", args.config.display()); + + // Parse config early for logging channel capacity + let config = Config::new(args.config).context("Could not parse config")?; + // Initialize a Tracing Subscriber let fmt_layer = tracing_subscriber::fmt::layer() .with_file(false) .with_line_number(true) .with_thread_ids(true) - .with_target(true) .with_ansi(std::io::stderr().is_terminal()); // Set up the OpenTelemetry exporter, defaults to 127.0.0.1:4317 let otlp_exporter = opentelemetry_otlp::new_exporter() .tonic() - .with_timeout(Duration::from_secs(3)); + .with_endpoint(&config.opentelemetry.exporter_endpoint) + .with_timeout(Duration::from_secs( + config.opentelemetry.exporter_timeout_secs, + )); // Set up the OpenTelemetry tracer let tracer = opentelemetry_otlp::new_pipeline() @@ -69,17 +82,6 @@ async fn main() -> Result<()> { registry.with(fmt_layer.json()).init(); } - let args = Arguments::parse(); - - if !args.config.as_path().exists() { - return Err(anyhow!("No config found under {:?}", args.config.to_str())); - } - - println!("Loading config from {:?}", args.config.display()); - - // Parse config early for logging channel capacity - let config = Config::new(args.config).context("Could not parse config")?; - // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE // should be set to 1 for this otherwise it will only print the top-level error. if let Err(err) = start(config).await { From 1cdee4c9a19baa1bbdbe859e7015759c717b3a51 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Tue, 16 Jul 2024 12:51:35 +0900 Subject: [PATCH 10/13] update config --- config/config.sample.pythnet.toml | 9 +++++++++ config/config.sample.pythtest.toml | 9 +++++++++ config/config.toml | 10 ++++++++++ 3 files changed, 28 insertions(+) diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index 8ea9a70..f1d35b9 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -70,3 +70,12 @@ exporter.maximum_compute_unit_price_micro_lamports = 100000 # Note that this doesn't affect the rate at which transactions are published: # this is soley a backwards-compatibility API feature. notify_price_sched_interval_duration = "400ms" + +# Configuration for OpenTelemetry +[opentelemetry] + +# Timeout in seconds for the OpenTelemetry exporter +exporter_timeout_secs = 3 + +# Endpoint URL for the OpenTelemetry exporter +exporter_endpoint = "http://127.0.0.1:4317" diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml index 2674801..653411c 100644 --- a/config/config.sample.pythtest.toml +++ b/config/config.sample.pythtest.toml @@ -60,3 +60,12 @@ exporter.compute_unit_price_micro_lamports = 1000 # Note that this doesn't affect the rate at which transactions are published: # this is soley a backwards-compatibility API feature. notify_price_sched_interval_duration = "400ms" + +# Configuration for OpenTelemetry +[opentelemetry] + +# Timeout in seconds for the OpenTelemetry exporter +exporter_timeout_secs = 3 + +# Endpoint URL for the OpenTelemetry exporter +exporter_endpoint = "http://127.0.0.1:4317" diff --git a/config/config.toml b/config/config.toml index 8f35ade..39b254b 100644 --- a/config/config.toml +++ b/config/config.toml @@ -181,3 +181,13 @@ key_store.mapping_key = "RelevantOracleMappingAddress" # publish data to. In most cases this should be a Solana endpoint. The # options correspond to the ones in primary_network # [secondary_network] + + +## Configuration for OpenTelemetry ## +[opentelemetry] + +# Timeout in seconds for the OpenTelemetry exporter +exporter_timeout_secs = 3 + +# Endpoint URL for the OpenTelemetry exporter +exporter_endpoint = "http://127.0.0.1:4317" From 9db3a1cdf5081bfa80477b65758345c6e265f6e1 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Wed, 17 Jul 2024 13:50:18 +0900 Subject: [PATCH 11/13] address comments --- config/config.sample.pythnet.toml | 9 --------- config/config.sample.pythtest.toml | 9 --------- src/bin/agent.rs | 11 +++++++++-- 3 files changed, 9 insertions(+), 20 deletions(-) diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index f1d35b9..8ea9a70 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -70,12 +70,3 @@ exporter.maximum_compute_unit_price_micro_lamports = 100000 # Note that this doesn't affect the rate at which transactions are published: # this is soley a backwards-compatibility API feature. notify_price_sched_interval_duration = "400ms" - -# Configuration for OpenTelemetry -[opentelemetry] - -# Timeout in seconds for the OpenTelemetry exporter -exporter_timeout_secs = 3 - -# Endpoint URL for the OpenTelemetry exporter -exporter_endpoint = "http://127.0.0.1:4317" diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml index 653411c..2674801 100644 --- a/config/config.sample.pythtest.toml +++ b/config/config.sample.pythtest.toml @@ -60,12 +60,3 @@ exporter.compute_unit_price_micro_lamports = 1000 # Note that this doesn't affect the rate at which transactions are published: # this is soley a backwards-compatibility API feature. notify_price_sched_interval_duration = "400ms" - -# Configuration for OpenTelemetry -[opentelemetry] - -# Timeout in seconds for the OpenTelemetry exporter -exporter_timeout_secs = 3 - -# Endpoint URL for the OpenTelemetry exporter -exporter_endpoint = "http://127.0.0.1:4317" diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 1d2eccc..18932df 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -16,7 +16,10 @@ use { path::PathBuf, time::Duration, }, - tracing_subscriber::prelude::*, + tracing_subscriber::{ + prelude::*, + EnvFilter, + }, }; #[derive(Parser, Debug)] @@ -45,6 +48,8 @@ async fn main() -> Result<()> { // Parse config early for logging channel capacity let config = Config::new(args.config).context("Could not parse config")?; + let env_filter = EnvFilter::from_default_env(); + // Initialize a Tracing Subscriber let fmt_layer = tracing_subscriber::fmt::layer() .with_file(false) @@ -73,7 +78,9 @@ async fn main() -> Result<()> { // Set up the telemetry layer let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let registry = tracing_subscriber::registry().with(telemetry); + let registry = tracing_subscriber::registry() + .with(telemetry) + .with(env_filter); // Use the compact formatter if we're in a terminal, otherwise use the JSON formatter. if std::io::stderr().is_terminal() { From fc1455708501d3a3045c286f8f01f5aca81f2987 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Wed, 17 Jul 2024 14:02:14 +0900 Subject: [PATCH 12/13] make opentelemetry config optional --- src/agent/config.rs | 12 +--------- src/bin/agent.rs | 58 ++++++++++++++++++++++++--------------------- 2 files changed, 32 insertions(+), 38 deletions(-) diff --git a/src/agent/config.rs b/src/agent/config.rs index 08b154b..8fb2292 100644 --- a/src/agent/config.rs +++ b/src/agent/config.rs @@ -32,8 +32,7 @@ pub struct Config { pub metrics_server: metrics::Config, #[serde(default)] pub remote_keypair_loader: services::keypairs::Config, - #[serde(default)] - pub opentelemetry: OpenTelemetryConfig, + pub opentelemetry: Option, } impl Config { @@ -92,12 +91,3 @@ pub struct OpenTelemetryConfig { pub exporter_timeout_secs: u64, pub exporter_endpoint: String, } - -impl Default for OpenTelemetryConfig { - fn default() -> Self { - Self { - exporter_timeout_secs: 3, - exporter_endpoint: "http://127.0.0.1:4317".to_string(), - } - } -} diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 18932df..040e018 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -57,38 +57,42 @@ async fn main() -> Result<()> { .with_thread_ids(true) .with_ansi(std::io::stderr().is_terminal()); - // Set up the OpenTelemetry exporter, defaults to 127.0.0.1:4317 - let otlp_exporter = opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(&config.opentelemetry.exporter_endpoint) - .with_timeout(Duration::from_secs( - config.opentelemetry.exporter_timeout_secs, - )); - - // Set up the OpenTelemetry tracer - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(otlp_exporter) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource( - opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pyth-agent")]), - )) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .map_err(|e| anyhow::anyhow!("Error initializing open telemetry: {}", e))?; - - // Set up the telemetry layer - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - let registry = tracing_subscriber::registry() - .with(telemetry) - .with(env_filter); - + let mut layers = Vec::new(); + layers.push(env_filter.boxed()); + + // Set up OpenTelemetry only if it's configured + if let Some(opentelemetry_config) = &config.opentelemetry { + // Set up the OpenTelemetry exporter + let otlp_exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(&opentelemetry_config.exporter_endpoint) + .with_timeout(Duration::from_secs( + opentelemetry_config.exporter_timeout_secs, + )); + + // Set up the OpenTelemetry tracer + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(otlp_exporter) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource( + opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pyth-agent")]), + )) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .map_err(|e| anyhow::anyhow!("Error initializing open telemetry: {}", e))?; + + // Set up the telemetry layer + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + layers.push(telemetry.boxed()); + } // Use the compact formatter if we're in a terminal, otherwise use the JSON formatter. if std::io::stderr().is_terminal() { - registry.with(fmt_layer.compact()).init(); + layers.push(fmt_layer.compact().boxed()); } else { - registry.with(fmt_layer.json()).init(); + layers.push(fmt_layer.json().boxed()); } + tracing_subscriber::registry().with(layers).init(); + // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE // should be set to 1 for this otherwise it will only print the top-level error. if let Err(err) = start(config).await { From 8ce98314a42c225e18fb33bda28d35ff6eb678c2 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Wed, 17 Jul 2024 16:11:48 +0900 Subject: [PATCH 13/13] bump minor version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 145b8a7..3ee7d4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.9.0" +version = "2.10.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 4fb0240..a239c9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.9.0" +version = "2.10.0" edition = "2021" [[bin]]