Skip to content

Commit 673e30b

Browse files
feat: forester: do not retry on not eligible slots (#1089)
* Introduced a new error variant `NotEligible` in `ForesterError`. Updated the eligibility check to return this error and handle it appropriately during the transaction processing loop. * format * Refactor retry delay calculation to prevent overflows Modified the retry delay calculation by using `saturating_mul` and `saturating_pow` to avoid potential overflow issues. This ensures the application remains stable even if the retry count becomes excessively high. * format
1 parent 57c2912 commit 673e30b

File tree

2 files changed

+57
-41
lines changed

2 files changed

+57
-41
lines changed

forester/src/epoch_manager.rs

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
720720
&self,
721721
registration_info: &ForesterEpochInfo,
722722
tree_account: &TreeAccounts,
723-
) -> Result<bool> {
723+
) -> Result<()> {
724724
let mut rpc = self.rpc_pool.get_connection().await?;
725725
let current_slot = rpc.get_slot().await?;
726726
let forester_epoch_pda = rpc
@@ -754,7 +754,11 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
754754
"tree_schedule.slots[{}] = {:?}",
755755
light_slot, tree_schedule.slots[light_slot as usize]
756756
);
757-
Ok(tree_schedule.is_eligible(light_slot))
757+
if tree_schedule.is_eligible(light_slot) {
758+
Ok(())
759+
} else {
760+
Err(ForesterError::NotEligible)
761+
}
758762
}
759763

760764
async fn process_transaction_batch_with_retry(
@@ -764,60 +768,68 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
764768
proof_chunk: &[Proof],
765769
indexer_chunk: &[WorkItem],
766770
) -> Result<Option<Signature>> {
767-
let first_work_item = indexer_chunk
771+
let work_item = indexer_chunk
768772
.first()
769773
.ok_or_else(|| ForesterError::Custom("Empty indexer chunk".to_string()))?;
770774
debug!(
771775
"Processing work item {:?} with {} instructions",
772-
first_work_item.queue_item_data.hash,
776+
work_item.queue_item_data.hash,
773777
transaction_chunk.len()
774778
);
775779
const BASE_RETRY_DELAY: Duration = Duration::from_millis(100);
776780

777781
let mut retries = 0;
778782
loop {
779-
if !self
780-
.check_eligibility(epoch_info, &first_work_item.tree_account)
781-
.await?
782-
{
783-
debug!("Forester not eligible for this slot, skipping batch");
784-
return Ok(None);
785-
}
786-
787783
match self
788-
.process_transaction_batch(
789-
epoch_info,
790-
transaction_chunk,
791-
proof_chunk,
792-
indexer_chunk,
793-
)
784+
.check_eligibility(epoch_info, &work_item.tree_account)
794785
.await
795786
{
796-
Ok(signature) => {
797-
debug!(
798-
"Work item {:?} processed successfully. Signature: {:?}",
799-
first_work_item.queue_item_data.hash, signature
800-
);
801-
self.increment_processed_items_count(epoch_info.epoch.epoch)
802-
.await;
803-
return Ok(Some(signature));
787+
Ok(_) => {
788+
match self
789+
.process_transaction_batch(
790+
epoch_info,
791+
transaction_chunk,
792+
proof_chunk,
793+
indexer_chunk,
794+
)
795+
.await
796+
{
797+
Ok(signature) => {
798+
debug!(
799+
"Work item {:?} processed successfully. Signature: {:?}",
800+
work_item.queue_item_data.hash, signature
801+
);
802+
self.increment_processed_items_count(epoch_info.epoch.epoch)
803+
.await;
804+
return Ok(Some(signature));
805+
}
806+
Err(e) => {
807+
if retries >= self.config.max_retries {
808+
error!(
809+
"Max retries reached for work item {:?}. Error: {:?}",
810+
work_item.queue_item_data.hash, e
811+
);
812+
return Err(e);
813+
}
814+
let delay = BASE_RETRY_DELAY
815+
.saturating_mul(2u32.saturating_pow(retries as u32));
816+
let jitter = rand::thread_rng().gen_range(0..=50);
817+
sleep(delay + Duration::from_millis(jitter)).await;
818+
retries += 1;
819+
warn!(
820+
"Retrying work item {:?}. Attempt {}/{}",
821+
work_item.queue_item_data.hash, retries, self.config.max_retries
822+
);
823+
}
824+
}
825+
}
826+
Err(ForesterError::NotEligible) => {
827+
debug!("Forester not eligible for this slot, skipping batch");
828+
return Ok(None);
804829
}
805830
Err(e) => {
806-
if retries >= self.config.max_retries {
807-
error!(
808-
"Max retries reached for work item {:?}. Error: {:?}",
809-
first_work_item.queue_item_data.hash, e
810-
);
811-
return Err(e);
812-
}
813-
let delay = BASE_RETRY_DELAY * 2u32.pow(retries as u32);
814-
let jitter = rand::thread_rng().gen_range(0..=50);
815-
sleep(delay + Duration::from_millis(jitter)).await;
816-
retries += 1;
817-
warn!(
818-
"Retrying work item {:?}. Attempt {}/{}",
819-
first_work_item.queue_item_data.hash, retries, self.config.max_retries
820-
);
831+
error!("Error checking eligibility: {:?}", e);
832+
return Err(e);
821833
}
822834
}
823835
}

forester/src/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use tokio::task::JoinError;
1212

1313
#[derive(Error, Debug)]
1414
pub enum ForesterError {
15+
#[error("Element is not eligible for foresting")]
16+
NotEligible,
1517
#[error("RPC Error: {0}")]
1618
RpcError(#[from] RpcError),
1719
#[error("failed to deserialize account data")]
@@ -55,6 +57,7 @@ pub enum ForesterError {
5557
impl Clone for ForesterError {
5658
fn clone(&self) -> Self {
5759
match self {
60+
ForesterError::NotEligible => ForesterError::NotEligible,
5861
ForesterError::RpcError(_) => ForesterError::Custom("RPC Error".to_string()),
5962
ForesterError::DeserializeError(e) => ForesterError::DeserializeError(e.clone()),
6063
ForesterError::CopyMerkleTreeError(_) => {
@@ -89,6 +92,7 @@ impl Clone for ForesterError {
8992
impl ForesterError {
9093
pub fn to_owned(&self) -> Self {
9194
match self {
95+
ForesterError::NotEligible => ForesterError::NotEligible,
9296
ForesterError::RpcError(e) => ForesterError::Custom(format!("RPC Error: {:?}", e)),
9397
ForesterError::DeserializeError(e) => {
9498
ForesterError::Custom(format!("Deserialize Error: {:?}", e))

0 commit comments

Comments
 (0)