Skip to content

refactor(agent): add tracing to expensive calls #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use {
lazy_static::lazy_static,
std::sync::Arc,
tokio::sync::watch,
tracing::instrument,
};

pub mod config;
Expand Down Expand Up @@ -121,58 +122,59 @@ impl Agent {
};
}

#[instrument(skip(self))]
async fn spawn(&self) -> Result<()> {
// job handles
let mut jhs = vec![];
let mut handles = vec![];

// Create the Application State.
let state = Arc::new(state::State::new(&self.config).await);

// Spawn the primary network Oracle.
jhs.push(tokio::spawn(services::oracle(
handles.extend(services::oracle(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
)));
));

jhs.push(tokio::spawn(services::exporter(
handles.extend(services::exporter(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
)));
));

// Spawn the secondary network Oracle, if needed.
if let Some(config) = &self.config.secondary_network {
jhs.push(tokio::spawn(services::oracle(
handles.extend(services::oracle(
config.clone(),
network::Network::Secondary,
state.clone(),
)));
));

jhs.push(tokio::spawn(services::exporter(
handles.extend(services::exporter(
config.clone(),
network::Network::Secondary,
state.clone(),
)));
));
}

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(services::notifier(state.clone())));
handles.push(tokio::spawn(services::notifier(state.clone())));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
handles.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
state.clone(),
)));

// Spawn the metrics server
jhs.push(tokio::spawn(metrics::spawn(
handles.push(tokio::spawn(metrics::spawn(
self.config.metrics_server.bind_address,
)));

// Spawn the remote keypair loader endpoint for both networks
jhs.append(
&mut services::keypairs(
handles.extend(
services::keypairs(
self.config.primary_network.rpc_url.clone(),
self.config
.secondary_network
Expand All @@ -185,7 +187,7 @@ impl Agent {
);

// Wait for all tasks to complete
join_all(jhs).await;
join_all(handles).await;

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/agent/pyth/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use {
sync::Arc,
},
tokio::sync::mpsc,
tracing::instrument,
warp::{
ws::{
Message,
Expand Down Expand Up @@ -411,6 +412,7 @@ impl Default for Config {
}
}

#[instrument(skip_all)]
pub async fn run<S>(config: Config, state: Arc<S>)
where
S: state::Prices,
Expand Down
4 changes: 4 additions & 0 deletions src/agent/pyth/rpc/update_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use {
Request,
Value,
},
tracing::instrument,
};

#[instrument(skip_all, fields(account))]
pub async fn update_price<S>(
state: &S,
request: &Request<Method, Value>,
Expand All @@ -28,6 +30,8 @@ where
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

tracing::Span::current().record("account", params.account.to_string());

state
.update_local_price(
&params.account.parse::<solana_sdk::pubkey::Pubkey>()?,
Expand Down
34 changes: 30 additions & 4 deletions src/agent/services/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use {
},
tokio::{
sync::watch,
task::JoinHandle,
time::Interval,
},
tracing::instrument,
};

#[derive(Clone, Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -118,6 +120,13 @@ struct NetworkStateQuerier {
}

impl NetworkStateQuerier {
#[instrument(
skip(rpc_endpoint, rpc_timeout, query_interval),
fields(
rpc_timeout = rpc_timeout.as_millis(),
query_interval = query_interval.period().as_millis(),
)
)]
pub fn new(
rpc_endpoint: &str,
rpc_timeout: Duration,
Expand All @@ -140,6 +149,7 @@ impl NetworkStateQuerier {
}
}

#[instrument(skip(self))]
async fn query_network_state(&mut self) -> Result<()> {
// Fetch the blockhash and current slot in parallel
let current_slot_future = self
Expand All @@ -160,12 +170,15 @@ impl NetworkStateQuerier {
}
}

pub async fn exporter<S>(config: network::Config, network: Network, state: Arc<S>)
#[instrument(skip(config, state))]
pub fn exporter<S>(config: network::Config, network: Network, state: Arc<S>) -> Vec<JoinHandle<()>>
where
S: Exporter,
S: Transactions,
S: Send + Sync + 'static,
{
let mut handles = Vec::new();

// Create and spawn the network state querier
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
let mut network_state_querier = NetworkStateQuerier::new(
Expand All @@ -175,12 +188,23 @@ where
network_state_tx,
);

tokio::spawn(transaction_monitor::transaction_monitor(
handles.push(tokio::spawn(transaction_monitor::transaction_monitor(
config.clone(),
state.clone(),
)));

handles.push(tokio::spawn(exporter::exporter(
config,
network,
state,
network_state_rx,
)));

handles.push(tokio::spawn(
async move { network_state_querier.run().await },
));
tokio::spawn(exporter::exporter(config, network, state, network_state_rx));
tokio::spawn(async move { network_state_querier.run().await });

handles
}

mod exporter {
Expand Down Expand Up @@ -294,6 +318,7 @@ mod transaction_monitor {
sync::Arc,
time::Duration,
},
tracing::instrument,
};

#[derive(Clone, Serialize, Deserialize, Debug)]
Expand All @@ -318,6 +343,7 @@ mod transaction_monitor {
}
}

#[instrument(skip(config, state))]
pub async fn transaction_monitor<S>(config: network::Config, state: Arc<S>)
where
S: Transactions,
Expand Down
2 changes: 2 additions & 0 deletions src/agent/services/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
use {
crate::agent::state::Prices,
std::sync::Arc,
tracing::instrument,
};

#[instrument(skip(state))]
pub async fn notifier<S>(state: Arc<S>)
where
S: Prices,
Expand Down
21 changes: 15 additions & 6 deletions src/agent/services/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,35 @@ use {
Instant,
},
},
tokio::task::JoinHandle,
tokio_stream::StreamExt,
tracing::instrument,
};

pub async fn oracle<S>(config: Config, network: Network, state: Arc<S>)
#[instrument(skip(config, state))]
pub fn oracle<S>(config: Config, network: Network, state: Arc<S>) -> Vec<JoinHandle<()>>
where
S: Oracle,
S: Send + Sync + 'static,
{
let mut handles = Vec::new();

let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
tracing::warn!("Key store not available, Oracle won't start.");
return;
return handles;
};

tokio::spawn(poller(
handles.push(tokio::spawn(poller(
config.clone(),
network,
state.clone(),
key_store.mapping_key,
key_store.publish_keypair,
config.oracle.max_lookup_batch_size,
));
)));

if config.oracle.subscriber_enabled {
tokio::spawn(async move {
handles.push(tokio::spawn(async move {
loop {
let current_time = Instant::now();
if let Err(ref err) = subscriber(
Expand All @@ -79,15 +84,18 @@ where
}
}
}
});
}));
}

handles
}

/// When an account RPC Subscription update is receiveed.
///
/// We check if the account is one we're aware of and tracking, and if so, spawn
/// a small background task that handles that update. We only do this for price
/// accounts, all other accounts are handled below in the poller.
#[instrument(skip(config, state))]
async fn subscriber<S>(
config: Config,
network: Network,
Expand Down Expand Up @@ -144,6 +152,7 @@ where
}

/// On poll lookup all Pyth Mapping/Product/Price accounts and sync.
#[instrument(skip(config, state))]
async fn poller<S>(
config: Config,
network: Network,
Expand Down
5 changes: 5 additions & 0 deletions src/agent/state/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use {
mpsc,
RwLock,
},
tracing::instrument,
};

// TODO: implement Display on PriceStatus and then just call PriceStatus::to_string
Expand Down Expand Up @@ -382,6 +383,10 @@ where
.map_err(|_| anyhow!("failed to send update to local store"))
}

#[instrument(skip(self, update), fields(update = match update {
Update::ProductAccountUpdate { account_key, .. } => account_key,
Update::PriceAccountUpdate { account_key, .. } => account_key,
}.to_string()))]
async fn update_global_price(&self, network: Network, update: &Update) -> Result<()> {
GlobalStore::update(self, network, update)
.await
Expand Down
Loading
Loading