feat: RPC redundancy #157

Merged 8 commits on Apr 24, 2025
Changes from 1 commit
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ solana-account-decoder = "1.18.8"
solana-client = "1.18.8"
solana-pubkey = "2.3.0"
solana-sdk = "1.18.8"
+solana-transaction-status = "1.18.26"
bincode = { version = "2.0.1", features = ["serde"] }
rand = "0.8.5"
config = "0.14.0"
5 changes: 3 additions & 2 deletions config/config.sample.pythnet.toml
@@ -3,9 +3,10 @@ listen_address = "127.0.0.1:8910"

[primary_network]

-# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually
+# HTTP(S) endpoints of the RPC node. Public Pythnet RPC endpoints are usually
# rate-limited, so a private endpoint should be used in most cases.
-rpc_url = "https://api2.pythnet.pyth.network"
+# API calls will cycle through each on failure.
+rpc_urls = ["https://api2.pythnet.pyth.network"]
Review comment (Collaborator):

This breaks backward compatibility, let's bump the version as a major one. (we can ship it with lazer in one go)
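Concretely, the rename means existing configs need a small migration when the major version ships. A sketch of the before and after (the second URL is a hypothetical placeholder for an operator's own fallback endpoint, not a real Pyth endpoint):

# Before: a single endpoint.
# rpc_url = "https://api2.pythnet.pyth.network"

# After: a list of endpoints; API calls cycle through them on failure.
rpc_urls = [
    "https://api2.pythnet.pyth.network",
    "https://pythnet-rpc.example.com", # hypothetical fallback
]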


# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
5 changes: 3 additions & 2 deletions config/config.sample.pythtest.toml
@@ -3,8 +3,9 @@ listen_address = "127.0.0.1:8910"

[primary_network]

-# HTTP(S) endpoint of the RPC node.
-rpc_url = "https://api.pythtest.pyth.network"
+# HTTP(S) endpoints of the RPC node.
+# API calls will cycle through each on failure.
+rpc_urls = ["https://api.pythtest.pyth.network"]

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes
# on the network. This can be omitted when oracle.subscriber_enabled is set to
5 changes: 3 additions & 2 deletions config/config.toml
@@ -29,10 +29,11 @@ listen_address = "127.0.0.1:8910"
[primary_network]
### Required fields ###

-# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually
+# HTTP(S) endpoints of the RPC node. Public RPC endpoints are usually
# rate-limited for Pythnet, and so a private endpoint should be used in most
# cases. For Pythtest, the public endpoint can be used.
-rpc_url = "https://api.pythtest.pyth.network"
+# API calls will cycle through each on failure.
+rpc_urls = ["https://api.pythtest.pyth.network"]

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
4 changes: 2 additions & 2 deletions src/agent.rs
@@ -183,11 +183,11 @@ impl Agent {
// Spawn the remote keypair loader endpoint for both networks
handles.extend(
services::keypairs(
-self.config.primary_network.rpc_url.clone(),
+self.config.primary_network.rpc_urls.clone(),
self.config
.secondary_network
.as_ref()
-.map(|c| c.rpc_url.clone()),
+.map(|c| c.rpc_urls.clone()),
self.config.remote_keypair_loader.clone(),
state,
)
55 changes: 38 additions & 17 deletions src/agent/services/exporter.rs
@@ -111,7 +111,7 @@ pub struct NetworkState {
/// fetching the blockhash and slot number.
struct NetworkStateQuerier {
/// The RPC client
-rpc_client: RpcClient,
+rpc_clients: Vec<RpcClient>,

/// The interval with which to query the network state
query_interval: Interval,
@@ -122,20 +122,24 @@

impl NetworkStateQuerier {
#[instrument(
-skip(rpc_endpoint, rpc_timeout, query_interval),
+skip(rpc_urls, rpc_timeout, query_interval),
fields(
rpc_timeout = rpc_timeout.as_millis(),
query_interval = query_interval.period().as_millis(),
)
)]
pub fn new(
-rpc_endpoint: &str,
+rpc_urls: &Vec<String>,
rpc_timeout: Duration,
query_interval: Interval,
network_state_tx: watch::Sender<NetworkState>,
) -> Self {
+let rpc_clients = rpc_urls
+.iter()
+.map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), rpc_timeout))
+.collect();
NetworkStateQuerier {
-rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
+rpc_clients,
query_interval,
network_state_tx,
}
@@ -152,11 +156,12 @@

#[instrument(skip(self))]
async fn query_network_state(&mut self) -> Result<()> {
+// TODO: These are polled every 200ms and errors are simply logged.
+// TODO: Should we retry/fallback on failure?
// Fetch the blockhash and current slot in parallel
-let current_slot_future = self
-.rpc_client
-.get_slot_with_commitment(CommitmentConfig::confirmed());
-let latest_blockhash_future = self.rpc_client.get_latest_blockhash();
+let current_slot_future =
+self.rpc_clients[0].get_slot_with_commitment(CommitmentConfig::confirmed());
+let latest_blockhash_future = self.rpc_clients[0].get_latest_blockhash();
Review comment (Collaborator):

it's important for this to work properly as the data from these two are used in the transactions. Please loop through urls
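A minimal sketch of that fallback loop, assuming `NetworkState` carries the fetched `blockhash` and `current_slot` as used elsewhere in this service (sequential queries rather than `future::join`, for simplicity; illustrative only, not the merged code):

async fn query_network_state(&mut self) -> Result<()> {
    // Try each endpoint in order; the first that answers both queries wins.
    for (index, rpc_client) in self.rpc_clients.iter().enumerate() {
        let slot = rpc_client
            .get_slot_with_commitment(CommitmentConfig::confirmed())
            .await;
        let blockhash = rpc_client.get_latest_blockhash().await;
        match (slot, blockhash) {
            (Ok(current_slot), Ok(blockhash)) => {
                self.network_state_tx
                    .send(NetworkState { blockhash, current_slot })
                    .map_err(|_| anyhow::anyhow!("network state receiver dropped"))?;
                return Ok(());
            }
            // Log which endpoint failed and fall through to the next one.
            (slot, blockhash) => tracing::warn!(
                endpoint_index = index,
                slot_err = ?slot.err(),
                blockhash_err = ?blockhash.err(),
                "Network state query failed, trying next RPC endpoint."
            ),
        }
    }
    anyhow::bail!("network state query failed on all RPC endpoints")
}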


let (current_slot_result, latest_blockhash_result) =
future::join(current_slot_future, latest_blockhash_future).await;
@@ -183,7 +188,7 @@ where
// Create and spawn the network state querier
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
let mut network_state_querier = NetworkStateQuerier::new(
-&config.rpc_url,
+&config.rpc_urls,
config.rpc_timeout,
tokio::time::interval(config.exporter.refresh_network_state_interval_duration),
network_state_tx,
@@ -226,6 +231,7 @@ mod exporter {
},
},
solana_client::nonblocking::rpc_client::RpcClient,
+solana_sdk::commitment_config::CommitmentConfig,
std::sync::Arc,
tokio::sync::watch,
};
@@ -243,10 +249,21 @@
let mut dynamic_compute_unit_price_update_interval =
tokio::time::interval(config.exporter.publish_interval_duration);

-let client = Arc::new(RpcClient::new_with_timeout(
-config.rpc_url.to_string(),
-config.rpc_timeout,
-));
+let clients: Arc<Vec<RpcClient>> = Arc::new(
+config
+.rpc_urls
+.iter()
+.map(|rpc_url| {
+RpcClient::new_with_timeout_and_commitment(
+rpc_url.clone(),
+config.rpc_timeout,
+CommitmentConfig {
+commitment: config.oracle.commitment,
+},
+)
+})
+.collect(),
+);
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
tracing::warn!("Key store not available, Exporter won't start.");
return;
@@ -265,7 +282,7 @@
let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
if let Err(err) = publish_batches(
state.clone(),
-client.clone(),
+clients.clone(),
network,
&network_state_rx,
key_store.accumulator_key,
@@ -293,7 +310,7 @@
if let Err(err) = Exporter::update_recent_compute_unit_price(
&*state,
&publish_keypair,
-&client,
+&clients,
config.exporter.staleness_threshold,
config.exporter.unchanged_publish_threshold,
).await {
@@ -352,13 +369,17 @@ mod transaction_monitor {
where
S: Transactions,
{
-let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout);
+let rpc_clients = config
+.rpc_urls
+.iter()
+.map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), config.rpc_timeout))
+.collect();
let mut poll_interval =
tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);

loop {
poll_interval.tick().await;
-if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await {
+if let Err(err) = Transactions::poll_transactions_status(&*state, &rpc_clients).await {
tracing::error!(err = ?err, "Transaction monitor failed.");
}
}
45 changes: 27 additions & 18 deletions src/agent/services/keypairs.rs
@@ -8,7 +8,7 @@ use {
state::keypairs::Keypairs,
},
anyhow::{
-Context,
+bail,
Result,
},
serde::Deserialize,
@@ -61,8 +61,8 @@ }
}

pub async fn keypairs<S>(
-primary_rpc_url: String,
-secondary_rpc_url: Option<String>,
+primary_rpc_urls: Vec<String>,
+secondary_rpc_urls: Option<Vec<String>>,
config: Config,
state: Arc<S>,
) -> Vec<JoinHandle<()>>
@@ -81,7 +81,7 @@ where

let primary_upload_route = {
let state = state.clone();
-let rpc_url = primary_rpc_url.clone();
+let rpc_urls = primary_rpc_urls.clone();
let min_balance = config.primary_min_keypair_balance_sol;
warp::path!("primary" / "load_keypair")
.and(warp::post())
@@ -90,14 +90,14 @@
.and(warp::path::end())
.and_then(move |kp: Vec<u8>| {
let state = state.clone();
-let rpc_url = rpc_url.clone();
+let rpc_urls = rpc_urls.clone();
async move {
let response = handle_new_keypair(
state,
Network::Primary,
kp,
min_balance,
-rpc_url,
+rpc_urls,
"primary",
)
.await;
@@ -113,16 +113,16 @@
.and(warp::path::end())
.and_then(move |kp: Vec<u8>| {
let state = state.clone();
-let rpc_url = secondary_rpc_url.clone();
+let rpc_urls = secondary_rpc_urls.clone();
async move {
-if let Some(rpc_url) = rpc_url {
+if let Some(rpc_urls) = rpc_urls {
let min_balance = config.secondary_min_keypair_balance_sol;
let response = handle_new_keypair(
state,
Network::Secondary,
kp,
min_balance,
-rpc_url,
+rpc_urls,
"secondary",
)
.await;
@@ -160,15 +160,15 @@ async fn handle_new_keypair<'a, 'b: 'a, S>(
network: Network,
new_keypair_bytes: Vec<u8>,
min_keypair_balance_sol: u64,
-rpc_url: String,
+rpc_urls: Vec<String>,
network_name: &'b str,
) -> WithStatus<&'static str>
where
S: Keypairs,
{
let mut upload_ok = true;
match Keypair::from_bytes(&new_keypair_bytes) {
-Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await {
+Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_urls.clone()).await {
Ok(()) => {
Keypairs::update_keypair(&*state, network, kp).await;
}
@@ -205,14 +205,12 @@ where
pub async fn validate_keypair(
kp: &Keypair,
min_keypair_balance_sol: u64,
-rpc_url: String,
+rpc_urls: Vec<String>,
) -> Result<()> {
-let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
-
-let balance_lamports = c
-.get_balance(&kp.pubkey())
-.await
-.context("Could not check keypair's balance")?;
+let balance_lamports = match get_balance(kp, rpc_urls).await {
+Ok(balance_lamports) => balance_lamports,
+Err(_) => bail!("Could not check keypair's balance"),
+};

let lamports_in_sol = 1_000_000_000;

@@ -227,3 +225,14 @@ pub async fn validate_keypair(
)))
}
}

+async fn get_balance(kp: &Keypair, rpc_urls: Vec<String>) -> Result<u64> {
+for rpc_url in rpc_urls {
+let c = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
+match c.get_balance(&kp.pubkey()).await {
+Ok(balance) => return Ok(balance),
+Err(e) => tracing::warn!("getBalance error for rpc endpoint {}: {}", rpc_url, e),
+}
+}
+bail!("getBalance failed for all RPC endpoints")
+}
24 changes: 16 additions & 8 deletions src/agent/services/oracle.rs
@@ -168,13 +168,21 @@ async fn poller<S>(
{
// Setup an RpcClient for manual polling.
let mut tick = tokio::time::interval(config.oracle.poll_interval_duration);
-let client = Arc::new(RpcClient::new_with_timeout_and_commitment(
-config.rpc_url,
-config.rpc_timeout,
-CommitmentConfig {
-commitment: config.oracle.commitment,
-},
-));
+let clients: Arc<Vec<RpcClient>> = Arc::new(
+config
+.rpc_urls
+.iter()
+.map(|rpc_url| {
+RpcClient::new_with_timeout_and_commitment(
+rpc_url.clone(),
+config.rpc_timeout,
+CommitmentConfig {
+commitment: config.oracle.commitment,
+},
+)
+})
+.collect(),
+);

loop {
if let Err(err) = async {
Expand All @@ -186,7 +194,7 @@ async fn poller<S>(
oracle_program_key,
publish_keypair.as_ref(),
pyth_price_store_program_key,
-&client,
+&clients,
max_lookup_batch_size,
)
.await?;