Skip to content

Commit 2a485ce

Browse files
committed
refactor(agent): StateApi -> Prices and refactor module
The StateApi is left over from the initial Adapter, but all functionality is for pricing/product accounts. This refactors that module and fixes the cyclic dependency between it and GlobalStore. The new logic performs updates within the Prices API (Which is where the state relevant to subscriptions already was, so is the better place for it). File rename left for a future commit to keep the diffs clean.
1 parent ba4018f commit 2a485ce

File tree

15 files changed

+305
-208
lines changed

15 files changed

+305
-208
lines changed

src/agent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl Agent {
146146
}
147147

148148
// Create the Notifier task for the Pythd RPC.
149-
jhs.push(tokio::spawn(notifier(adapter.clone())));
149+
jhs.push(tokio::spawn(notifier(logger.clone(), adapter.clone())));
150150

151151
// Spawn the Pythd API Server
152152
jhs.push(tokio::spawn(rpc::run(

src/agent/metrics.rs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,44 @@
11
use {
2-
super::state::{local::PriceInfo, State},
3-
crate::agent::{solana::oracle::PriceEntry, store::PriceIdentifier},
2+
super::state::{
3+
local::PriceInfo,
4+
State,
5+
},
6+
crate::agent::{
7+
solana::oracle::PriceEntry,
8+
store::PriceIdentifier,
9+
},
410
lazy_static::lazy_static,
511
prometheus_client::{
6-
encoding::{text::encode, EncodeLabelSet},
7-
metrics::{counter::Counter, family::Family, gauge::Gauge},
12+
encoding::{
13+
text::encode,
14+
EncodeLabelSet,
15+
},
16+
metrics::{
17+
counter::Counter,
18+
family::Family,
19+
gauge::Gauge,
20+
},
821
registry::Registry,
922
},
1023
serde::Deserialize,
1124
slog::Logger,
1225
solana_sdk::pubkey::Pubkey,
1326
std::{
1427
net::SocketAddr,
15-
sync::{atomic::AtomicU64, Arc},
28+
sync::{
29+
atomic::AtomicU64,
30+
Arc,
31+
},
1632
time::Instant,
1733
},
1834
tokio::sync::Mutex,
19-
warp::{hyper::StatusCode, reply, Filter, Rejection, Reply},
35+
warp::{
36+
hyper::StatusCode,
37+
reply,
38+
Filter,
39+
Rejection,
40+
Reply,
41+
},
2042
};
2143

2244
pub fn default_bind_address() -> SocketAddr {
@@ -46,8 +68,8 @@ lazy_static! {
4668
/// metrics.
4769
pub struct MetricsServer {
4870
pub start_time: Instant,
49-
pub logger: Logger,
50-
pub adapter: Arc<State>,
71+
pub logger: Logger,
72+
pub adapter: Arc<State>,
5173
}
5274

5375
impl MetricsServer {
@@ -151,12 +173,12 @@ pub struct PriceGlobalMetrics {
151173

152174
/// f64 is used to get u64 support. Official docs:
153175
/// https://docs.rs/prometheus-client/latest/prometheus_client/metrics/gauge/struct.Gauge.html#using-atomicu64-as-storage-and-f64-on-the-interface
154-
conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
176+
conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
155177
timestamp: Family<PriceGlobalLabels, Gauge>,
156178

157179
/// Note: the exponent is not applied to this metric
158-
prev_price: Family<PriceGlobalLabels, Gauge>,
159-
prev_conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
180+
prev_price: Family<PriceGlobalLabels, Gauge>,
181+
prev_conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
160182
prev_timestamp: Family<PriceGlobalLabels, Gauge>,
161183

162184
/// How many times this Price was updated in the global store
@@ -299,10 +321,10 @@ pub struct PriceLocalLabels {
299321
/// Metrics exposed to Prometheus by the local store for each price
300322
#[derive(Default)]
301323
pub struct PriceLocalMetrics {
302-
price: Family<PriceLocalLabels, Gauge>,
324+
price: Family<PriceLocalLabels, Gauge>,
303325
/// f64 is used to get u64 support. Official docs:
304326
/// https://docs.rs/prometheus-client/latest/prometheus_client/metrics/gauge/struct.Gauge.html#using-atomicu64-as-storage-and-f64-on-the-interface
305-
conf: Family<PriceLocalLabels, Gauge<f64, AtomicU64>>,
327+
conf: Family<PriceLocalLabels, Gauge<f64, AtomicU64>>,
306328
timestamp: Family<PriceLocalLabels, Gauge>,
307329

308330
/// How many times this price was updated in the local store

src/agent/pythd/api/rpc.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ async fn handle_connection<S>(
117117
notify_price_sched_tx_buffer: usize,
118118
logger: Logger,
119119
) where
120-
S: state::StateApi,
120+
S: state::Prices,
121121
S: Send,
122122
S: Sync,
123123
S: 'static,
@@ -165,7 +165,7 @@ async fn handle_next<S>(
165165
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
166166
) -> Result<()>
167167
where
168-
S: state::StateApi,
168+
S: state::Prices,
169169
{
170170
tokio::select! {
171171
msg = ws_rx.next() => {
@@ -207,7 +207,7 @@ async fn handle<S>(
207207
msg: Message,
208208
) -> Result<()>
209209
where
210-
S: state::StateApi,
210+
S: state::Prices,
211211
{
212212
// Ignore control and binary messages
213213
if !msg.is_text() {
@@ -293,7 +293,7 @@ async fn dispatch_and_catch_error<S>(
293293
request: &Request<Method, Value>,
294294
) -> Response<serde_json::Value>
295295
where
296-
S: state::StateApi,
296+
S: state::Prices,
297297
{
298298
debug!(
299299
logger,
@@ -429,7 +429,7 @@ impl Default for Config {
429429

430430
pub async fn run<S>(config: Config, logger: Logger, adapter: Arc<S>)
431431
where
432-
S: state::StateApi,
432+
S: state::Prices,
433433
S: Send,
434434
S: Sync,
435435
S: 'static,
@@ -442,7 +442,7 @@ where
442442

443443
async fn serve<S>(config: Config, logger: &Logger, adapter: Arc<S>) -> Result<()>
444444
where
445-
S: state::StateApi,
445+
S: state::Prices,
446446
S: Send,
447447
S: Sync,
448448
S: 'static,

src/agent/pythd/api/rpc/get_all_products.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use {
55

66
pub async fn get_all_products<S>(adapter: &S) -> Result<serde_json::Value>
77
where
8-
S: state::StateApi,
8+
S: state::Prices,
99
{
1010
let products = adapter.get_all_products().await?;
1111
Ok(serde_json::to_value(products)?)

src/agent/pythd/api/rpc/get_product.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub async fn get_product<S>(
1919
request: &Request<Method, Value>,
2020
) -> Result<serde_json::Value>
2121
where
22-
S: state::StateApi,
22+
S: state::Prices,
2323
{
2424
let params: GetProductParams = {
2525
let value = request.params.clone();

src/agent/pythd/api/rpc/get_product_list.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use {
55

66
pub async fn get_product_list<S>(adapter: &S) -> Result<serde_json::Value>
77
where
8-
S: state::StateApi,
8+
S: state::Prices,
99
{
1010
let product_list = adapter.get_product_list().await?;
1111
Ok(serde_json::to_value(product_list)?)

src/agent/pythd/api/rpc/subscribe_price.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub async fn subscribe_price<S>(
2323
request: &Request<Method, Value>,
2424
) -> Result<serde_json::Value>
2525
where
26-
S: state::StateApi,
26+
S: state::Prices,
2727
{
2828
let params: SubscribePriceParams = serde_json::from_value(
2929
request

src/agent/pythd/api/rpc/subscribe_price_sched.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub async fn subscribe_price_sched<S>(
2323
request: &Request<Method, Value>,
2424
) -> Result<serde_json::Value>
2525
where
26-
S: state::StateApi,
26+
S: state::Prices,
2727
{
2828
let params: SubscribePriceSchedParams = serde_json::from_value(
2929
request

src/agent/pythd/api/rpc/update_price.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub async fn update_price<S>(
1919
request: &Request<Method, Value>,
2020
) -> Result<serde_json::Value>
2121
where
22-
S: state::StateApi,
22+
S: state::Prices,
2323
{
2424
let params: UpdatePriceParams = serde_json::from_value(
2525
request
@@ -29,7 +29,7 @@ where
2929
)?;
3030

3131
adapter
32-
.update_price(
32+
.update_local_price(
3333
&params.account.parse::<solana_sdk::pubkey::Pubkey>()?,
3434
params.price,
3535
params.conf,

src/agent/solana/oracle.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@ use {
1010
legacy_schedule::LegacySchedule,
1111
market_schedule::MarketSchedule,
1212
state::{
13-
global::{
14-
GlobalStore,
15-
Update,
16-
},
13+
global::Update,
14+
Prices,
1715
State,
1816
},
1917
},
@@ -416,7 +414,7 @@ impl Oracle {
416414
account_key: &Pubkey,
417415
account: &ProductEntry,
418416
) -> Result<()> {
419-
GlobalStore::update(
417+
Prices::update_global_price(
420418
&*self.adapter,
421419
self.network,
422420
&Update::ProductAccountUpdate {
@@ -433,7 +431,7 @@ impl Oracle {
433431
account_key: &Pubkey,
434432
account: &PriceEntry,
435433
) -> Result<()> {
436-
GlobalStore::update(
434+
Prices::update_global_price(
437435
&*self.adapter,
438436
self.network,
439437
&Update::PriceAccountUpdate {

0 commit comments

Comments
 (0)