Skip to content

Commit 046d452

Browse files
feat: forester: add proper error handling and V2 tree support to (#1821)
`status` cmd
1 parent 9858add commit 046d452

File tree

4 files changed

+92
-34
lines changed

4 files changed

+92
-34
lines changed

forester/src/forester_status.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
ForesterConfig,
2020
};
2121

22-
pub async fn fetch_forester_status(args: &StatusArgs) {
22+
pub async fn fetch_forester_status(args: &StatusArgs) -> crate::Result<()> {
2323
let commitment_config = CommitmentConfig::confirmed();
2424

2525
let client = solana_client::rpc_client::RpcClient::new_with_commitment(
@@ -34,7 +34,7 @@ pub async fn fetch_forester_status(args: &StatusArgs) {
3434
let mut epoch_pdas = vec![];
3535
let mut protocol_config_pdas = vec![];
3636
for (_, account) in registry_accounts {
37-
match account.data()[0..8].try_into().unwrap() {
37+
match account.data()[0..8].try_into()? {
3838
ForesterEpochPda::DISCRIMINATOR => {
3939
let forester_epoch_pda =
4040
ForesterEpochPda::try_deserialize_unchecked(&mut account.data())
@@ -63,12 +63,10 @@ pub async fn fetch_forester_status(args: &StatusArgs) {
6363

6464
let current_active_epoch = protocol_config_pdas[0]
6565
.config
66-
.get_current_active_epoch(slot)
67-
.unwrap();
66+
.get_current_active_epoch(slot)?;
6867
let current_registration_epoch = protocol_config_pdas[0]
6968
.config
70-
.get_latest_register_epoch(slot)
71-
.unwrap();
69+
.get_latest_register_epoch(slot)?;
7270
println!("Current active epoch: {:?}", current_active_epoch);
7371

7472
println!(
@@ -165,7 +163,7 @@ pub async fn fetch_forester_status(args: &StatusArgs) {
165163
println!("protocol config: {:?}", protocol_config_pdas[0]);
166164
}
167165

168-
let config = Arc::new(ForesterConfig::new_for_status(args).unwrap());
166+
let config = Arc::new(ForesterConfig::new_for_status(args)?);
169167

170168
if config.general_config.enable_metrics {
171169
register_metrics();
@@ -179,19 +177,20 @@ pub async fn fetch_forester_status(args: &StatusArgs) {
179177
fetch_active_tree: false,
180178
with_indexer: false,
181179
})
182-
.await
183-
.unwrap();
184-
let trees = fetch_trees(&rpc).await.unwrap();
180+
.await?;
181+
let trees = fetch_trees(&rpc).await?;
185182
if trees.is_empty() {
186183
warn!("No trees found. Exiting.");
187184
}
188-
run_queue_info(config.clone(), trees.clone(), TreeType::StateV1).await;
189-
run_queue_info(config.clone(), trees.clone(), TreeType::AddressV1).await;
185+
run_queue_info(config.clone(), trees.clone(), TreeType::StateV1).await?;
186+
run_queue_info(config.clone(), trees.clone(), TreeType::AddressV1).await?;
187+
188+
run_queue_info(config.clone(), trees.clone(), TreeType::StateV2).await?;
189+
run_queue_info(config.clone(), trees.clone(), TreeType::AddressV2).await?;
190+
190191
for tree in &trees {
191192
let tree_type = format!("[{}]", tree.tree_type);
192-
let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type)
193-
.await
194-
.unwrap();
193+
let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type).await?;
195194
let fullness_percentage = tree_info.fullness * 100.0;
196195
println!(
197196
"{} Tree {}: Fullness: {:.4}% | Next Index: {} | Threshold: {}",
@@ -255,9 +254,9 @@ pub async fn fetch_forester_status(args: &StatusArgs) {
255254
println!("No active foresters found for the current epoch.");
256255
}
257256

258-
push_metrics(&config.external_services.pushgateway_url)
259-
.await
260-
.unwrap();
257+
push_metrics(&config.external_services.pushgateway_url).await?;
258+
259+
Ok(())
261260
}
262261

263262
fn print_current_forester_assignments(

forester/src/lib.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ use crate::{
4141
indexer_type::IndexerType,
4242
metrics::QUEUE_LENGTH,
4343
processor::tx_cache::ProcessedHashCache,
44-
queue_helpers::fetch_queue_item_data,
44+
queue_helpers::{
45+
fetch_address_v2_queue_length, fetch_queue_item_data, fetch_state_v2_queue_length,
46+
},
4547
slot_tracker::SlotTracker,
4648
utils::get_protocol_config,
4749
};
@@ -50,7 +52,7 @@ pub async fn run_queue_info(
5052
config: Arc<ForesterConfig>,
5153
trees: Vec<TreeAccounts>,
5254
queue_type: TreeType,
53-
) {
55+
) -> Result<()> {
5456
let mut rpc = LightClient::new(LightClientConfig {
5557
url: config.external_services.rpc_url.to_string(),
5658
commitment_config: None,
@@ -67,28 +69,47 @@ pub async fn run_queue_info(
6769
.collect();
6870

6971
for tree_data in trees {
70-
if tree_data.tree_type == TreeType::StateV2 {
71-
continue;
72-
}
73-
74-
let length = if tree_data.tree_type == TreeType::StateV1 {
75-
STATE_NULLIFIER_QUEUE_VALUES
76-
} else {
77-
ADDRESS_QUEUE_VALUES
72+
let queue_length = match tree_data.tree_type {
73+
TreeType::StateV1 => fetch_queue_item_data(
74+
&mut rpc,
75+
&tree_data.queue,
76+
0,
77+
STATE_NULLIFIER_QUEUE_VALUES,
78+
STATE_NULLIFIER_QUEUE_VALUES,
79+
)
80+
.await?
81+
.len(),
82+
TreeType::AddressV1 => fetch_queue_item_data(
83+
&mut rpc,
84+
&tree_data.queue,
85+
0,
86+
ADDRESS_QUEUE_VALUES,
87+
ADDRESS_QUEUE_VALUES,
88+
)
89+
.await?
90+
.len(),
91+
TreeType::StateV2 => fetch_state_v2_queue_length(&mut rpc, &tree_data.queue).await?,
92+
TreeType::AddressV2 => {
93+
fetch_address_v2_queue_length(&mut rpc, &tree_data.merkle_tree).await?
94+
}
7895
};
7996

80-
let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0, length, length)
81-
.await
82-
.unwrap()
83-
.len();
8497
QUEUE_LENGTH
8598
.with_label_values(&[&*queue_type.to_string(), &tree_data.merkle_tree.to_string()])
8699
.set(queue_length as i64);
100+
101+
let queue_identifier = if tree_data.tree_type == TreeType::AddressV2 {
102+
tree_data.merkle_tree.to_string()
103+
} else {
104+
tree_data.queue.to_string()
105+
};
106+
87107
println!(
88108
"{:?} queue {} length: {}",
89-
queue_type, tree_data.queue, queue_length
109+
queue_type, queue_identifier, queue_length
90110
);
91111
}
112+
Ok(())
92113
}
93114

94115
pub async fn run_pipeline<R: Rpc, I: Indexer + IndexerType<R> + 'static>(

forester/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async fn main() -> Result<(), ForesterError> {
7676
.await?
7777
}
7878
Commands::Status(args) => {
79-
forester_status::fetch_forester_status(args).await;
79+
forester_status::fetch_forester_status(args).await?;
8080
}
8181
}
8282
Ok(())

forester/src/queue_helpers.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use account_compression::QueueAccount;
2+
use light_batched_merkle_tree::{
3+
merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount,
4+
};
25
use light_client::rpc::Rpc;
36
use light_hash_set::HashSet;
47
use solana_sdk::pubkey::Pubkey;
@@ -40,6 +43,41 @@ pub async fn fetch_queue_item_data<R: Rpc>(
4043
Ok(filtered_queue)
4144
}
4245

46+
pub async fn fetch_state_v2_queue_length<R: Rpc>(
47+
rpc: &mut R,
48+
output_queue_pubkey: &Pubkey,
49+
) -> Result<usize> {
50+
trace!(
51+
"Fetching StateV2 queue length for {:?}",
52+
output_queue_pubkey
53+
);
54+
if let Some(mut account) = rpc.get_account(*output_queue_pubkey).await? {
55+
let output_queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?;
56+
Ok(output_queue.get_metadata().batch_metadata.next_index as usize)
57+
} else {
58+
Err(anyhow::anyhow!("account not found"))
59+
}
60+
}
61+
62+
pub async fn fetch_address_v2_queue_length<R: Rpc>(
63+
rpc: &mut R,
64+
merkle_tree_pubkey: &Pubkey,
65+
) -> Result<usize> {
66+
trace!(
67+
"Fetching AddressV2 queue length for {:?}",
68+
merkle_tree_pubkey
69+
);
70+
if let Some(mut account) = rpc.get_account(*merkle_tree_pubkey).await? {
71+
let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes(
72+
account.data.as_mut_slice(),
73+
&(*merkle_tree_pubkey).into(),
74+
)?;
75+
Ok(merkle_tree.queue_batches.next_index as usize)
76+
} else {
77+
Err(anyhow::anyhow!("account not found"))
78+
}
79+
}
80+
4381
#[derive(Debug)]
4482
pub struct QueueUpdate {
4583
pub pubkey: Pubkey,

0 commit comments

Comments
 (0)