Skip to content

Commit 98ee33a

Browse files
committed
refactor(agent): extract oracle component/service
1 parent 8edbea1 commit 98ee33a

File tree

11 files changed

+936
-962
lines changed

11 files changed

+936
-962
lines changed

src/agent.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,45 @@ impl Agent {
128128
// Create the Application State.
129129
let state = Arc::new(state::State::new(self.config.state.clone()).await);
130130

131-
// Spawn the primary network
132-
jhs.extend(network::spawn_network(
133-
self.config.primary_network.clone(),
134-
network::Network::Primary,
135-
state.clone(),
136-
)?);
131+
// Spawn the primary network Oracle.
132+
{
133+
// Publisher permissions updates between oracle and exporter
134+
let (publisher_permissions_tx, publisher_permissions_rx) =
135+
watch::channel(<_>::default());
136+
137+
jhs.push(tokio::spawn(services::oracle(
138+
self.config.primary_network.clone(),
139+
network::Network::Primary,
140+
state.clone(),
141+
publisher_permissions_tx.clone(),
142+
)));
143+
144+
// Spawn the primary network
145+
jhs.extend(network::spawn_network(
146+
self.config.primary_network.clone(),
147+
network::Network::Primary,
148+
state.clone(),
149+
publisher_permissions_rx.clone(),
150+
)?);
151+
}
137152

138-
// Spawn the secondary network, if needed
153+
// Spawn the secondary network Oracle, if needed.
139154
if let Some(config) = &self.config.secondary_network {
155+
let (publisher_permissions_tx, publisher_permissions_rx) =
156+
watch::channel(<_>::default());
157+
158+
jhs.push(tokio::spawn(services::oracle(
159+
config.clone(),
160+
network::Network::Secondary,
161+
state.clone(),
162+
publisher_permissions_tx.clone(),
163+
)));
164+
140165
jhs.extend(network::spawn_network(
141166
config.clone(),
142167
network::Network::Secondary,
143168
state.clone(),
169+
publisher_permissions_rx,
144170
)?);
145171
}
146172

src/agent/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
super::state::local::PriceInfo,
3-
crate::agent::solana::oracle::PriceEntry,
3+
crate::agent::state::oracle::PriceEntry,
44
lazy_static::lazy_static,
55
prometheus_client::{
66
encoding::{

src/agent/services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
pub mod keypairs;
22
pub mod notifier;
3+
pub mod oracle;
34

45
pub use {
56
keypairs::keypairs,
67
notifier::notifier,
8+
oracle::oracle,
79
};

src/agent/services/oracle.rs

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
//! Oracle
2+
//!
3+
//! The Oracle service is respoinsible for reacting to all remote/on-chain events.
4+
5+
use {
6+
crate::agent::{
7+
solana::{
8+
key_store::KeyStore,
9+
network::{
10+
Config,
11+
Network,
12+
},
13+
},
14+
state::oracle::{
15+
Oracle,
16+
PricePublishingMetadata,
17+
},
18+
},
19+
anyhow::Result,
20+
solana_account_decoder::UiAccountEncoding,
21+
solana_client::{
22+
nonblocking::{
23+
pubsub_client::PubsubClient,
24+
rpc_client::RpcClient,
25+
},
26+
rpc_config::{
27+
RpcAccountInfoConfig,
28+
RpcProgramAccountsConfig,
29+
},
30+
rpc_response::{
31+
Response,
32+
RpcKeyedAccount,
33+
},
34+
},
35+
solana_sdk::{
36+
account::Account,
37+
commitment_config::CommitmentConfig,
38+
pubkey::Pubkey,
39+
},
40+
std::{
41+
collections::HashMap,
42+
sync::Arc,
43+
},
44+
tokio::{
45+
select,
46+
sync::watch::Sender,
47+
},
48+
tokio_stream::StreamExt,
49+
};
50+
51+
pub async fn oracle<S>(
52+
config: Config,
53+
network: Network,
54+
state: Arc<S>,
55+
publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
56+
) where
57+
S: Oracle,
58+
S: Send + Sync + 'static,
59+
{
60+
loop {
61+
let _ = run(
62+
config.clone(),
63+
network,
64+
state.clone(),
65+
publisher_permissions_tx.clone(),
66+
)
67+
.await;
68+
}
69+
}
70+
71+
async fn run<S>(
72+
config: Config,
73+
network: Network,
74+
state: Arc<S>,
75+
publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
76+
) -> Result<()>
77+
where
78+
S: Oracle,
79+
S: Send + Sync + 'static,
80+
{
81+
let key_store = KeyStore::new(config.key_store.clone())?;
82+
let mut poll_interval = tokio::time::interval(config.oracle.poll_interval_duration);
83+
84+
// Setup PubsubClient to listen for account changes on the Oracle program.
85+
let client = PubsubClient::new(config.wss_url.as_str()).await?;
86+
let (mut notifier, _unsub) = {
87+
let program_key = key_store.program_key;
88+
let commitment = config.oracle.commitment;
89+
let config = RpcProgramAccountsConfig {
90+
account_config: RpcAccountInfoConfig {
91+
commitment: Some(CommitmentConfig { commitment }),
92+
encoding: Some(UiAccountEncoding::Base64Zstd),
93+
..Default::default()
94+
},
95+
filters: None,
96+
with_context: Some(true),
97+
};
98+
client.program_subscribe(&program_key, Some(config)).await
99+
}?;
100+
101+
// Setup an RpcClient for manual polling.
102+
let client = {
103+
Arc::new(RpcClient::new_with_timeout_and_commitment(
104+
config.rpc_url,
105+
config.rpc_timeout,
106+
CommitmentConfig {
107+
commitment: config.oracle.commitment,
108+
},
109+
))
110+
};
111+
112+
loop {
113+
select! {
114+
update = notifier.next() => handle_account_update(state.clone(), network, update).await,
115+
_ = poll_interval.tick() => handle_poll_updates(
116+
state.clone(),
117+
network,
118+
key_store.mapping_key,
119+
client.clone(),
120+
config.oracle.max_lookup_batch_size,
121+
config.oracle.subscriber_enabled,
122+
publisher_permissions_tx.clone(),
123+
).await,
124+
}
125+
}
126+
}
127+
128+
/// When an account RPC Subscription update is receiveed.
129+
///
130+
/// We check if the account is one we're aware of and tracking, and if so, spawn
131+
/// a small background task that handles that update. We only do this for price
132+
/// accounts, all other accounts are handled below in the poller.
133+
async fn handle_account_update<S>(
134+
state: Arc<S>,
135+
network: Network,
136+
update: Option<Response<RpcKeyedAccount>>,
137+
) where
138+
S: Oracle,
139+
S: Send + Sync + 'static,
140+
{
141+
match update {
142+
Some(update) => {
143+
match update.value.account.decode::<Account>() {
144+
Some(account) => {
145+
tokio::spawn(async move {
146+
if let Err(err) = async move {
147+
let key: Pubkey = update.value.pubkey.as_str().try_into()?;
148+
Oracle::handle_price_account_update(&*state, network, &key, &account)
149+
.await
150+
}
151+
.await
152+
{
153+
tracing::error!(err = ?err, "Failed to handle account update.");
154+
}
155+
});
156+
}
157+
158+
None => {
159+
tracing::error!(
160+
update = ?update,
161+
"Failed to decode account from update.",
162+
);
163+
}
164+
};
165+
}
166+
None => {
167+
tracing::debug!("subscriber closed connection");
168+
}
169+
}
170+
}
171+
172+
/// On poll lookup all Pyth Mapping/Product/Price accounts and sync.
173+
async fn handle_poll_updates<S>(
174+
state: Arc<S>,
175+
network: Network,
176+
mapping_key: Pubkey,
177+
rpc: Arc<RpcClient>,
178+
max_lookup_batch_size: usize,
179+
enabled: bool,
180+
publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
181+
) where
182+
S: Oracle,
183+
S: Send + Sync + 'static,
184+
{
185+
if !enabled {
186+
return;
187+
}
188+
189+
tokio::spawn(async move {
190+
if let Err(err) = async {
191+
Oracle::poll_updates(
192+
&*state,
193+
mapping_key,
194+
&rpc,
195+
max_lookup_batch_size,
196+
publisher_permissions_tx,
197+
)
198+
.await?;
199+
Oracle::sync_global_store(&*state, network).await
200+
}
201+
.await
202+
{
203+
tracing::error!(err = ?err, "Failed to handle poll updates.");
204+
}
205+
});
206+
}

src/agent/solana.rs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
pub mod exporter;
2-
pub mod oracle;
32

43
/// This module encapsulates all the interaction with a single Solana network:
54
/// - The Oracle, which reads data from the network
@@ -12,15 +11,22 @@ pub mod network {
1211
self,
1312
KeyStore,
1413
},
15-
oracle,
1614
},
17-
crate::agent::state::State,
15+
crate::agent::state::{
16+
oracle::{
17+
self,
18+
PricePublishingMetadata,
19+
},
20+
State,
21+
},
1822
anyhow::Result,
1923
serde::{
2024
Deserialize,
2125
Serialize,
2226
},
27+
solana_sdk::pubkey::Pubkey,
2328
std::{
29+
collections::HashMap,
2430
sync::Arc,
2531
time::Duration,
2632
},
@@ -70,25 +76,51 @@ pub mod network {
7076
pub exporter: exporter::Config,
7177
}
7278

79+
/// Spawn an Oracle, in-progress porting this to State.
80+
///
81+
/// Behaviour:
82+
/// - Spawns Oracle: (Obsolete, now Extracted to state/oracle.rs)
83+
/// - Spawns a Subscriber:
84+
/// o Subscribes to the Oracle program key.
85+
/// o Decodes account events related to the Oracle.
86+
/// o Sends update.
87+
/// - Spawns a Poller:
88+
/// o Fetches Mapping Accounts
89+
/// o Iterates Product+Price Accounts
90+
/// o Sends update.
91+
/// - Oracle then Listens for Updates from Subscriber
92+
/// o Filters for Price Account Updates.
93+
/// o Stores its own copy of the Price Account.
94+
/// o Updates the Global Store for that Price Account.
95+
/// - Oracle also Listens for Updates from Poller
96+
/// o Tracks if any new Mapping Accounts were found.
97+
/// o Update Local Data
98+
/// o Updates entire Global Store View.
99+
/// - Spawns Exporter:
100+
/// - Spawns NetworkQuerier
101+
/// - Queries BlockHash in a timer.
102+
/// - Sends BlockHash + Slot
103+
/// - Spawns Transaction Monitor:
104+
/// - Listens for for Transactions
105+
/// - Adds to tracked Transactions
106+
/// - Responds to queries about Tx status.
107+
/// - Spawns Exporter
108+
/// - On Publish tick: pushes updates to the network as a batch.
109+
/// - On Compute Unit Price Tick: calculates new median price fee from recent
110+
///
111+
/// Plan:
112+
/// - Subscriber & Poller Can Be Spawnable Tasks
113+
/// - Oracle becomes a State API
114+
/// -
73115
pub fn spawn_network(
74116
config: Config,
75117
network: Network,
76118
state: Arc<State>,
119+
publisher_permissions_rx: watch::Receiver<
120+
HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
121+
>,
77122
) -> Result<Vec<JoinHandle<()>>> {
78-
// Publisher permissions updates between oracle and exporter
79-
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());
80-
81-
// Spawn the Oracle
82-
let mut jhs = oracle::spawn_oracle(
83-
config.oracle.clone(),
84-
network,
85-
&config.rpc_url,
86-
&config.wss_url,
87-
config.rpc_timeout,
88-
publisher_permissions_tx,
89-
KeyStore::new(config.key_store.clone())?,
90-
state.clone(),
91-
);
123+
let mut jhs = vec![];
92124

93125
// Spawn the Exporter
94126
let exporter_jhs = exporter::spawn_exporter(
@@ -108,7 +140,7 @@ pub mod network {
108140
}
109141

110142
/// The key_store module is responsible for parsing the pythd key store.
111-
mod key_store {
143+
pub mod key_store {
112144
use {
113145
anyhow::Result,
114146
serde::{

src/agent/solana/exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use {
33
super::{
44
key_store,
55
network::Network,
6-
oracle::PricePublishingMetadata,
76
},
87
crate::agent::state::{
98
global::GlobalStore,
@@ -12,6 +11,7 @@ use {
1211
LocalStore,
1312
PriceInfo,
1413
},
14+
oracle::PricePublishingMetadata,
1515
State,
1616
},
1717
anyhow::{

0 commit comments

Comments
 (0)