Skip to content

Commit 669cb91

Browse files
committed
refactor(agent): move notifier into services
1 parent fee170f commit 669cb91

File tree

5 files changed

+45
-37
lines changed

5 files changed

+45
-37
lines changed

src/agent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ use {
6666
config::Config,
6767
pyth::rpc,
6868
solana::network,
69-
state::notifier,
7069
},
7170
anyhow::Result,
7271
futures_util::future::join_all,
@@ -79,6 +78,7 @@ pub mod legacy_schedule;
7978
pub mod market_schedule;
8079
pub mod metrics;
8180
pub mod pyth;
81+
pub mod services;
8282
pub mod solana;
8383
pub mod state;
8484

@@ -144,7 +144,7 @@ impl Agent {
144144
}
145145

146146
// Create the Notifier task for the Pythd RPC.
147-
jhs.push(tokio::spawn(notifier(state.clone())));
147+
jhs.push(tokio::spawn(services::notifier(state.clone())));
148148

149149
// Spawn the Pythd API Server
150150
jhs.push(tokio::spawn(rpc::run(

src/agent/services.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod notifier;
2+
3+
pub use notifier::notifier;

src/agent/services/notifier.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//! Notifier
2+
//!
3+
//! The notifier is responsible for notifying subscribers who have registered
4+
//! for price sched updates.
5+
6+
use {
7+
crate::agent::state::Prices,
8+
std::sync::Arc,
9+
};
10+
11+
pub async fn notifier<S>(state: Arc<S>)
12+
where
13+
S: Prices,
14+
{
15+
let mut interval = tokio::time::interval(state.notify_interval_duration());
16+
let mut exit = crate::agent::EXIT.subscribe();
17+
loop {
18+
Prices::drop_closed_subscriptions(&*state).await;
19+
tokio::select! {
20+
_ = exit.changed() => {
21+
tracing::info!("Shutdown signal received.");
22+
return;
23+
}
24+
_ = interval.tick() => {
25+
if let Err(err) = state.send_notify_price_sched().await {
26+
tracing::error!(err = ?err, "Notifier: failed to send notify price sched.");
27+
}
28+
}
29+
}
30+
}
31+
}

src/agent/state.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ pub mod api;
1919
pub mod global;
2020
pub mod keypairs;
2121
pub mod local;
22-
pub use api::{
23-
notifier,
24-
Prices,
25-
};
22+
pub use api::Prices;
2623

2724
#[derive(Clone, Serialize, Deserialize, Debug)]
2825
#[serde(default)]
@@ -92,7 +89,6 @@ mod tests {
9289
self,
9390
AllAccountsData,
9491
},
95-
notifier,
9692
Config,
9793
Prices,
9894
State,
@@ -108,6 +104,7 @@ mod tests {
108104
ProductAccountMetadata,
109105
PublisherAccount,
110106
},
107+
services::notifier,
111108
solana::{
112109
self,
113110
network::Network,

src/agent/state/api.rs

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,7 @@ use {
4646
},
4747
std::{
4848
collections::HashMap,
49-
sync::{
50-
atomic::AtomicI64,
51-
Arc,
52-
},
49+
sync::atomic::AtomicI64,
5350
time::Duration,
5451
},
5552
tokio::sync::{
@@ -171,7 +168,6 @@ pub trait Prices {
171168
account_pubkey: &solana_sdk::pubkey::Pubkey,
172169
notify_price_sched_tx: mpsc::Sender<NotifyPriceSched>,
173170
) -> SubscriptionID;
174-
fn next_subscription_id(&self) -> SubscriptionID;
175171
async fn subscribe_price(
176172
&self,
177173
account: &solana_sdk::pubkey::Pubkey,
@@ -187,7 +183,8 @@ pub trait Prices {
187183
status: String,
188184
) -> Result<()>;
189185
async fn update_global_price(&self, network: Network, update: &Update) -> Result<()>;
190-
// TODO: implement FromStr method on PriceStatus
186+
fn notify_interval_duration(&self) -> Duration;
187+
fn next_subscription_id(&self) -> SubscriptionID;
191188
fn map_status(status: &str) -> Result<PriceStatus>;
192189
}
193190

@@ -433,6 +430,10 @@ where
433430
}
434431
}
435432

433+
fn notify_interval_duration(&self) -> Duration {
434+
self.into().notify_price_sched_interval_duration
435+
}
436+
436437
// TODO: implement FromStr method on PriceStatus
437438
fn map_status(status: &str) -> Result<PriceStatus> {
438439
match status {
@@ -445,27 +446,3 @@ where
445446
}
446447
}
447448
}
448-
449-
pub async fn notifier<S>(state: Arc<S>)
450-
where
451-
for<'a> &'a S: Into<&'a PricesState>,
452-
S: Prices,
453-
{
454-
let prices: &PricesState = (&*state).into();
455-
let mut interval = tokio::time::interval(prices.notify_price_sched_interval_duration);
456-
let mut exit = crate::agent::EXIT.subscribe();
457-
loop {
458-
Prices::drop_closed_subscriptions(&*state).await;
459-
tokio::select! {
460-
_ = exit.changed() => {
461-
tracing::info!("Shutdown signal received.");
462-
return;
463-
}
464-
_ = interval.tick() => {
465-
if let Err(err) = state.send_notify_price_sched().await {
466-
tracing::error!(err = ?err, "Notifier: failed to send notify price sched.");
467-
}
468-
}
469-
}
470-
}
471-
}

0 commit comments

Comments
 (0)