Skip to content

Commit 05ef724

Browse files
Reisenali-behjati
andauthored
refactor(agent): extract oracle component/service (#128)
* refactor(agent): move notifier into services * refactor(agent): move keypairs into services * refactor(agent): extract config module * refactor(agent): extract oracle component/service * refactor(agent): extract exporter component/service * chore: bump version --------- Co-authored-by: Ali Behjati <bahjatia@gmail.com>
1 parent fee170f commit 05ef724

20 files changed

+2552
-1359
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "2.8.0"
3+
version = "2.9.0"
44
edition = "2021"
55

66
[[bin]]

src/agent.rs

Lines changed: 24 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -63,22 +63,23 @@ Note that there is an Oracle and Exporter for each network, but only one Local S
6363
################################################################################################################################## */
6464
use {
6565
self::{
66-
config::Config,
6766
pyth::rpc,
6867
solana::network,
69-
state::notifier,
7068
},
7169
anyhow::Result,
70+
config::Config,
7271
futures_util::future::join_all,
7372
lazy_static::lazy_static,
7473
std::sync::Arc,
7574
tokio::sync::watch,
7675
};
7776

77+
pub mod config;
7878
pub mod legacy_schedule;
7979
pub mod market_schedule;
8080
pub mod metrics;
8181
pub mod pyth;
82+
pub mod services;
8283
pub mod solana;
8384
pub mod state;
8485

@@ -125,26 +126,38 @@ impl Agent {
125126
let mut jhs = vec![];
126127

127128
// Create the Application State.
128-
let state = Arc::new(state::State::new(self.config.state.clone()).await);
129+
let state = Arc::new(state::State::new(&self.config).await);
129130

130-
// Spawn the primary network
131-
jhs.extend(network::spawn_network(
131+
// Spawn the primary network Oracle.
132+
jhs.push(tokio::spawn(services::oracle(
132133
self.config.primary_network.clone(),
133134
network::Network::Primary,
134135
state.clone(),
135-
)?);
136+
)));
137+
138+
jhs.push(tokio::spawn(services::exporter(
139+
self.config.primary_network.clone(),
140+
network::Network::Primary,
141+
state.clone(),
142+
)));
136143

137-
// Spawn the secondary network, if needed
144+
// Spawn the secondary network Oracle, if needed.
138145
if let Some(config) = &self.config.secondary_network {
139-
jhs.extend(network::spawn_network(
146+
jhs.push(tokio::spawn(services::oracle(
140147
config.clone(),
141148
network::Network::Secondary,
142149
state.clone(),
143-
)?);
150+
)));
151+
152+
jhs.push(tokio::spawn(services::exporter(
153+
config.clone(),
154+
network::Network::Secondary,
155+
state.clone(),
156+
)));
144157
}
145158

146159
// Create the Notifier task for the Pythd RPC.
147-
jhs.push(tokio::spawn(notifier(state.clone())));
160+
jhs.push(tokio::spawn(services::notifier(state.clone())));
148161

149162
// Spawn the Pythd API Server
150163
jhs.push(tokio::spawn(rpc::run(
@@ -159,7 +172,7 @@ impl Agent {
159172

160173
// Spawn the remote keypair loader endpoint for both networks
161174
jhs.append(
162-
&mut state::keypairs::spawn(
175+
&mut services::keypairs(
163176
self.config.primary_network.rpc_url.clone(),
164177
self.config
165178
.secondary_network
@@ -177,90 +190,3 @@ impl Agent {
177190
Ok(())
178191
}
179192
}
180-
181-
pub mod config {
182-
use {
183-
super::{
184-
metrics,
185-
pyth,
186-
solana::network,
187-
state,
188-
},
189-
anyhow::Result,
190-
config as config_rs,
191-
config_rs::{
192-
Environment,
193-
File,
194-
},
195-
serde::Deserialize,
196-
std::path::Path,
197-
};
198-
199-
/// Configuration for all components of the Agent
200-
#[derive(Deserialize, Debug)]
201-
pub struct Config {
202-
#[serde(default)]
203-
pub channel_capacities: ChannelCapacities,
204-
pub primary_network: network::Config,
205-
pub secondary_network: Option<network::Config>,
206-
#[serde(default)]
207-
#[serde(rename = "pythd_adapter")]
208-
pub state: state::Config,
209-
#[serde(default)]
210-
pub pythd_api_server: pyth::rpc::Config,
211-
#[serde(default)]
212-
pub metrics_server: metrics::Config,
213-
#[serde(default)]
214-
pub remote_keypair_loader: state::keypairs::Config,
215-
}
216-
217-
impl Config {
218-
pub fn new(config_file: impl AsRef<Path>) -> Result<Self> {
219-
// Build a new configuration object, allowing the default values to be
220-
// overridden by those in the config_file or "AGENT_"-prefixed environment
221-
// variables.
222-
config_rs::Config::builder()
223-
.add_source(File::from(config_file.as_ref()))
224-
.add_source(Environment::with_prefix("agent"))
225-
.build()?
226-
.try_deserialize()
227-
.map_err(|e| e.into())
228-
}
229-
}
230-
231-
/// Capacities of the channels top-level components use to communicate
232-
#[derive(Deserialize, Debug)]
233-
pub struct ChannelCapacities {
234-
/// Capacity of the channel used to broadcast shutdown events to all components
235-
pub shutdown: usize,
236-
/// Capacity of the channel used to send updates from the primary Oracle to the Global Store
237-
pub primary_oracle_updates: usize,
238-
/// Capacity of the channel used to send updates from the secondary Oracle to the Global Store
239-
pub secondary_oracle_updates: usize,
240-
/// Capacity of the channel the Pythd API Adapter uses to send lookup requests to the Global Store
241-
pub global_store_lookup: usize,
242-
/// Capacity of the channel the Pythd API Adapter uses to communicate with the Local Store
243-
pub local_store_lookup: usize,
244-
/// Capacity of the channel on which the Local Store receives messages
245-
pub local_store: usize,
246-
/// Capacity of the channel on which the Pythd API Adapter receives messages
247-
pub pythd_adapter: usize,
248-
/// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog
249-
pub logger_buffer: usize,
250-
}
251-
252-
impl Default for ChannelCapacities {
253-
fn default() -> Self {
254-
Self {
255-
shutdown: 10000,
256-
primary_oracle_updates: 10000,
257-
secondary_oracle_updates: 10000,
258-
global_store_lookup: 10000,
259-
local_store_lookup: 10000,
260-
local_store: 10000,
261-
pythd_adapter: 10000,
262-
logger_buffer: 10000,
263-
}
264-
}
265-
}
266-
}

src/agent/config.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use {
2+
super::{
3+
metrics,
4+
pyth,
5+
services,
6+
solana::network,
7+
state,
8+
},
9+
anyhow::Result,
10+
config as config_rs,
11+
config_rs::{
12+
Environment,
13+
File,
14+
},
15+
serde::Deserialize,
16+
std::path::Path,
17+
};
18+
19+
/// Configuration for all components of the Agent
20+
#[derive(Deserialize, Debug)]
21+
pub struct Config {
22+
#[serde(default)]
23+
pub channel_capacities: ChannelCapacities,
24+
pub primary_network: network::Config,
25+
pub secondary_network: Option<network::Config>,
26+
#[serde(default)]
27+
#[serde(rename = "pythd_adapter")]
28+
pub state: state::Config,
29+
#[serde(default)]
30+
pub pythd_api_server: pyth::rpc::Config,
31+
#[serde(default)]
32+
pub metrics_server: metrics::Config,
33+
#[serde(default)]
34+
pub remote_keypair_loader: services::keypairs::Config,
35+
}
36+
37+
impl Config {
38+
pub fn new(config_file: impl AsRef<Path>) -> Result<Self> {
39+
// Build a new configuration object, allowing the default values to be
40+
// overridden by those in the config_file or "AGENT_"-prefixed environment
41+
// variables.
42+
config_rs::Config::builder()
43+
.add_source(File::from(config_file.as_ref()))
44+
.add_source(Environment::with_prefix("agent"))
45+
.build()?
46+
.try_deserialize()
47+
.map_err(|e| e.into())
48+
}
49+
}
50+
51+
/// Capacities of the channels top-level components use to communicate
52+
#[derive(Deserialize, Debug)]
53+
pub struct ChannelCapacities {
54+
/// Capacity of the channel used to broadcast shutdown events to all components
55+
pub shutdown: usize,
56+
/// Capacity of the channel used to send updates from the primary Oracle to the Global Store
57+
pub primary_oracle_updates: usize,
58+
/// Capacity of the channel used to send updates from the secondary Oracle to the Global Store
59+
pub secondary_oracle_updates: usize,
60+
/// Capacity of the channel the Pythd API Adapter uses to send lookup requests to the Global Store
61+
pub global_store_lookup: usize,
62+
/// Capacity of the channel the Pythd API Adapter uses to communicate with the Local Store
63+
pub local_store_lookup: usize,
64+
/// Capacity of the channel on which the Local Store receives messages
65+
pub local_store: usize,
66+
/// Capacity of the channel on which the Pythd API Adapter receives messages
67+
pub pythd_adapter: usize,
68+
/// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog
69+
pub logger_buffer: usize,
70+
}
71+
72+
impl Default for ChannelCapacities {
73+
fn default() -> Self {
74+
Self {
75+
shutdown: 10000,
76+
primary_oracle_updates: 10000,
77+
secondary_oracle_updates: 10000,
78+
global_store_lookup: 10000,
79+
local_store_lookup: 10000,
80+
local_store: 10000,
81+
pythd_adapter: 10000,
82+
logger_buffer: 10000,
83+
}
84+
}
85+
}

src/agent/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
super::state::local::PriceInfo,
3-
crate::agent::solana::oracle::PriceEntry,
3+
crate::agent::state::oracle::PriceEntry,
44
lazy_static::lazy_static,
55
prometheus_client::{
66
encoding::{

src/agent/services.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
pub mod exporter;
2+
pub mod keypairs;
3+
pub mod notifier;
4+
pub mod oracle;
5+
6+
pub use {
7+
exporter::exporter,
8+
keypairs::keypairs,
9+
notifier::notifier,
10+
oracle::oracle,
11+
};

0 commit comments

Comments
 (0)