Skip to content

Commit d7d5507

Browse files
feat: bb8-based SolanaRpcPool (#1075)
* Replace the old RpcPool implementation with bb8-based connection pooling for improved resource management. * Fix missing newline at end of rpc_pool.rs * Remove unused code in forester/src/lib.rs * chore: export environment variables in install.sh Export the environment variables RUSTUP_HOME, CARGO_HOME, and PATH in the install.sh script.
1 parent 4926032 commit d7d5507

File tree

14 files changed

+281
-187
lines changed

14 files changed

+281
-187
lines changed

Cargo.lock

Lines changed: 16 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

forester/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ dotenvy = "0.15.7"
4343
crossbeam-channel = "0.5.12"
4444
tokio-stream = "0.1.14"
4545
base64 = "0.12.3"
46+
async-trait = "0.1.81"
47+
bb8 = "0.8.5"
4648

4749
[dev-dependencies]
4850
function_name = "0.3.0"

forester/src/epoch_manager.rs

Lines changed: 62 additions & 81 deletions
Large diffs are not rendered by default.

forester/src/errors.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::rpc_pool::PoolError;
12
use account_compression::initialize_address_merkle_tree::Error as AccountCompressionError;
23
use light_hash_set::HashSetError;
34
use light_test_utils::indexer::IndexerError;
@@ -157,3 +158,9 @@ impl From<JoinError> for ForesterError {
157158
ForesterError::JoinError(err.to_string())
158159
}
159160
}
161+
162+
impl From<PoolError> for ForesterError {
163+
fn from(err: PoolError) -> Self {
164+
ForesterError::Custom(err.to_string())
165+
}
166+
}

forester/src/lib.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod utils;
1111

1212
use crate::epoch_manager::{fetch_queue_item_data, run_service, WorkReport};
1313
use crate::errors::ForesterError;
14+
use crate::rpc_pool::SolanaRpcPool;
1415
use crate::utils::get_protocol_config;
1516
pub use config::{ForesterConfig, ForesterEpochInfo};
1617
use env_logger::Env;
@@ -19,8 +20,8 @@ use light_test_utils::indexer::Indexer;
1920
use light_test_utils::rpc::rpc_connection::RpcConnection;
2021
use light_test_utils::rpc::SolanaRpcConnection;
2122
use log::info;
22-
pub use rpc_pool::RpcPool;
2323
pub use settings::init_config;
24+
use solana_sdk::commitment_config::CommitmentConfig;
2425
use solana_sdk::native_token::LAMPORTS_PER_SOL;
2526
use solana_sdk::signature::Signer;
2627
use std::sync::Arc;
@@ -36,16 +37,15 @@ pub async fn run_queue_info(
3637
trees: Vec<TreeAccounts>,
3738
queue_type: TreeType,
3839
) {
39-
let rpc = SolanaRpcConnection::new(config.external_services.rpc_url.to_string(), None);
40-
let rpc = Arc::new(Mutex::new(rpc));
40+
let mut rpc = SolanaRpcConnection::new(config.external_services.rpc_url.to_string(), None);
4141
let state_trees: Vec<_> = trees
4242
.iter()
4343
.filter(|t| t.tree_type == queue_type)
4444
.cloned()
4545
.collect();
4646

4747
for tree_data in state_trees {
48-
let queue_length = fetch_queue_item_data(rpc.clone(), &tree_data.queue)
48+
let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue)
4949
.await
5050
.unwrap()
5151
.len();
@@ -62,26 +62,31 @@ pub async fn run_pipeline<R: RpcConnection, I: Indexer<R>>(
6262
shutdown: oneshot::Receiver<()>,
6363
work_report_sender: mpsc::Sender<WorkReport>,
6464
) -> Result<(), ForesterError> {
65-
let rpc_pool = Arc::new(RpcPool::<R>::new(config.clone()).await);
65+
let rpc_pool = SolanaRpcPool::<R>::new(
66+
config.external_services.rpc_url.to_string(),
67+
CommitmentConfig::confirmed(),
68+
config.rpc_pool_size as u32,
69+
)
70+
.await
71+
.map_err(|e| ForesterError::Custom(e.to_string()))?;
6672

6773
{
68-
let rpc = rpc_pool.get_connection().await;
69-
let mut rpc = rpc.lock().await;
74+
let mut rpc = rpc_pool.get_connection().await?;
7075
rpc.airdrop_lamports(&config.payer_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000)
7176
.await
7277
.unwrap();
7378
}
7479

7580
let protocol_config = {
76-
let rpc = rpc_pool.get_connection().await;
77-
Arc::new(get_protocol_config(rpc.clone()).await)
81+
let mut rpc = rpc_pool.get_connection().await?;
82+
get_protocol_config(&mut *rpc).await
7883
};
7984

8085
info!("Starting Forester pipeline");
8186
run_service(
8287
config,
83-
protocol_config,
84-
rpc_pool,
88+
Arc::new(protocol_config),
89+
Arc::new(rpc_pool),
8590
indexer,
8691
shutdown,
8792
work_report_sender,

forester/src/rollover/operations.rs

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::ops::DerefMut;
21
use std::sync::Arc;
32

43
use light_registry::account_compression_cpi::sdk::{
@@ -15,7 +14,7 @@ use solana_sdk::transaction::Transaction;
1514
use tokio::sync::Mutex;
1615

1716
use crate::errors::ForesterError;
18-
use crate::{ForesterConfig, RpcPool};
17+
use crate::ForesterConfig;
1918
use account_compression::utils::constants::{
2019
STATE_MERKLE_TREE_CANOPY_DEPTH, STATE_MERKLE_TREE_HEIGHT,
2120
};
@@ -40,15 +39,14 @@ use light_test_utils::{
4039
};
4140

4241
pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
43-
rpc: Arc<Mutex<R>>,
42+
rpc: &mut R,
4443
tree_pubkey: Pubkey,
4544
tree_type: TreeType,
4645
) -> Result<bool, ForesterError> {
4746
info!(
4847
"Checking if tree is ready for rollover: {:?}",
4948
tree_pubkey.to_string()
5049
);
51-
let mut rpc = rpc.lock().await;
5250
match tree_type {
5351
TreeType::State => {
5452
let account = rpc
@@ -63,7 +61,7 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
6361
}
6462
let merkle_tree =
6563
get_concurrent_merkle_tree::<StateMerkleTreeAccount, R, Poseidon, 26>(
66-
&mut rpc,
64+
rpc,
6765
tree_pubkey,
6866
)
6967
.await;
@@ -87,7 +85,7 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
8785

8886
let merkle_tree =
8987
get_indexed_merkle_tree::<AddressMerkleTreeAccount, R, Poseidon, usize, 26, 16>(
90-
&mut rpc,
88+
rpc,
9189
tree_pubkey,
9290
)
9391
.await;
@@ -104,18 +102,17 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
104102
#[allow(dead_code)]
105103
pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
106104
config: Arc<ForesterConfig>,
107-
rpc_pool: Arc<RpcPool<R>>,
105+
rpc: &mut R,
108106
indexer: Arc<Mutex<I>>,
109107
tree_accounts: &TreeAccounts,
110108
) -> Result<(), ForesterError> {
111109
let new_nullifier_queue_keypair = Keypair::new();
112110
let new_merkle_tree_keypair = Keypair::new();
113111
let new_cpi_signature_keypair = Keypair::new();
114112

115-
let rpc = rpc_pool.get_connection().await;
116113
let rollover_signature = perform_state_merkle_tree_roll_over_forester(
117114
&config.payer_keypair,
118-
rpc.clone(),
115+
rpc,
119116
&new_nullifier_queue_keypair,
120117
&new_merkle_tree_keypair,
121118
&new_cpi_signature_keypair,
@@ -146,7 +143,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
146143
#[allow(clippy::too_many_arguments)]
147144
pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
148145
payer: &Keypair,
149-
context: Arc<Mutex<R>>,
146+
context: &mut R,
150147
new_queue_keypair: &Keypair,
151148
new_address_merkle_tree_keypair: &Keypair,
152149
new_cpi_context_keypair: &Keypair,
@@ -155,7 +152,7 @@ pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
155152
old_cpi_context_pubkey: &Pubkey,
156153
) -> Result<solana_sdk::signature::Signature, RpcError> {
157154
let instructions = create_rollover_state_merkle_tree_instructions(
158-
context.clone(),
155+
context,
159156
&payer.pubkey(),
160157
new_queue_keypair,
161158
new_address_merkle_tree_keypair,
@@ -165,7 +162,6 @@ pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
165162
old_cpi_context_pubkey,
166163
)
167164
.await;
168-
let mut context = context.lock().await;
169165
let blockhash = context.get_latest_blockhash().await.unwrap();
170166
let transaction = Transaction::new_signed_with_payer(
171167
&instructions,
@@ -178,16 +174,15 @@ pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
178174

179175
pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
180176
config: Arc<ForesterConfig>,
181-
rpc_pool: Arc<RpcPool<R>>,
177+
rpc: &mut R,
182178
indexer: Arc<Mutex<I>>,
183179
tree_data: &TreeAccounts,
184180
) -> Result<(), ForesterError> {
185181
let new_nullifier_queue_keypair = Keypair::new();
186182
let new_merkle_tree_keypair = Keypair::new();
187-
let rpc = rpc_pool.get_connection().await;
188183
perform_address_merkle_tree_roll_over(
189184
&config.payer_keypair,
190-
rpc.clone(),
185+
rpc,
191186
&new_nullifier_queue_keypair,
192187
&new_merkle_tree_keypair,
193188
&tree_data.merkle_tree,
@@ -205,22 +200,21 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
205200

206201
pub async fn perform_address_merkle_tree_roll_over<R: RpcConnection>(
207202
payer: &Keypair,
208-
context: Arc<Mutex<R>>,
203+
context: &mut R,
209204
new_queue_keypair: &Keypair,
210205
new_address_merkle_tree_keypair: &Keypair,
211206
old_merkle_tree_pubkey: &Pubkey,
212207
old_queue_pubkey: &Pubkey,
213208
) -> Result<solana_sdk::signature::Signature, RpcError> {
214209
let instructions = create_rollover_address_merkle_tree_instructions(
215-
context.clone(),
210+
context,
216211
&payer.pubkey(),
217212
new_queue_keypair,
218213
new_address_merkle_tree_keypair,
219214
old_merkle_tree_pubkey,
220215
old_queue_pubkey,
221216
)
222217
.await;
223-
let mut context = context.lock().await;
224218
let blockhash = context.get_latest_blockhash().await.unwrap();
225219
let transaction = Transaction::new_signed_with_payer(
226220
&instructions,
@@ -232,16 +226,15 @@ pub async fn perform_address_merkle_tree_roll_over<R: RpcConnection>(
232226
}
233227

234228
pub async fn create_rollover_address_merkle_tree_instructions<R: RpcConnection>(
235-
rpc: Arc<Mutex<R>>,
229+
rpc: &mut R,
236230
authority: &Pubkey,
237231
new_nullifier_queue_keypair: &Keypair,
238232
new_address_merkle_tree_keypair: &Keypair,
239233
merkle_tree_pubkey: &Pubkey,
240234
nullifier_queue_pubkey: &Pubkey,
241235
) -> Vec<Instruction> {
242-
let mut rpc = rpc.lock().await;
243236
let (merkle_tree_config, queue_config) = get_address_bundle_config(
244-
rpc.deref_mut(),
237+
rpc,
245238
AddressMerkleTreeAccounts {
246239
merkle_tree: *merkle_tree_pubkey,
247240
queue: *nullifier_queue_pubkey,
@@ -250,7 +243,7 @@ pub async fn create_rollover_address_merkle_tree_instructions<R: RpcConnection>(
250243
.await;
251244
let (merkle_tree_rent_exemption, queue_rent_exemption) =
252245
get_rent_exemption_for_address_merkle_tree_and_queue(
253-
rpc.deref_mut(),
246+
rpc,
254247
&merkle_tree_config,
255248
&queue_config,
256249
)
@@ -291,7 +284,7 @@ pub async fn create_rollover_address_merkle_tree_instructions<R: RpcConnection>(
291284

292285
#[allow(clippy::too_many_arguments)]
293286
pub async fn create_rollover_state_merkle_tree_instructions<R: RpcConnection>(
294-
rpc: Arc<Mutex<R>>,
287+
rpc: &mut R,
295288
authority: &Pubkey,
296289
new_nullifier_queue_keypair: &Keypair,
297290
new_state_merkle_tree_keypair: &Keypair,
@@ -300,9 +293,8 @@ pub async fn create_rollover_state_merkle_tree_instructions<R: RpcConnection>(
300293
nullifier_queue_pubkey: &Pubkey,
301294
old_cpi_context_pubkey: &Pubkey,
302295
) -> Vec<Instruction> {
303-
let mut rpc = rpc.lock().await;
304296
let (merkle_tree_config, queue_config) = get_state_bundle_config(
305-
rpc.deref_mut(),
297+
rpc,
306298
StateMerkleTreeAccounts {
307299
merkle_tree: *merkle_tree_pubkey,
308300
nullifier_queue: *nullifier_queue_pubkey,
@@ -311,12 +303,8 @@ pub async fn create_rollover_state_merkle_tree_instructions<R: RpcConnection>(
311303
)
312304
.await;
313305
let (state_merkle_tree_rent_exemption, queue_rent_exemption) =
314-
get_rent_exemption_for_state_merkle_tree_and_queue(
315-
rpc.deref_mut(),
316-
&merkle_tree_config,
317-
&queue_config,
318-
)
319-
.await;
306+
get_rent_exemption_for_state_merkle_tree_and_queue(rpc, &merkle_tree_config, &queue_config)
307+
.await;
320308
let create_nullifier_queue_instruction = create_account_instruction(
321309
authority,
322310
queue_rent_exemption.size,

0 commit comments

Comments
 (0)