Skip to content

Commit 4232db5

Browse files
committed
refactor(agent): refactor all references to adapter to state
1 parent 2a485ce commit 4232db5

17 files changed

+122
-133
lines changed

src/agent.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,15 @@ impl Agent {
123123
// job handles
124124
let mut jhs = vec![];
125125

126-
// Create the Pythd Adapter.
127-
let adapter =
128-
Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await);
126+
// Create the Application State.
127+
let state = Arc::new(state::State::new(self.config.state.clone(), logger.clone()).await);
129128

130129
// Spawn the primary network
131130
jhs.extend(network::spawn_network(
132131
self.config.primary_network.clone(),
133132
network::Network::Primary,
134133
logger.new(o!("primary" => true)),
135-
adapter.clone(),
134+
state.clone(),
136135
)?);
137136

138137
// Spawn the secondary network, if needed
@@ -141,25 +140,25 @@ impl Agent {
141140
config.clone(),
142141
network::Network::Secondary,
143142
logger.new(o!("primary" => false)),
144-
adapter.clone(),
143+
state.clone(),
145144
)?);
146145
}
147146

148147
// Create the Notifier task for the Pythd RPC.
149-
jhs.push(tokio::spawn(notifier(logger.clone(), adapter.clone())));
148+
jhs.push(tokio::spawn(notifier(logger.clone(), state.clone())));
150149

151150
// Spawn the Pythd API Server
152151
jhs.push(tokio::spawn(rpc::run(
153152
self.config.pythd_api_server.clone(),
154153
logger.clone(),
155-
adapter.clone(),
154+
state.clone(),
156155
)));
157156

158157
// Spawn the metrics server
159158
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
160159
self.config.metrics_server.bind_address,
161160
logger.clone(),
162-
adapter.clone(),
161+
state.clone(),
163162
)));
164163

165164
// Spawn the remote keypair loader endpoint for both networks
@@ -172,7 +171,7 @@ impl Agent {
172171
.map(|c| c.rpc_url.clone()),
173172
self.config.remote_keypair_loader.clone(),
174173
logger,
175-
adapter,
174+
state,
176175
)
177176
.await,
178177
);
@@ -210,7 +209,8 @@ pub mod config {
210209
pub primary_network: network::Config,
211210
pub secondary_network: Option<network::Config>,
212211
#[serde(default)]
213-
pub pythd_adapter: state::Config,
212+
#[serde(rename = "pythd_adapter")]
213+
pub state: state::Config,
214214
#[serde(default)]
215215
pub pythd_api_server: pythd::api::rpc::Config,
216216
#[serde(default)]

src/agent/metrics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,16 @@ lazy_static! {
6969
pub struct MetricsServer {
7070
pub start_time: Instant,
7171
pub logger: Logger,
72-
pub adapter: Arc<State>,
72+
pub state: Arc<State>,
7373
}
7474

7575
impl MetricsServer {
7676
/// Instantiate a metrics API.
77-
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, adapter: Arc<State>) {
77+
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, state: Arc<State>) {
7878
let server = MetricsServer {
7979
start_time: Instant::now(),
8080
logger,
81-
adapter,
81+
state,
8282
};
8383

8484
let shared_state = Arc::new(Mutex::new(server));

src/agent/pythd/api/rpc.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ enum ConnectionError {
112112

113113
async fn handle_connection<S>(
114114
ws_conn: WebSocket,
115-
adapter: Arc<S>,
115+
state: Arc<S>,
116116
notify_price_tx_buffer: usize,
117117
notify_price_sched_tx_buffer: usize,
118118
logger: Logger,
@@ -131,7 +131,7 @@ async fn handle_connection<S>(
131131
loop {
132132
if let Err(err) = handle_next(
133133
&logger,
134-
&*adapter,
134+
&*state,
135135
&mut ws_tx,
136136
&mut ws_rx,
137137
&mut notify_price_tx,
@@ -156,7 +156,7 @@ async fn handle_connection<S>(
156156

157157
async fn handle_next<S>(
158158
logger: &Logger,
159-
adapter: &S,
159+
state: &S,
160160
ws_tx: &mut SplitSink<WebSocket, Message>,
161161
ws_rx: &mut SplitStream<WebSocket>,
162162
notify_price_tx: &mut mpsc::Sender<NotifyPrice>,
@@ -175,7 +175,7 @@ where
175175
handle(
176176
logger,
177177
ws_tx,
178-
adapter,
178+
state,
179179
notify_price_tx,
180180
notify_price_sched_tx,
181181
msg,
@@ -201,7 +201,7 @@ where
201201
async fn handle<S>(
202202
logger: &Logger,
203203
ws_tx: &mut SplitSink<WebSocket, Message>,
204-
adapter: &S,
204+
state: &S,
205205
notify_price_tx: &mpsc::Sender<NotifyPrice>,
206206
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
207207
msg: Message,
@@ -224,7 +224,7 @@ where
224224
for request in requests {
225225
let response = dispatch_and_catch_error(
226226
logger,
227-
adapter,
227+
state,
228228
notify_price_tx,
229229
notify_price_sched_tx,
230230
&request,
@@ -287,7 +287,7 @@ async fn parse(msg: Message) -> Result<(Vec<Request<Method, Value>>, bool)> {
287287

288288
async fn dispatch_and_catch_error<S>(
289289
logger: &Logger,
290-
adapter: &S,
290+
state: &S,
291291
notify_price_tx: &mpsc::Sender<NotifyPrice>,
292292
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
293293
request: &Request<Method, Value>,
@@ -302,13 +302,13 @@ where
302302
);
303303

304304
let result = match request.method {
305-
Method::GetProductList => get_product_list(adapter).await,
306-
Method::GetProduct => get_product(adapter, request).await,
307-
Method::GetAllProducts => get_all_products(adapter).await,
308-
Method::UpdatePrice => update_price(adapter, request).await,
309-
Method::SubscribePrice => subscribe_price(adapter, notify_price_tx, request).await,
305+
Method::GetProductList => get_product_list(state).await,
306+
Method::GetProduct => get_product(state, request).await,
307+
Method::GetAllProducts => get_all_products(state).await,
308+
Method::UpdatePrice => update_price(state, request).await,
309+
Method::SubscribePrice => subscribe_price(state, notify_price_tx, request).await,
310310
Method::SubscribePriceSched => {
311-
subscribe_price_sched(adapter, notify_price_sched_tx, request).await
311+
subscribe_price_sched(state, notify_price_sched_tx, request).await
312312
}
313313
Method::NotifyPrice | Method::NotifyPriceSched => {
314314
Err(anyhow!("unsupported method: {:?}", request.method))
@@ -410,10 +410,10 @@ pub struct Config {
410410
/// The address which the websocket API server will listen on.
411411
pub listen_address: String,
412412
/// Size of the buffer of each Server's channel on which `notify_price` events are
413-
/// received from the Adapter.
413+
/// received from the Price state.
414414
pub notify_price_tx_buffer: usize,
415415
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
416-
/// received from the Adapter.
416+
/// received from the Price state.
417417
pub notify_price_sched_tx_buffer: usize,
418418
}
419419

@@ -427,20 +427,20 @@ impl Default for Config {
427427
}
428428
}
429429

430-
pub async fn run<S>(config: Config, logger: Logger, adapter: Arc<S>)
430+
pub async fn run<S>(config: Config, logger: Logger, state: Arc<S>)
431431
where
432432
S: state::Prices,
433433
S: Send,
434434
S: Sync,
435435
S: 'static,
436436
{
437-
if let Err(err) = serve(config, &logger, adapter).await {
437+
if let Err(err) = serve(config, &logger, state).await {
438438
error!(logger, "{}", err);
439439
debug!(logger, "error context"; "context" => format!("{:?}", err));
440440
}
441441
}
442442

443-
async fn serve<S>(config: Config, logger: &Logger, adapter: Arc<S>) -> Result<()>
443+
async fn serve<S>(config: Config, logger: &Logger, state: Arc<S>) -> Result<()>
444444
where
445445
S: state::Prices,
446446
S: Send,
@@ -456,16 +456,16 @@ where
456456
let config = config.clone();
457457
warp::path::end()
458458
.and(warp::ws())
459-
.and(warp::any().map(move || adapter.clone()))
459+
.and(warp::any().map(move || state.clone()))
460460
.and(warp::any().map(move || with_logger.clone()))
461461
.and(warp::any().map(move || config.clone()))
462462
.map(
463-
|ws: Ws, adapter: Arc<S>, with_logger: WithLogger, config: Config| {
463+
|ws: Ws, state: Arc<S>, with_logger: WithLogger, config: Config| {
464464
ws.on_upgrade(move |conn| async move {
465465
info!(with_logger.logger, "websocket user connected");
466466
handle_connection(
467467
conn,
468-
adapter,
468+
state,
469469
config.notify_price_tx_buffer,
470470
config.notify_price_sched_tx_buffer,
471471
with_logger.logger,

src/agent/pythd/api/rpc/get_all_products.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ use {
33
anyhow::Result,
44
};
55

6-
pub async fn get_all_products<S>(adapter: &S) -> Result<serde_json::Value>
6+
pub async fn get_all_products<S>(state: &S) -> Result<serde_json::Value>
77
where
88
S: state::Prices,
99
{
10-
let products = adapter.get_all_products().await?;
10+
let products = state.get_all_products().await?;
1111
Ok(serde_json::to_value(products)?)
1212
}

src/agent/pythd/api/rpc/get_product.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use {
1515
};
1616

1717
pub async fn get_product<S>(
18-
adapter: &S,
18+
state: &S,
1919
request: &Request<Method, Value>,
2020
) -> Result<serde_json::Value>
2121
where
@@ -27,6 +27,6 @@ where
2727
}?;
2828

2929
let account = params.account.parse::<solana_sdk::pubkey::Pubkey>()?;
30-
let product = adapter.get_product(&account).await?;
30+
let product = state.get_product(&account).await?;
3131
Ok(serde_json::to_value(product)?)
3232
}

src/agent/pythd/api/rpc/get_product_list.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ use {
33
anyhow::Result,
44
};
55

6-
pub async fn get_product_list<S>(adapter: &S) -> Result<serde_json::Value>
6+
pub async fn get_product_list<S>(state: &S) -> Result<serde_json::Value>
77
where
88
S: state::Prices,
99
{
10-
let product_list = adapter.get_product_list().await?;
10+
let product_list = state.get_product_list().await?;
1111
Ok(serde_json::to_value(product_list)?)
1212
}

src/agent/pythd/api/rpc/subscribe_price.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use {
1818
};
1919

2020
pub async fn subscribe_price<S>(
21-
adapter: &S,
21+
state: &S,
2222
notify_price_tx: &mpsc::Sender<NotifyPrice>,
2323
request: &Request<Method, Value>,
2424
) -> Result<serde_json::Value>
@@ -33,7 +33,7 @@ where
3333
)?;
3434

3535
let account = params.account.parse::<solana_sdk::pubkey::Pubkey>()?;
36-
let subscription = adapter
36+
let subscription = state
3737
.subscribe_price(&account, notify_price_tx.clone())
3838
.await;
3939

src/agent/pythd/api/rpc/subscribe_price_sched.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use {
1818
};
1919

2020
pub async fn subscribe_price_sched<S>(
21-
adapter: &S,
21+
state: &S,
2222
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
2323
request: &Request<Method, Value>,
2424
) -> Result<serde_json::Value>
@@ -33,7 +33,7 @@ where
3333
)?;
3434

3535
let account = params.account.parse::<solana_sdk::pubkey::Pubkey>()?;
36-
let subscription = adapter
36+
let subscription = state
3737
.subscribe_price_sched(&account, notify_price_sched_tx.clone())
3838
.await;
3939

src/agent/pythd/api/rpc/update_price.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use {
1515
};
1616

1717
pub async fn update_price<S>(
18-
adapter: &S,
18+
state: &S,
1919
request: &Request<Method, Value>,
2020
) -> Result<serde_json::Value>
2121
where
@@ -28,7 +28,7 @@ where
2828
.ok_or_else(|| anyhow!("Missing request parameters"))?,
2929
)?;
3030

31-
adapter
31+
state
3232
.update_local_price(
3333
&params.account.parse::<solana_sdk::pubkey::Pubkey>()?,
3434
params.price,

src/agent/solana.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub mod network {
7575
config: Config,
7676
network: Network,
7777
logger: Logger,
78-
adapter: Arc<State>,
78+
state: Arc<State>,
7979
) -> Result<Vec<JoinHandle<()>>> {
8080
// Publisher permissions updates between oracle and exporter
8181
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());
@@ -90,7 +90,7 @@ pub mod network {
9090
publisher_permissions_tx,
9191
KeyStore::new(config.key_store.clone(), &logger)?,
9292
logger.clone(),
93-
adapter.clone(),
93+
state.clone(),
9494
);
9595

9696
// Spawn the Exporter
@@ -102,7 +102,7 @@ pub mod network {
102102
publisher_permissions_rx,
103103
KeyStore::new(config.key_store.clone(), &logger)?,
104104
logger,
105-
adapter,
105+
state,
106106
)?;
107107

108108
jhs.extend(exporter_jhs);

0 commit comments

Comments
 (0)