Skip to content

Commit d4e4528

Browse files
refactor: RPC pool configuration and connection handling (#1745)
* Refactor RPC pool configuration and connection handling - Introduced `RpcPoolConfig` struct for better organization of RPC pool settings. - Updated CLI arguments to include new RPC pool parameters: connection timeout, idle timeout, max retries, and retry delays. - Replaced direct usage of `SolanaRpcPool` with a builder pattern for improved configurability. - Enhanced connection retrieval with retry logic and exponential backoff. - Removed unused `log` dependency from `sdk-libs/client/Cargo.toml`. * rebase * fix after rebase
1 parent a160153 commit d4e4528

File tree

13 files changed

+1113
-636
lines changed

13 files changed

+1113
-636
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

forester-utils/src/rpc_pool.rs

Lines changed: 167 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use std::time::Duration;
1+
use std::{cmp::min, time::Duration};
22

33
use async_trait::async_trait;
44
use bb8::{Pool, PooledConnection};
55
use light_client::rpc::{rpc_connection::RpcConnectionConfig, RpcConnection, RpcError};
66
use solana_sdk::commitment_config::CommitmentConfig;
77
use thiserror::Error;
88
use tokio::time::sleep;
9-
use tracing::{debug, error};
9+
use tracing::{error, trace, warn};
1010

1111
use crate::rate_limiter::RateLimiter;
1212

@@ -18,6 +18,10 @@ pub enum PoolError {
1818
RpcRequest(#[from] RpcError),
1919
#[error("Pool error: {0}")]
2020
Pool(String),
21+
#[error("Failed to get connection after {0} retries: {1}")]
22+
MaxRetriesExceeded(u32, String),
23+
#[error("Missing required field for RpcPoolBuilder: {0}")]
24+
BuilderMissingField(String),
2125
}
2226

2327
pub struct SolanaConnectionManager<R: RpcConnection + 'static> {
@@ -57,6 +61,7 @@ impl<R: RpcConnection + 'static> bb8::ManageConnection for SolanaConnectionManag
5761
commitment_config: Some(self.commitment),
5862
with_indexer: false,
5963
};
64+
6065
Ok(R::new(config))
6166
}
6267

@@ -72,67 +77,182 @@ impl<R: RpcConnection + 'static> bb8::ManageConnection for SolanaConnectionManag
7277
#[derive(Debug)]
7378
pub struct SolanaRpcPool<R: RpcConnection + 'static> {
7479
pool: Pool<SolanaConnectionManager<R>>,
80+
max_retries: u32,
81+
initial_retry_delay: Duration,
82+
max_retry_delay: Duration,
7583
}
7684

77-
impl<R: RpcConnection + 'static> SolanaRpcPool<R> {
78-
pub async fn new(
79-
url: String,
80-
commitment: CommitmentConfig,
81-
max_size: u32,
82-
rpc_rate_limiter: Option<RateLimiter>,
83-
send_tx_rate_limiter: Option<RateLimiter>,
84-
) -> Result<Self, PoolError> {
85-
let manager =
86-
SolanaConnectionManager::new(url, commitment, rpc_rate_limiter, send_tx_rate_limiter);
85+
#[derive(Debug)]
86+
pub struct SolanaRpcPoolBuilder<R: RpcConnection> {
87+
url: Option<String>,
88+
commitment: Option<CommitmentConfig>,
89+
90+
max_size: u32,
91+
connection_timeout_secs: u64,
92+
idle_timeout_secs: u64,
93+
max_retries: u32,
94+
initial_retry_delay_ms: u64,
95+
max_retry_delay_ms: u64,
96+
97+
rpc_rate_limiter: Option<RateLimiter>,
98+
send_tx_rate_limiter: Option<RateLimiter>,
99+
_phantom: std::marker::PhantomData<R>,
100+
}
101+
102+
impl<R: RpcConnection> Default for SolanaRpcPoolBuilder<R> {
103+
fn default() -> Self {
104+
Self::new()
105+
}
106+
}
107+
108+
impl<R: RpcConnection> SolanaRpcPoolBuilder<R> {
109+
pub fn new() -> Self {
110+
Self {
111+
url: None,
112+
commitment: None,
113+
max_size: 50,
114+
connection_timeout_secs: 15,
115+
idle_timeout_secs: 300,
116+
max_retries: 3,
117+
initial_retry_delay_ms: 1000,
118+
max_retry_delay_ms: 16000,
119+
rpc_rate_limiter: None,
120+
send_tx_rate_limiter: None,
121+
_phantom: std::marker::PhantomData,
122+
}
123+
}
124+
125+
pub fn url(mut self, url: String) -> Self {
126+
self.url = Some(url);
127+
self
128+
}
129+
130+
pub fn commitment(mut self, commitment: CommitmentConfig) -> Self {
131+
self.commitment = Some(commitment);
132+
self
133+
}
134+
135+
pub fn max_size(mut self, max_size: u32) -> Self {
136+
self.max_size = max_size;
137+
self
138+
}
139+
140+
pub fn connection_timeout_secs(mut self, secs: u64) -> Self {
141+
self.connection_timeout_secs = secs;
142+
self
143+
}
144+
145+
pub fn idle_timeout_secs(mut self, secs: u64) -> Self {
146+
self.idle_timeout_secs = secs;
147+
self
148+
}
149+
150+
pub fn max_retries(mut self, retries: u32) -> Self {
151+
self.max_retries = retries;
152+
self
153+
}
154+
155+
pub fn initial_retry_delay_ms(mut self, ms: u64) -> Self {
156+
self.initial_retry_delay_ms = ms;
157+
self
158+
}
159+
160+
pub fn max_retry_delay_ms(mut self, ms: u64) -> Self {
161+
self.max_retry_delay_ms = ms;
162+
self
163+
}
164+
165+
pub fn rpc_rate_limiter(mut self, limiter: RateLimiter) -> Self {
166+
self.rpc_rate_limiter = Some(limiter);
167+
self
168+
}
169+
170+
pub fn send_tx_rate_limiter(mut self, limiter: RateLimiter) -> Self {
171+
self.send_tx_rate_limiter = Some(limiter);
172+
self
173+
}
174+
175+
pub async fn build(self) -> Result<SolanaRpcPool<R>, PoolError> {
176+
let url = self
177+
.url
178+
.ok_or_else(|| PoolError::BuilderMissingField("url".to_string()))?;
179+
let commitment = self
180+
.commitment
181+
.ok_or_else(|| PoolError::BuilderMissingField("commitment".to_string()))?;
182+
183+
let manager = SolanaConnectionManager::new(
184+
url,
185+
commitment,
186+
self.rpc_rate_limiter,
187+
self.send_tx_rate_limiter,
188+
);
189+
87190
let pool = Pool::builder()
88-
.max_size(max_size)
89-
.connection_timeout(Duration::from_secs(15))
90-
.idle_timeout(Some(Duration::from_secs(60 * 5)))
191+
.max_size(self.max_size)
192+
.connection_timeout(Duration::from_secs(self.connection_timeout_secs))
193+
.idle_timeout(Some(Duration::from_secs(self.idle_timeout_secs)))
91194
.build(manager)
92195
.await
93196
.map_err(|e| PoolError::Pool(e.to_string()))?;
94197

95-
Ok(Self { pool })
198+
Ok(SolanaRpcPool {
199+
pool,
200+
max_retries: self.max_retries,
201+
initial_retry_delay: Duration::from_millis(self.initial_retry_delay_ms),
202+
max_retry_delay: Duration::from_millis(self.max_retry_delay_ms),
203+
})
96204
}
205+
}
97206

207+
impl<R: RpcConnection> SolanaRpcPool<R> {
98208
pub async fn get_connection(
99209
&self,
100210
) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
101-
debug!("Attempting to get RPC connection...");
102-
let result = self
103-
.pool
104-
.get()
105-
.await
106-
.map_err(|e| PoolError::Pool(e.to_string()));
211+
let mut current_retries = 0;
212+
let mut current_delay = self.initial_retry_delay;
107213

108-
match result {
109-
Ok(_) => {
110-
debug!("Successfully got RPC connection");
111-
}
112-
Err(ref e) => {
113-
error!("Failed to get RPC connection: {:?}", e);
114-
}
115-
}
116-
117-
result
118-
}
119-
120-
pub async fn get_connection_with_retry(
121-
&self,
122-
max_retries: u32,
123-
delay: Duration,
124-
) -> Result<PooledConnection<'_, SolanaConnectionManager<R>>, PoolError> {
125-
let mut retries = 0;
126214
loop {
215+
trace!(
216+
"Attempting to get RPC connection... (Attempt {})",
217+
current_retries + 1
218+
);
127219
match self.pool.get().await {
128-
Ok(conn) => return Ok(conn),
129-
Err(e) if retries < max_retries => {
130-
retries += 1;
131-
eprintln!("Failed to get connection (attempt {}): {:?}", retries, e);
132-
tokio::task::yield_now().await;
133-
sleep(delay).await;
220+
Ok(conn) => {
221+
trace!(
222+
"Successfully got RPC connection (Attempt {})",
223+
current_retries + 1
224+
);
225+
return Ok(conn);
226+
}
227+
Err(e) => {
228+
error!(
229+
"Failed to get RPC connection (Attempt {}): {:?}",
230+
current_retries + 1,
231+
e
232+
);
233+
if current_retries < self.max_retries {
234+
current_retries += 1;
235+
warn!(
236+
"Retrying to get RPC connection in {:?} (Attempt {}/{})",
237+
current_delay,
238+
current_retries + 1,
239+
self.max_retries + 1
240+
);
241+
tokio::task::yield_now().await;
242+
sleep(current_delay).await;
243+
current_delay = min(current_delay * 2, self.max_retry_delay);
244+
} else {
245+
error!(
246+
"Failed to get RPC connection after {} attempts. Last error: {:?}",
247+
self.max_retries + 1,
248+
e
249+
);
250+
return Err(PoolError::MaxRetriesExceeded(
251+
self.max_retries + 1,
252+
e.to_string(),
253+
));
254+
}
134255
}
135-
Err(e) => return Err(PoolError::Pool(e.to_string())),
136256
}
137257
}
138258
}

forester/src/cli.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,39 @@ pub struct StartArgs {
7272
#[arg(long, env = "FORESTER_ENABLE_PRIORITY_FEES", default_value = "false")]
7373
pub enable_priority_fees: bool,
7474

75-
#[arg(long, env = "FORESTER_RPC_POOL_SIZE", default_value = "98")]
76-
pub rpc_pool_size: usize,
75+
#[arg(long, env = "FORESTER_RPC_POOL_SIZE", default_value = "50")]
76+
pub rpc_pool_size: u32,
77+
78+
#[arg(
79+
long,
80+
env = "FORESTER_RPC_POOL_CONNECTION_TIMEOUT_SECS",
81+
default_value = "15"
82+
)]
83+
pub rpc_pool_connection_timeout_secs: u64,
84+
85+
#[arg(
86+
long,
87+
env = "FORESTER_RPC_POOL_IDLE_TIMEOUT_SECS",
88+
default_value = "300"
89+
)]
90+
pub rpc_pool_idle_timeout_secs: u64,
91+
92+
#[arg(long, env = "FORESTER_RPC_POOL_MAX_RETRIES", default_value = "100")]
93+
pub rpc_pool_max_retries: u32,
94+
95+
#[arg(
96+
long,
97+
env = "FORESTER_RPC_POOL_INITIAL_RETRY_DELAY_MS",
98+
default_value = "1000"
99+
)]
100+
pub rpc_pool_initial_retry_delay_ms: u64,
101+
102+
#[arg(
103+
long,
104+
env = "FORESTER_RPC_POOL_MAX_RETRY_DELAY_MS",
105+
default_value = "16000"
106+
)]
107+
pub rpc_pool_max_retry_delay_ms: u64,
77108

78109
#[arg(
79110
long,

0 commit comments

Comments
 (0)