Skip to content

Commit 660b216

Browse files
authored
refactor(agent): various refactors (#126)
* refactor(agent): convert keypair service to api
* refactor(agent): use exit flag everywhere
* refactor(agent): StateApi -> Prices and refactor module

  The StateApi is left over from the initial Adapter, but all of its functionality is for pricing/product accounts. This refactors that module and fixes the cyclic dependency between it and GlobalStore. The new logic performs updates within the Prices API (which is where the state relevant to subscriptions already lived, so it is the better place for it). The file rename is left for a future commit to keep the diffs clean.
* refactor(agent): refactor all references to adapter to state
* refactor(agent): remove pythd module, raise pyth module
* refactor(agent): remove store module
* refactor(agent): convert to a tracing logger
1 parent 0cd76df commit 660b216

28 files changed

+1090
-1397
lines changed

Cargo.lock

Lines changed: 94 additions & 230 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,11 @@ solana-account-decoder = "1.18.8"
3333
solana-client = "1.18.8"
3434
solana-sdk = "1.18.8"
3535
bincode = "1.3.3"
36-
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_trace"] }
37-
slog-term = "2.9.1"
3836
rand = "0.8.5"
39-
slog-async = "2.8.0"
4037
config = "0.14.0"
4138
thiserror = "1.0.58"
4239
clap = { version = "4.5.4", features = ["derive"] }
4340
humantime-serde = "1.1.1"
44-
slog-envlogger = "2.2.0"
4541
serde-this-or-that = "0.4.2"
4642
# The public typed-html 0.2.2 release is causing a recursion limit
4743
# error that cannot be fixed from outside the crate.
@@ -52,17 +48,17 @@ humantime = "2.1.0"
5248
prometheus-client = "0.22.2"
5349
lazy_static = "1.4.0"
5450
toml_edit = "0.22.9"
55-
slog-bunyan = "2.5.0"
5651
winnow = "0.6.5"
5752
proptest = "1.4.0"
53+
tracing = { version = "0.1.40", features = ["log"] }
54+
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
5855

5956
[dev-dependencies]
6057
tokio-util = { version = "0.7.10", features = ["full"] }
6158
soketto = "0.8.0"
6259
portpicker = "0.1.1"
6360
rand = "0.8.5"
6461
tokio-retry = "0.3.0"
65-
slog-extlog = "8.1.0"
6662
iobuffer = "0.2.0"
6763

6864
[profile.release]

integration-tests/tests/test_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ async def test_update_price_discards_unpermissioned(self, client: PythAgentClien
695695
lines_found += 1
696696
expected_unperm_pubkey = final_price_account_unperm["account"]
697697
# Must point at the expected account as all other attempts must be valid
698-
assert f"price_account: {expected_unperm_pubkey}" in line
698+
assert f'"unpermissioned_price_account":"{expected_unperm_pubkey}"' in line
699699

700700
# Must find at least one log discarding the account
701701
assert lines_found > 0

src/agent.rs

Lines changed: 49 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -61,32 +61,40 @@ Metrics Server:
6161
Note that there is an Oracle and Exporter for each network, but only one Local Store and Global Store.
6262
6363
################################################################################################################################## */
64-
65-
pub mod legacy_schedule;
66-
pub mod market_schedule;
67-
pub mod metrics;
68-
pub mod pythd;
69-
pub mod remote_keypair_loader;
70-
pub mod solana;
71-
pub mod state;
72-
pub mod store;
7364
use {
7465
self::{
7566
config::Config,
76-
pythd::api::rpc,
67+
pyth::rpc,
7768
solana::network,
7869
state::notifier,
7970
},
8071
anyhow::Result,
8172
futures_util::future::join_all,
82-
slog::Logger,
73+
lazy_static::lazy_static,
8374
std::sync::Arc,
84-
tokio::sync::{
85-
broadcast,
86-
mpsc,
87-
},
75+
tokio::sync::watch,
8876
};
8977

78+
pub mod legacy_schedule;
79+
pub mod market_schedule;
80+
pub mod metrics;
81+
pub mod pyth;
82+
pub mod solana;
83+
pub mod state;
84+
85+
lazy_static! {
86+
/// A static exit flag to indicate to running threads that we're shutting down. This is used to
87+
/// gracefully shut down the application.
88+
///
89+
/// We make this global based on the fact that:
90+
/// - The `Sender` side does not rely on any async runtime.
91+
/// - Exit logic doesn't really require carefully threading this value through the app.
92+
/// - The `Receiver` side of a watch channel performs the detection based on whether the change
93+
/// happened after the subscribe, so it means all listeners should always be notified
94+
/// correctly.
95+
pub static ref EXIT: watch::Sender<bool> = watch::channel(false).0;
96+
}
97+
9098
pub struct Agent {
9199
config: Config,
92100
}
@@ -96,86 +104,69 @@ impl Agent {
96104
Agent { config }
97105
}
98106

99-
pub async fn start(&self, logger: Logger) {
100-
info!(logger, "Starting {}", env!("CARGO_PKG_NAME");
101-
"config" => format!("{:?}", &self.config),
102-
"version" => env!("CARGO_PKG_VERSION"),
103-
"cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("<could not get current directory>".to_owned())
107+
pub async fn start(&self) {
108+
tracing::info!(
109+
config = format!("{:?}", &self.config),
110+
version = env!("CARGO_PKG_VERSION"),
111+
cwd = std::env::current_dir()
112+
.map(|p| format!("{}", p.display()))
113+
.unwrap_or("<could not get current directory>".to_owned()),
114+
"Starting {}",
115+
env!("CARGO_PKG_NAME"),
104116
);
105117

106-
if let Err(err) = self.spawn(logger.clone()).await {
107-
error!(logger, "{}", err);
108-
debug!(logger, "error context"; "context" => format!("{:?}", err));
118+
if let Err(err) = self.spawn().await {
119+
tracing::error!(err = ?err, "Agent spawn failed.");
109120
};
110121
}
111122

112-
async fn spawn(&self, logger: Logger) -> Result<()> {
123+
async fn spawn(&self) -> Result<()> {
113124
// job handles
114125
let mut jhs = vec![];
115126

116-
// Create the channels
117-
// TODO: make all components listen to shutdown signal
118-
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
119-
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
120-
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);
121-
122-
// Create the Pythd Adapter.
123-
let adapter =
124-
Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await);
127+
// Create the Application State.
128+
let state = Arc::new(state::State::new(self.config.state.clone()).await);
125129

126130
// Spawn the primary network
127131
jhs.extend(network::spawn_network(
128132
self.config.primary_network.clone(),
129133
network::Network::Primary,
130-
primary_keypair_loader_tx,
131-
logger.new(o!("primary" => true)),
132-
adapter.clone(),
134+
state.clone(),
133135
)?);
134136

135137
// Spawn the secondary network, if needed
136138
if let Some(config) = &self.config.secondary_network {
137139
jhs.extend(network::spawn_network(
138140
config.clone(),
139141
network::Network::Secondary,
140-
secondary_keypair_loader_tx,
141-
logger.new(o!("primary" => false)),
142-
adapter.clone(),
142+
state.clone(),
143143
)?);
144144
}
145145

146146
// Create the Notifier task for the Pythd RPC.
147-
jhs.push(tokio::spawn(notifier(
148-
adapter.clone(),
149-
shutdown_tx.subscribe(),
150-
)));
147+
jhs.push(tokio::spawn(notifier(state.clone())));
151148

152149
// Spawn the Pythd API Server
153150
jhs.push(tokio::spawn(rpc::run(
154151
self.config.pythd_api_server.clone(),
155-
logger.clone(),
156-
adapter.clone(),
157-
shutdown_tx.subscribe(),
152+
state.clone(),
158153
)));
159154

160155
// Spawn the metrics server
161-
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
156+
jhs.push(tokio::spawn(metrics::spawn(
162157
self.config.metrics_server.bind_address,
163-
logger.clone(),
164-
adapter,
165158
)));
166159

167160
// Spawn the remote keypair loader endpoint for both networks
168161
jhs.append(
169-
&mut remote_keypair_loader::RemoteKeypairLoader::spawn(
170-
primary_keypair_loader_rx,
171-
secondary_keypair_loader_rx,
162+
&mut state::keypairs::spawn(
172163
self.config.primary_network.rpc_url.clone(),
173164
self.config
174165
.secondary_network
175166
.as_ref()
176167
.map(|c| c.rpc_url.clone()),
177168
self.config.remote_keypair_loader.clone(),
178-
logger,
169+
state,
179170
)
180171
.await,
181172
);
@@ -191,8 +182,7 @@ pub mod config {
191182
use {
192183
super::{
193184
metrics,
194-
pythd,
195-
remote_keypair_loader,
185+
pyth,
196186
solana::network,
197187
state,
198188
},
@@ -214,13 +204,14 @@ pub mod config {
214204
pub primary_network: network::Config,
215205
pub secondary_network: Option<network::Config>,
216206
#[serde(default)]
217-
pub pythd_adapter: state::Config,
207+
#[serde(rename = "pythd_adapter")]
208+
pub state: state::Config,
218209
#[serde(default)]
219-
pub pythd_api_server: pythd::api::rpc::Config,
210+
pub pythd_api_server: pyth::rpc::Config,
220211
#[serde(default)]
221212
pub metrics_server: metrics::Config,
222213
#[serde(default)]
223-
pub remote_keypair_loader: remote_keypair_loader::Config,
214+
pub remote_keypair_loader: state::keypairs::Config,
224215
}
225216

226217
impl Config {

src/agent/metrics.rs

Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
use {
2-
super::state::{
3-
local::PriceInfo,
4-
State,
5-
},
6-
crate::agent::{
7-
solana::oracle::PriceEntry,
8-
store::PriceIdentifier,
9-
},
2+
super::state::local::PriceInfo,
3+
crate::agent::solana::oracle::PriceEntry,
104
lazy_static::lazy_static,
115
prometheus_client::{
126
encoding::{
@@ -21,15 +15,13 @@ use {
2115
registry::Registry,
2216
},
2317
serde::Deserialize,
24-
slog::Logger,
2518
solana_sdk::pubkey::Pubkey,
2619
std::{
2720
net::SocketAddr,
2821
sync::{
2922
atomic::AtomicU64,
3023
Arc,
3124
},
32-
time::Instant,
3325
},
3426
tokio::sync::Mutex,
3527
warp::{
@@ -64,49 +56,33 @@ lazy_static! {
6456
Arc::new(Mutex::new(<Registry>::default()));
6557
}
6658

67-
/// Internal metrics server state, holds state needed for serving
68-
/// metrics.
69-
pub struct MetricsServer {
70-
pub start_time: Instant,
71-
pub logger: Logger,
72-
pub adapter: Arc<State>,
73-
}
74-
75-
impl MetricsServer {
76-
/// Instantiate a metrics API.
77-
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, adapter: Arc<State>) {
78-
let server = MetricsServer {
79-
start_time: Instant::now(),
80-
logger,
81-
adapter,
82-
};
83-
84-
let shared_state = Arc::new(Mutex::new(server));
85-
let shared_state4metrics = shared_state.clone();
86-
let metrics_route = warp::path("metrics")
87-
.and(warp::path::end())
88-
.and_then(move || {
89-
let shared_state = shared_state4metrics.clone();
90-
async move {
91-
let locked_state = shared_state.lock().await;
92-
let mut buf = String::new();
93-
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
94-
.map_err(|e| -> Box<dyn std::error::Error> {
95-
e.into()
96-
})
97-
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
98-
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
99-
}).unwrap_or_else(|e| {
100-
error!(locked_state.logger, "Metrics: Could not gather metrics from registry"; "error" => e.to_string());
101-
Box::new(reply::with_status("Could not gather metrics. See logs for details".to_string(), StatusCode::INTERNAL_SERVER_ERROR))
102-
});
103-
104-
Result::<Box<dyn Reply>, Rejection>::Ok(response)
105-
}
106-
});
107-
108-
warp::serve(metrics_route).bind(addr).await;
109-
}
59+
/// Instantiate a metrics API.
60+
pub async fn spawn(addr: impl Into<SocketAddr> + 'static) {
61+
let metrics_route = warp::path("metrics")
62+
.and(warp::path::end())
63+
.and_then(move || async move {
64+
let mut buf = String::new();
65+
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
66+
.map_err(|e| -> Box<dyn std::error::Error> { e.into() })
67+
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
68+
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
69+
})
70+
.unwrap_or_else(|e| {
71+
tracing::error!(err = ?e, "Metrics: Could not gather metrics from registry");
72+
Box::new(reply::with_status(
73+
"Could not gather metrics. See logs for details".to_string(),
74+
StatusCode::INTERNAL_SERVER_ERROR,
75+
))
76+
});
77+
78+
Result::<Box<dyn Reply>, Rejection>::Ok(response)
79+
});
80+
81+
let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
82+
let _ = crate::agent::EXIT.subscribe().changed().await;
83+
});
84+
85+
serve.await
11086
}
11187

11288
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
@@ -362,7 +338,7 @@ impl PriceLocalMetrics {
362338
metrics
363339
}
364340

365-
pub fn update(&self, price_id: &PriceIdentifier, price_info: &PriceInfo) {
341+
pub fn update(&self, price_id: &pyth_sdk::Identifier, price_info: &PriceInfo) {
366342
#[deny(unused_variables)]
367343
let Self {
368344
price,

src/agent/pythd/api.rs renamed to src/agent/pyth.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use {
66
std::collections::BTreeMap,
77
};
88

9+
pub mod rpc;
10+
911
pub type Pubkey = String;
1012
pub type Attrs = BTreeMap<String, String>;
1113

@@ -83,5 +85,3 @@ pub struct PriceUpdate {
8385
pub valid_slot: Slot,
8486
pub pub_slot: Slot,
8587
}
86-
87-
pub mod rpc;

0 commit comments

Comments
 (0)