Skip to content

Commit e0cc087

Browse files
feat: add retry mechanism to rate-limited requests in Photon indexer (#1722)
* feat: add retry mechanism to rate-limited requests in Photon indexer Replaced `rate_limited_request` with `rate_limited_request_with_retry` to include a retry mechanism for handling transient errors. * format * Reduced initial retry delay to 100ms and capped exponential backoff at 4000ms to improve responsiveness and mitigate excessive delays. * refactor: simplify retry logic in get_validity_proof_v2 request
1 parent 033960e commit e0cc087

File tree

1 file changed

+138
-85
lines changed

1 file changed

+138
-85
lines changed

sdk-libs/client/src/indexer/photon_indexer.rs

Lines changed: 138 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Debug, str::FromStr};
1+
use std::{fmt::Debug, str::FromStr, time::Duration};
22

33
use async_trait::async_trait;
44
use light_compressed_account::compressed_account::{
@@ -17,7 +17,7 @@ use photon_api::{
1717
};
1818
use solana_program::pubkey::Pubkey;
1919
use solana_sdk::bs58;
20-
use tracing::{debug, error};
20+
use tracing::{debug, error, warn};
2121

2222
use super::{AddressQueueIndex, BatchAddressUpdateIndexerResponse, MerkleProofWithContext};
2323
use crate::{
@@ -78,15 +78,86 @@ impl<R: RpcConnection> PhotonIndexer<R> {
7878
&mut self.rpc
7979
}
8080

81-
async fn rate_limited_request<F, Fut, T>(&self, operation: F) -> Result<T, IndexerError>
81+
async fn rate_limited_request_with_retry<F, Fut, T>(
82+
&self,
83+
mut operation: F,
84+
) -> Result<T, IndexerError>
8285
where
83-
F: FnOnce() -> Fut,
86+
F: FnMut() -> Fut,
8487
Fut: std::future::Future<Output = Result<T, IndexerError>>,
8588
{
86-
if let Some(limiter) = &self.rate_limiter {
87-
limiter.acquire_with_wait().await;
89+
let max_retries = 10;
90+
let mut attempts = 0;
91+
let mut delay_ms = 100;
92+
let max_delay_ms = 4000;
93+
94+
loop {
95+
attempts += 1;
96+
97+
if let Some(limiter) = &self.rate_limiter {
98+
debug!(
99+
"Attempt {}/{}: Acquiring rate limiter",
100+
attempts, max_retries
101+
);
102+
limiter.acquire_with_wait().await;
103+
debug!(
104+
"Attempt {}/{}: Rate limiter acquired",
105+
attempts, max_retries
106+
);
107+
} else {
108+
debug!(
109+
"Attempt {}/{}: No rate limiter configured",
110+
attempts, max_retries
111+
);
112+
}
113+
114+
debug!("Attempt {}/{}: Executing operation", attempts, max_retries);
115+
let result = operation().await;
116+
117+
match result {
118+
Ok(value) => {
119+
debug!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
120+
return Ok(value);
121+
}
122+
Err(e) => {
123+
let is_retryable = match &e {
124+
IndexerError::ApiError(_) => {
125+
warn!("API Error: {}", e);
126+
true
127+
}
128+
IndexerError::PhotonError {
129+
context: _,
130+
message: _,
131+
} => {
132+
warn!("Operation failed, checking if retryable...");
133+
true
134+
}
135+
IndexerError::Base58DecodeError { .. } => false,
136+
IndexerError::AccountNotFound => false,
137+
IndexerError::InvalidParameters(_) => false,
138+
IndexerError::NotImplemented(_) => false,
139+
_ => false,
140+
};
141+
142+
if is_retryable && attempts < max_retries {
143+
warn!(
144+
"Attempt {}/{}: Operation failed. Retrying",
145+
attempts, max_retries
146+
);
147+
148+
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
149+
delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
150+
} else {
151+
if is_retryable {
152+
error!("Operation failed after max retries.");
153+
} else {
154+
error!("Operation failed with non-retryable error.");
155+
}
156+
return Err(e);
157+
}
158+
}
159+
}
88160
}
89-
operation().await
90161
}
91162

92163
fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
@@ -130,7 +201,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
130201
num_elements: u16,
131202
start_offset: Option<u64>,
132203
) -> Result<Vec<MerkleProofWithContext>, IndexerError> {
133-
self.rate_limited_request(|| async {
204+
self.rate_limited_request_with_retry(|| async {
134205
let request: photon_api::models::GetQueueElementsPostRequest =
135206
photon_api::models::GetQueueElementsPostRequest {
136207
params: Box::from(photon_api::models::GetQueueElementsPostRequestParams {
@@ -225,10 +296,12 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
225296
&self,
226297
hashes: Vec<String>,
227298
) -> Result<Vec<MerkleProof>, IndexerError> {
228-
self.rate_limited_request(|| async {
299+
self.rate_limited_request_with_retry(|| async {
300+
let hashes_for_async = hashes.clone();
301+
229302
let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
230303
photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
231-
params: hashes,
304+
params: hashes_for_async,
232305
..Default::default()
233306
};
234307

@@ -240,7 +313,6 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
240313
request,
241314
)
242315
.await?;
243-
debug!("Raw API response: {:?}", result);
244316

245317
if let Some(error) = &result.error {
246318
let error_msg = error.message.as_deref().unwrap_or("Unknown error");
@@ -295,7 +367,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
295367
&self,
296368
owner: &Pubkey,
297369
) -> Result<Vec<CompressedAccountWithMerkleContext>, IndexerError> {
298-
self.rate_limited_request(|| async {
370+
self.rate_limited_request_with_retry(|| async {
299371
let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
300372
params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
301373
cursor: None,
@@ -361,7 +433,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
361433
owner: &Pubkey,
362434
mint: Option<Pubkey>,
363435
) -> Result<Vec<TokenDataWithMerkleContext>, IndexerError> {
364-
self.rate_limited_request(|| async {
436+
self.rate_limited_request_with_retry(|| async {
365437
let request = GetCompressedTokenAccountsByOwnerV2PostRequest {
366438
params: Box::from(GetCompressedTokenAccountsByOwnerPostRequestParams {
367439
cursor: None,
@@ -448,7 +520,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
448520
address: Option<Address>,
449521
hash: Option<Hash>,
450522
) -> Result<Account, IndexerError> {
451-
self.rate_limited_request(|| async {
523+
self.rate_limited_request_with_retry(|| async {
452524
let params = self.build_account_params(address, hash)?;
453525
let request = photon_api::models::GetCompressedAccountPostRequest {
454526
params: Box::new(params),
@@ -474,16 +546,14 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
474546
owner: &Pubkey,
475547
mint: Option<Pubkey>,
476548
) -> Result<Vec<TokenDataWithMerkleContext>, IndexerError> {
477-
self.rate_limited_request(|| async {
549+
self.rate_limited_request_with_retry(|| async {
478550
let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
479-
params: Box::new(
480-
photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
481-
owner: owner.to_string(),
482-
mint: mint.map(|x| x.to_string()),
483-
cursor: None,
484-
limit: None,
485-
},
486-
),
551+
params: Box::new(GetCompressedTokenAccountsByOwnerPostRequestParams {
552+
owner: owner.to_string(),
553+
mint: mint.map(|x| x.to_string()),
554+
cursor: None,
555+
limit: None,
556+
}),
487557
..Default::default()
488558
};
489559

@@ -506,7 +576,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
506576
address: Option<Address>,
507577
hash: Option<Hash>,
508578
) -> Result<u64, IndexerError> {
509-
self.rate_limited_request(|| async {
579+
self.rate_limited_request_with_retry(|| async {
510580
let params = self.build_account_params(address, hash)?;
511581
let request = photon_api::models::GetCompressedAccountBalancePostRequest {
512582
params: Box::new(params),
@@ -530,7 +600,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
530600
address: Option<Address>,
531601
hash: Option<Hash>,
532602
) -> Result<u64, IndexerError> {
533-
self.rate_limited_request(|| async {
603+
self.rate_limited_request_with_retry(|| async {
534604
let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
535605
params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
536606
address: address.map(|x| x.to_base58()),
@@ -557,12 +627,16 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
557627
addresses: Option<Vec<Address>>,
558628
hashes: Option<Vec<Hash>>,
559629
) -> Result<Vec<Account>, IndexerError> {
560-
self.rate_limited_request(|| async {
630+
self.rate_limited_request_with_retry(|| async {
631+
let addresses_for_async = addresses.clone();
632+
let hashes_for_async = hashes.clone();
633+
561634
let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
562635
params: Box::new(
563636
photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
564-
addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
565-
hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
637+
addresses: addresses_for_async
638+
.map(|x| x.iter().map(|x| x.to_base58()).collect()),
639+
hashes: hashes_for_async.map(|x| x.iter().map(|x| x.to_base58()).collect()),
566640
},
567641
),
568642
..Default::default()
@@ -585,16 +659,14 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
585659
owner: &Pubkey,
586660
mint: Option<Pubkey>,
587661
) -> Result<TokenBalanceList, IndexerError> {
588-
self.rate_limited_request(|| async {
662+
self.rate_limited_request_with_retry(|| async {
589663
let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
590-
params: Box::new(
591-
photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
592-
owner: owner.to_string(),
593-
mint: mint.map(|x| x.to_string()),
594-
cursor: None,
595-
limit: None,
596-
},
597-
),
664+
params: Box::new(GetCompressedTokenAccountsByOwnerPostRequestParams {
665+
owner: owner.to_string(),
666+
mint: mint.map(|x| x.to_string()),
667+
cursor: None,
668+
limit: None,
669+
}),
598670
..Default::default()
599671
};
600672

@@ -616,7 +688,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
616688
&self,
617689
hash: Hash,
618690
) -> Result<Vec<String>, IndexerError> {
619-
self.rate_limited_request(|| async {
691+
self.rate_limited_request_with_retry(|| async {
620692
let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
621693
params: Box::new(
622694
photon_api::models::GetCompressedAccountProofPostRequestParams {
@@ -650,7 +722,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
650722
merkle_tree_pubkey: [u8; 32],
651723
addresses: Vec<[u8; 32]>,
652724
) -> Result<Vec<NewAddressProofWithContext<16>>, IndexerError> {
653-
self.rate_limited_request(|| async {
725+
self.rate_limited_request_with_retry(|| async {
654726
let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
655727
.iter()
656728
.map(|x| photon_api::models::address_with_tree::AddressWithTree {
@@ -758,7 +830,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
758830
hashes: Vec<Hash>,
759831
new_addresses_with_trees: Vec<AddressWithTree>,
760832
) -> Result<CompressedProofWithContext, IndexerError> {
761-
self.rate_limited_request(|| async {
833+
self.rate_limited_request_with_retry(|| async {
762834
let request = photon_api::models::GetValidityProofPostRequest {
763835
params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
764836
hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
@@ -792,52 +864,33 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
792864
hashes: Vec<Hash>,
793865
new_addresses_with_trees: Vec<AddressWithTree>,
794866
) -> Result<CompressedProofWithContextV2, IndexerError> {
795-
let max_retries = 4;
796-
let mut retries = 0;
797-
let mut delay = 1000;
798-
799-
loop {
800-
match self
801-
.rate_limited_request(|| async {
802-
let request = photon_api::models::GetValidityProofV2PostRequest {
803-
params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
804-
hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
805-
new_addresses_with_trees: Some(
806-
new_addresses_with_trees
807-
.iter()
808-
.map(|x| photon_api::models::AddressWithTree {
809-
address: x.address.to_base58(),
810-
tree: x.tree.to_string(),
811-
})
812-
.collect(),
813-
),
814-
}),
815-
..Default::default()
816-
};
867+
self.rate_limited_request_with_retry(|| async {
868+
let request = photon_api::models::GetValidityProofV2PostRequest {
869+
params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
870+
hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
871+
new_addresses_with_trees: Some(
872+
new_addresses_with_trees
873+
.iter()
874+
.map(|x| photon_api::models::AddressWithTree {
875+
address: x.address.to_base58(),
876+
tree: x.tree.to_string(),
877+
})
878+
.collect(),
879+
),
880+
}),
881+
..Default::default()
882+
};
817883

818-
let result = photon_api::apis::default_api::get_validity_proof_v2_post(
819-
&self.configuration,
820-
request,
821-
)
822-
.await?;
884+
let result = photon_api::apis::default_api::get_validity_proof_v2_post(
885+
&self.configuration,
886+
request,
887+
)
888+
.await?;
823889

824-
let result = Self::extract_result("get_validity_proof_v2", result.result)?;
825-
Ok(*result.value)
826-
})
827-
.await
828-
{
829-
Ok(result) => return Ok(result),
830-
Err(e) => {
831-
if retries >= max_retries {
832-
return Err(e);
833-
}
834-
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
835-
retries += 1;
836-
delay *= 2;
837-
continue;
838-
}
839-
}
840-
}
890+
let result = Self::extract_result("get_validity_proof_v2", result.result)?;
891+
Ok(*result.value)
892+
})
893+
.await
841894
}
842895

843896
async fn get_indexer_slot(&self, _r: &mut R) -> Result<u64, IndexerError> {
@@ -862,7 +915,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
862915
merkle_tree_pubkey: &Pubkey,
863916
zkp_batch_size: u16,
864917
) -> Result<BatchAddressUpdateIndexerResponse, IndexerError> {
865-
self.rate_limited_request(|| async {
918+
self.rate_limited_request_with_retry(|| async {
866919
let merkle_tree = Hash::from_bytes(merkle_tree_pubkey.to_bytes().as_ref())?;
867920
let request = photon_api::models::GetBatchAddressUpdateInfoPostRequest {
868921
params: Box::new(

0 commit comments

Comments
 (0)