Skip to content

Commit 2d074fb

Browse files
authored
refactor(agent): convert global store to an Api (#122)
1 parent 8754c47 commit 2d074fb

File tree

12 files changed

+503
-587
lines changed

12 files changed

+503
-587
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ ENV PATH="${PATH}:/root/.local/bin"
1212
RUN poetry config virtualenvs.in-project true
1313

1414
# Install Solana Tool Suite
15-
RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.11/install)"
15+
RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.17/install)"
1616
ENV PATH="${PATH}:/root/.local/share/solana/install/active_release/bin"
1717

1818
ADD . /agent

src/agent.rs

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -118,26 +118,29 @@ impl Agent {
118118
// Create the channels
119119
// TODO: make all components listen to shutdown signal
120120
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
121-
let (primary_oracle_updates_tx, primary_oracle_updates_rx) =
122-
mpsc::channel(self.config.channel_capacities.primary_oracle_updates);
123-
let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
124-
mpsc::channel(self.config.channel_capacities.secondary_oracle_updates);
125-
let (global_store_lookup_tx, global_store_lookup_rx) =
126-
mpsc::channel(self.config.channel_capacities.global_store_lookup);
127121
let (local_store_tx, local_store_rx) =
128122
mpsc::channel(self.config.channel_capacities.local_store);
129123
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
130124
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);
131125

126+
// Create the Pythd Adapter.
127+
let adapter = Arc::new(
128+
pythd::adapter::Adapter::new(
129+
self.config.pythd_adapter.clone(),
130+
local_store_tx.clone(),
131+
logger.clone(),
132+
)
133+
.await,
134+
);
135+
132136
// Spawn the primary network
133137
jhs.extend(network::spawn_network(
134138
self.config.primary_network.clone(),
135139
network::Network::Primary,
136140
local_store_tx.clone(),
137-
global_store_lookup_tx.clone(),
138-
primary_oracle_updates_tx,
139141
primary_keypair_loader_tx,
140142
logger.new(o!("primary" => true)),
143+
adapter.clone(),
141144
)?);
142145

143146
// Spawn the secondary network, if needed
@@ -146,53 +149,35 @@ impl Agent {
146149
config.clone(),
147150
network::Network::Secondary,
148151
local_store_tx.clone(),
149-
global_store_lookup_tx.clone(),
150-
secondary_oracle_updates_tx,
151152
secondary_keypair_loader_tx,
152153
logger.new(o!("primary" => false)),
154+
adapter.clone(),
153155
)?);
154156
}
155157

156-
// Create the Pythd Adapter.
157-
let adapter = Arc::new(pythd::adapter::Adapter::new(
158-
self.config.pythd_adapter.clone(),
159-
global_store_lookup_tx.clone(),
160-
local_store_tx.clone(),
161-
logger.clone(),
162-
));
163-
164158
// Create the Notifier task for the Pythd RPC.
165159
jhs.push(tokio::spawn(notifier(
166160
adapter.clone(),
167161
shutdown_tx.subscribe(),
168162
)));
169163

170-
// Spawn the Global Store
171-
jhs.push(store::global::spawn_store(
172-
global_store_lookup_rx,
173-
primary_oracle_updates_rx,
174-
secondary_oracle_updates_rx,
175-
adapter.clone(),
176-
logger.clone(),
177-
));
178-
179164
// Spawn the Local Store
180165
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));
181166

182167
// Spawn the Pythd API Server
183168
jhs.push(tokio::spawn(rpc::run(
184169
self.config.pythd_api_server.clone(),
185170
logger.clone(),
186-
adapter,
171+
adapter.clone(),
187172
shutdown_tx.subscribe(),
188173
)));
189174

190175
// Spawn the metrics server
191176
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
192177
self.config.metrics_server.bind_address,
193178
local_store_tx,
194-
global_store_lookup_tx,
195179
logger.clone(),
180+
adapter,
196181
)));
197182

198183
// Spawn the remote keypair loader endpoint for both networks

src/agent/dashboard.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use {
22
super::{
3-
solana::oracle::PriceEntry,
4-
store::{
5-
global::{
6-
AllAccountsData,
7-
AllAccountsMetadata,
8-
Lookup,
9-
PriceAccountMetadata,
10-
},
11-
local::{
12-
Message,
13-
PriceInfo,
14-
},
3+
pythd::adapter::global::GlobalStore,
4+
solana::{
5+
network::Network,
6+
oracle::PriceEntry,
7+
},
8+
store::local::{
9+
Message,
10+
PriceInfo,
11+
},
12+
},
13+
crate::agent::{
14+
metrics::MetricsServer,
15+
pythd::adapter::global::{
16+
AllAccountsData,
17+
AllAccountsMetadata,
18+
PriceAccountMetadata,
1519
},
1620
},
17-
crate::agent::metrics::MetricsServer,
1821
chrono::DateTime,
1922
pyth_sdk::{
2023
Identifier,
@@ -44,8 +47,6 @@ impl MetricsServer {
4447
pub async fn render_dashboard(&self) -> Result<String, Box<dyn std::error::Error>> {
4548
// Prepare response channel for requests
4649
let (local_tx, local_rx) = oneshot::channel();
47-
let (global_data_tx, global_data_rx) = oneshot::channel();
48-
let (global_metadata_tx, global_metadata_rx) = oneshot::channel();
4950

5051
// Request price data from local and global store
5152
self.local_store_tx
@@ -54,23 +55,11 @@ impl MetricsServer {
5455
})
5556
.await?;
5657

57-
self.global_store_lookup_tx
58-
.send(Lookup::LookupAllAccountsData {
59-
network: super::solana::network::Network::Primary,
60-
result_tx: global_data_tx,
61-
})
62-
.await?;
63-
64-
self.global_store_lookup_tx
65-
.send(Lookup::LookupAllAccountsMetadata {
66-
result_tx: global_metadata_tx,
67-
})
68-
.await?;
58+
let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?;
59+
let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?;
6960

7061
// Await the results
7162
let local_data = local_rx.await?;
72-
let global_data = global_data_rx.await??;
73-
let global_metadata = global_metadata_rx.await??;
7463

7564
let symbol_view =
7665
build_dashboard_data(local_data, global_data, global_metadata, &self.logger);

src/agent/metrics.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use {
2-
super::store::{
3-
global::Lookup,
4-
local::Message,
2+
super::{
3+
pythd::adapter::Adapter,
4+
store::local::Message,
55
},
66
crate::agent::{
77
solana::oracle::PriceEntry,
@@ -74,25 +74,25 @@ lazy_static! {
7474
/// dashboard and metrics.
7575
pub struct MetricsServer {
7676
/// Used to pull the state of all symbols in local store
77-
pub local_store_tx: mpsc::Sender<Message>,
78-
pub global_store_lookup_tx: mpsc::Sender<Lookup>,
79-
pub start_time: Instant,
80-
pub logger: Logger,
77+
pub local_store_tx: mpsc::Sender<Message>,
78+
pub start_time: Instant,
79+
pub logger: Logger,
80+
pub adapter: Arc<Adapter>,
8181
}
8282

8383
impl MetricsServer {
8484
/// Instantiate a metrics API with a dashboard
8585
pub async fn spawn(
8686
addr: impl Into<SocketAddr> + 'static,
8787
local_store_tx: mpsc::Sender<Message>,
88-
global_store_lookup_tx: mpsc::Sender<Lookup>,
8988
logger: Logger,
89+
adapter: Arc<Adapter>,
9090
) {
9191
let server = MetricsServer {
9292
local_store_tx,
93-
global_store_lookup_tx,
9493
start_time: Instant::now(),
9594
logger,
95+
adapter,
9696
};
9797

9898
let shared_state = Arc::new(Mutex::new(server));
@@ -109,7 +109,7 @@ impl MetricsServer {
109109
.await
110110
.unwrap_or_else(|e| {
111111
// Add logging here
112-
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());
112+
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());
113113

114114
// Withhold failure details from client
115115
"Could not render dashboard! See the logs for details".to_owned()

0 commit comments

Comments
 (0)