Skip to content

Commit 866a293

Browse files
authored
Improve logging and reliability (#45)
* Add a key for primary vs secondary in logs * Add timeout for exporter * Bump version * Move timeout to network level * Bump warp dependency Suggested by dependabot
1 parent 3bf50bf commit 866a293

File tree

8 files changed

+61
-75
lines changed

8 files changed

+61
-75
lines changed

Cargo.lock

Lines changed: 9 additions & 54 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "0.1.2"
3+
version = "0.1.3"
44
edition = "2021"
55

66
[[bin]]
@@ -11,7 +11,7 @@ path = "src/bin/agent.rs"
1111
anyhow = "1.0.55"
1212
serde = { version = "1.0.136", features = ["derive"] }
1313
async-trait = "0.1.52"
14-
warp = { version = "0.3.1", features = ["websocket"] }
14+
warp = { version = "0.3.3", features = ["websocket"] }
1515
tokio = { version = "1.0", features = ["full"] }
1616
tokio-stream = "0.1.1"
1717
futures-util = { version = "0.3", default-features = false, features = [

config/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ key_store.root_path = "/path/to/keystore"
6767
# This can be omitted when oracle.subscriber_enabled is set to false.
6868
# wss_url = "ws://api.devnet.solana.com"
6969

70+
# Timeout for the requests to the RPC
71+
# rpc_timeout = "10s"
72+
7073
# Path to the key store.
7174
# key_store.root_path = "/path/to/keystore"
7275

src/agent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl Agent {
103103
self.config.primary_network.clone(),
104104
local_store_tx.clone(),
105105
primary_oracle_updates_tx,
106-
logger.clone(),
106+
logger.new(o!("primary" => true)),
107107
)?);
108108

109109
// Spawn the secondary network, if needed
@@ -112,7 +112,7 @@ impl Agent {
112112
config.clone(),
113113
local_store_tx.clone(),
114114
secondary_oracle_updates_tx,
115-
logger.clone(),
115+
logger.new(o!("primary" => false)),
116116
)?);
117117
}
118118

src/agent/solana.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod network {
2424
Serialize,
2525
},
2626
slog::Logger,
27+
std::time::Duration,
2728
tokio::{
2829
sync::{
2930
mpsc,
@@ -38,25 +39,29 @@ pub mod network {
3839
#[serde(default)]
3940
pub struct Config {
4041
/// HTTP RPC endpoint
41-
pub rpc_url: String,
42+
pub rpc_url: String,
4243
/// WSS RPC endpoint
43-
pub wss_url: String,
44+
pub wss_url: String,
45+
/// Timeout for the requests to the RPC
46+
#[serde(with = "humantime_serde")]
47+
pub rpc_timeout: Duration,
4448
/// Keystore
45-
pub key_store: key_store::Config,
49+
pub key_store: key_store::Config,
4650
/// Configuration for the Oracle reading data from this network
47-
pub oracle: oracle::Config,
51+
pub oracle: oracle::Config,
4852
/// Configuration for the Exporter publishing data to this network
49-
pub exporter: exporter::Config,
53+
pub exporter: exporter::Config,
5054
}
5155

5256
impl Default for Config {
5357
fn default() -> Self {
5458
Self {
55-
rpc_url: "http://localhost:8899".to_string(),
56-
wss_url: "ws://localhost:8900".to_string(),
57-
key_store: Default::default(),
58-
oracle: Default::default(),
59-
exporter: Default::default(),
59+
rpc_url: "http://localhost:8899".to_string(),
60+
wss_url: "ws://localhost:8900".to_string(),
61+
rpc_timeout: Duration::from_secs(10),
62+
key_store: Default::default(),
63+
oracle: Default::default(),
64+
exporter: Default::default(),
6065
}
6166
}
6267
}
@@ -72,6 +77,7 @@ pub mod network {
7277
config.oracle.clone(),
7378
&config.rpc_url,
7479
&config.wss_url,
80+
config.rpc_timeout,
7581
KeyStore::new(config.key_store.clone())?,
7682
global_store_update_tx,
7783
logger.clone(),
@@ -81,6 +87,7 @@ pub mod network {
8187
let exporter_jhs = exporter::spawn_exporter(
8288
config.exporter,
8389
&config.rpc_url,
90+
config.rpc_timeout,
8491
KeyStore::new(config.key_store.clone())?,
8592
local_store_tx,
8693
logger,

src/agent/solana/exporter.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,16 @@ impl Default for Config {
130130
pub fn spawn_exporter(
131131
config: Config,
132132
rpc_url: &str,
133+
rpc_timeout: Duration,
133134
key_store: KeyStore,
134135
local_store_tx: Sender<store::local::Message>,
135136
logger: Logger,
136137
) -> Result<Vec<JoinHandle<()>>> {
137138
// Create and spawn the network state querier
138139
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
139140
let mut network_state_querier = NetworkStateQuerier::new(
140-
&rpc_url,
141+
rpc_url,
142+
rpc_timeout,
141143
time::interval(config.refresh_network_state_interval_duration),
142144
network_state_tx,
143145
logger.clone(),
@@ -150,6 +152,7 @@ pub fn spawn_exporter(
150152
let mut transaction_monitor = TransactionMonitor::new(
151153
config.transaction_monitor.clone(),
152154
rpc_url,
155+
rpc_timeout,
153156
transactions_rx,
154157
logger.clone(),
155158
);
@@ -159,6 +162,7 @@ pub fn spawn_exporter(
159162
let mut exporter = Exporter::new(
160163
config,
161164
rpc_url,
165+
rpc_timeout,
162166
key_store,
163167
local_store_tx,
164168
network_state_rx,
@@ -206,6 +210,7 @@ impl Exporter {
206210
pub fn new(
207211
config: Config,
208212
rpc_url: &str,
213+
rpc_timeout: Duration,
209214
key_store: KeyStore,
210215
local_store_tx: Sender<store::local::Message>,
211216
network_state_rx: watch::Receiver<NetworkState>,
@@ -214,7 +219,7 @@ impl Exporter {
214219
) -> Self {
215220
let publish_interval = time::interval(config.publish_interval_duration);
216221
Exporter {
217-
rpc_client: RpcClient::new(rpc_url.to_string()),
222+
rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout),
218223
config,
219224
publish_interval,
220225
key_store,
@@ -445,12 +450,13 @@ struct NetworkStateQuerier {
445450
impl NetworkStateQuerier {
446451
pub fn new(
447452
rpc_endpoint: &str,
453+
rpc_timeout: Duration,
448454
query_interval: Interval,
449455
network_state_tx: watch::Sender<NetworkState>,
450456
logger: Logger,
451457
) -> Self {
452458
NetworkStateQuerier {
453-
rpc_client: RpcClient::new(rpc_endpoint.to_string()),
459+
rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
454460
query_interval,
455461
network_state_tx,
456462
logger,
@@ -558,11 +564,12 @@ mod transaction_monitor {
558564
pub fn new(
559565
config: Config,
560566
rpc_url: &str,
567+
rpc_timeout: Duration,
561568
transactions_rx: mpsc::Receiver<Signature>,
562569
logger: Logger,
563570
) -> Self {
564571
let poll_interval = time::interval(config.poll_interval_duration);
565-
let rpc_client = RpcClient::new(rpc_url.to_string());
572+
let rpc_client = RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout);
566573
TransactionMonitor {
567574
config,
568575
rpc_client,

0 commit comments

Comments
 (0)