diff --git a/Cargo.lock b/Cargo.lock index 054b095..6735e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.12.1" +version = "2.12.2" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index b92d9a1..7aec16e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.12.1" +version = "2.12.2" edition = "2021" [[bin]] diff --git a/config/config.toml b/config/config.toml index 51a4498..d38b5bb 100644 --- a/config/config.toml +++ b/config/config.toml @@ -73,6 +73,11 @@ key_store.pyth_price_store_program_key = "3m6sv6HGqEbuyLV84mD7rJn4MAC9LhUa1y1AUN # takes to fetch all symbols. # oracle.max_lookup_batch_size = 100 +# Minimum time for a subscriber to run +# oracle.subscriber_finished_min_time = "30s" +# Time to sleep if the subscriber do not run for more than the minimum time +# oracle.subscriber_finished_sleep_time = "1s" + # How often to refresh the cached network state (current slot and blockhash). # It is recommended to set this to slightly less than the network's block time, # as the slot fetched will be used as the time of the price update. diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index b64dbe4..e89ccbe 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -33,10 +33,7 @@ use { }, std::{ sync::Arc, - time::{ - Duration, - Instant, - }, + time::Instant, }, tokio::task::JoinHandle, tokio_stream::StreamExt, @@ -67,6 +64,9 @@ where ))); if config.oracle.subscriber_enabled { + let min_elapsed_time = config.oracle.subscriber_finished_min_time; + let sleep_time = config.oracle.subscriber_finished_sleep_time; + handles.push(tokio::spawn(async move { loop { let current_time = Instant::now(); @@ -78,10 +78,10 @@ where ) .await { - tracing::error!(err = ?err, "Subscriber exited unexpectedly."); - if current_time.elapsed() < Duration::from_secs(30) { - tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second."); - tokio::time::sleep(Duration::from_secs(1)).await; + tracing::error!(?err, "Subscriber exited unexpectedly"); + if current_time.elapsed() < min_elapsed_time { + tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping"); + tokio::time::sleep(sleep_time).await; } } } @@ -134,7 +134,7 @@ where Oracle::handle_price_account_update(&*state, network, &pubkey, &account) .await { - tracing::error!(err = ?err, "Failed to handle account update."); + tracing::error!(?err, "Failed to handle account update"); } }); } diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 761280d..90a0fa8 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -138,6 +138,14 @@ pub struct Data { pub publisher_buffer_key: Option, } +fn default_subscriber_finished_min_time() -> Duration { + Duration::from_secs(30) +} + +fn default_subscriber_finished_sleep_time() -> Duration { + Duration::from_secs(1) +} + #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(default)] pub struct Config { @@ -159,17 +167,26 @@ pub struct Config { /// socket count at bay, the batches are looked up sequentially, /// trading off overall time it takes to fetch all symbols. pub max_lookup_batch_size: usize, + + /// Minimum time for a subscriber to run + #[serde(default = "default_subscriber_finished_min_time")] + pub subscriber_finished_min_time: Duration, + /// Time to sleep if the subscriber do not run for more than the minimum time + #[serde(default = "default_subscriber_finished_sleep_time")] + pub subscriber_finished_sleep_time: Duration, } impl Default for Config { fn default() -> Self { Self { - commitment: CommitmentLevel::Confirmed, - poll_interval_duration: Duration::from_secs(5), - subscriber_enabled: true, - updates_channel_capacity: 10000, - data_channel_capacity: 10000, - max_lookup_batch_size: 100, + commitment: CommitmentLevel::Confirmed, + poll_interval_duration: Duration::from_secs(5), + subscriber_enabled: true, + updates_channel_capacity: 10000, + data_channel_capacity: 10000, + max_lookup_batch_size: 100, + subscriber_finished_min_time: default_subscriber_finished_min_time(), + subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(), } } } @@ -241,6 +258,7 @@ where ); data.price_accounts.insert(*account_key, price_entry.into()); + drop(data); Prices::update_global_price( self, @@ -333,13 +351,16 @@ where let mut data = self.into().data.write().await; log_data_diff(&data, &new_data); *data = new_data; + let data_publisher_permissions = data.publisher_permissions.clone(); + let data_publisher_buffer_key = data.publisher_buffer_key; + drop(data); Exporter::update_on_chain_state( self, network, publish_keypair, - data.publisher_permissions.clone(), - data.publisher_buffer_key, + data_publisher_permissions, + data_publisher_buffer_key, ) .await?;