Skip to content

Commit 78a1148

Browse files
authored
feat(fortuna): better retry mechanism (#2780)
* feat(fortuna): better retry mechanism
1 parent d65df37 commit 78a1148

File tree

5 files changed

+153
-47
lines changed

5 files changed

+153
-47
lines changed

apps/fortuna/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/fortuna/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fortuna"
3-
version = "7.6.3"
3+
version = "7.6.4"
44
edition = "2021"
55

66
[lib]

apps/fortuna/src/chain/ethereum.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use {
3333
// contract in the same repo.
3434
abigen!(
3535
PythRandom,
36-
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json"
36+
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json";
37+
PythRandomErrors,
38+
"../../target_chains/ethereum/entropy_sdk/solidity/abis/EntropyErrors.json"
3739
);
3840

3941
pub type MiddlewaresWrapper<T> = LegacyTxMiddleware<

apps/fortuna/src/eth_utils/utils.rs

Lines changed: 80 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@ use {
44
backoff::ExponentialBackoff,
55
ethabi::ethereum_types::U64,
66
ethers::{
7-
contract::ContractCall,
7+
contract::{ContractCall, ContractError},
88
middleware::Middleware,
9-
types::{TransactionReceipt, U256},
9+
providers::ProviderError,
10+
types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256},
11+
},
12+
std::{
13+
fmt::Display,
14+
sync::{atomic::AtomicU64, Arc},
1015
},
11-
std::sync::{atomic::AtomicU64, Arc},
1216
tokio::time::{timeout, Duration},
1317
tracing,
1418
};
@@ -151,12 +155,17 @@ pub async fn estimate_tx_cost<T: Middleware + 'static>(
151155
/// the transaction exceeds this limit, the transaction is not submitted.
152156
/// Note however that any gas_escalation policy is applied to the estimate, so the actual gas used may exceed the limit.
153157
/// The transaction is retried until it is confirmed on chain or the maximum number of retries is reached.
158+
/// You can pass an `error_mapper` function that is called with the current number of retries and the error every time a submission attempt fails, and returns the (possibly modified) backoff error.
159+
/// This lets you customize the backoff behavior based on the error type.
154160
pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
155161
middleware: Arc<T>,
156162
call: ContractCall<T, ()>,
157163
gas_limit: U256,
158164
escalation_policy: EscalationPolicy,
159-
) -> Result<SubmitTxResult> {
165+
error_mapper: Option<
166+
impl Fn(u64, backoff::Error<SubmitTxError<T>>) -> backoff::Error<SubmitTxError<T>>,
167+
>,
168+
) -> Result<SubmitTxResult, SubmitTxError<T>> {
160169
let start_time = std::time::Instant::now();
161170

162171
tracing::info!("Started processing event");
@@ -176,14 +185,19 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
176185

177186
let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries);
178187
let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries);
179-
submit_tx(
188+
let result = submit_tx(
180189
middleware.clone(),
181190
&call,
182191
padded_gas_limit,
183192
gas_multiplier_pct,
184193
fee_multiplier_pct,
185194
)
186-
.await
195+
.await;
196+
if let Some(ref mapper) = error_mapper {
197+
result.map_err(|e| mapper(num_retries, e))
198+
} else {
199+
result
200+
}
187201
},
188202
|e, dur| {
189203
let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed);
@@ -210,6 +224,51 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
210224
})
211225
}
212226

227+
pub enum SubmitTxError<T: Middleware + NonceManaged + 'static> {
228+
GasUsageEstimateError(ContractError<T>),
229+
GasLimitExceeded { estimate: U256, limit: U256 },
230+
GasPriceEstimateError(<T as Middleware>::Error),
231+
SubmissionError(TypedTransaction, <T as Middleware>::Error),
232+
ConfirmationTimeout(TypedTransaction),
233+
ConfirmationError(TypedTransaction, ProviderError),
234+
ReceiptError(TypedTransaction, TransactionReceipt),
235+
}
236+
237+
impl<T> Display for SubmitTxError<T>
238+
where
239+
T: Middleware + NonceManaged + 'static,
240+
{
241+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242+
match self {
243+
SubmitTxError::GasUsageEstimateError(e) => {
244+
write!(f, "Error estimating gas for reveal: {:?}", e)
245+
}
246+
SubmitTxError::GasLimitExceeded { estimate, limit } => write!(
247+
f,
248+
"Gas estimate for reveal with callback is higher than the gas limit {} > {}",
249+
estimate, limit
250+
),
251+
SubmitTxError::GasPriceEstimateError(e) => write!(f, "Gas price estimate error: {}", e),
252+
SubmitTxError::SubmissionError(tx, e) => write!(
253+
f,
254+
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
255+
tx, e
256+
),
257+
SubmitTxError::ConfirmationTimeout(tx) => {
258+
write!(f, "Tx stuck in mempool. Resetting nonce. Tx:{:?}", tx)
259+
}
260+
SubmitTxError::ConfirmationError(tx, e) => write!(
261+
f,
262+
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
263+
tx, e
264+
),
265+
SubmitTxError::ReceiptError(tx, _) => {
266+
write!(f, "Reveal transaction reverted on-chain. Tx:{:?}", tx,)
267+
}
268+
}
269+
}
270+
}
271+
213272
/// Submit a transaction to the blockchain. It estimates the gas for the transaction,
214273
/// pads both the gas and fee estimates using the provided multipliers, and submits the transaction.
215274
/// It will return a permanent or transient error depending on the error type and whether
@@ -221,24 +280,23 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
221280
// A value of 100 submits the tx with the same gas/fee as the estimate.
222281
gas_estimate_multiplier_pct: u64,
223282
fee_estimate_multiplier_pct: u64,
224-
) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> {
283+
) -> Result<TransactionReceipt, backoff::Error<SubmitTxError<T>>> {
225284
let gas_estimate_res = call.estimate_gas().await;
226285

227286
let gas_estimate = gas_estimate_res.map_err(|e| {
228287
// we consider the error transient even if it is a contract revert since
229288
// it can be because of routing to a lagging RPC node. Retrying such errors will
230289
// incur a few additional RPC calls, but it is fine.
231-
backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
290+
backoff::Error::transient(SubmitTxError::GasUsageEstimateError(e))
232291
})?;
233292

234293
// The gas limit on the simulated transaction is the maximum expected tx gas estimate,
235294
// but we are willing to pad the gas a bit to ensure reliable submission.
236295
if gas_estimate > gas_limit {
237-
return Err(backoff::Error::permanent(anyhow!(
238-
"Gas estimate for reveal with callback is higher than the gas limit {} > {}",
239-
gas_estimate,
240-
gas_limit
241-
)));
296+
return Err(backoff::Error::permanent(SubmitTxError::GasLimitExceeded {
297+
estimate: gas_estimate,
298+
limit: gas_limit,
299+
}));
242300
}
243301

244302
// Pad the gas estimate after checking it against the simulation gas limit.
@@ -247,13 +305,11 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
247305
let call = call.clone().gas(gas_estimate);
248306
let mut transaction = call.tx.clone();
249307

250-
// manually fill the tx with the gas info, so we can log the details in case of error
308+
// manually fill the tx with the gas price info, so we can log the details in case of error
251309
client
252310
.fill_transaction(&mut transaction, None)
253311
.await
254-
.map_err(|e| {
255-
backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
256-
})?;
312+
.map_err(|e| backoff::Error::transient(SubmitTxError::GasPriceEstimateError(e)))?;
257313

258314
// Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle
259315
// in the client that sets the gas price.
@@ -271,11 +327,7 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
271327
.send_transaction(transaction.clone(), None)
272328
.await
273329
.map_err(|e| {
274-
backoff::Error::transient(anyhow!(
275-
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
276-
transaction,
277-
e
278-
))
330+
backoff::Error::transient(SubmitTxError::SubmissionError(transaction.clone(), e))
279331
})?;
280332

281333
let reset_nonce = || {
@@ -292,34 +344,24 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
292344
// in this case ethers internal polling will not reduce the number of retries
293345
// and keep retrying indefinitely. So we set a manual timeout here and reset the nonce.
294346
reset_nonce();
295-
backoff::Error::transient(anyhow!(
296-
"Tx stuck in mempool. Resetting nonce. Tx:{:?}",
297-
transaction
298-
))
347+
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone()))
299348
})?;
300349

301350
let receipt = pending_receipt
302351
.map_err(|e| {
303-
backoff::Error::transient(anyhow!(
304-
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
305-
transaction,
306-
e
307-
))
352+
backoff::Error::transient(SubmitTxError::ConfirmationError(transaction.clone(), e))
308353
})?
309354
.ok_or_else(|| {
310355
// RPC may not return an error on tx submission if the nonce is too high.
311356
// But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
312357
reset_nonce();
313-
backoff::Error::transient(anyhow!(
314-
"Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}",
315-
transaction
316-
))
358+
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone()))
317359
})?;
318360

319361
if receipt.status == Some(U64::from(0)) {
320-
return Err(backoff::Error::transient(anyhow!(
321-
"Reveal transaction reverted on-chain. Tx:{:?}",
322-
transaction
362+
return Err(backoff::Error::transient(SubmitTxError::ReceiptError(
363+
transaction.clone(),
364+
receipt.clone(),
323365
)));
324366
}
325367

apps/fortuna/src/keeper/process_event.rs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use {
22
super::keeper_metrics::AccountLabel,
33
crate::{
4-
chain::reader::RequestedWithCallbackEvent,
5-
eth_utils::utils::submit_tx_with_backoff,
4+
chain::{ethereum::PythRandomErrorsErrors, reader::RequestedWithCallbackEvent},
5+
eth_utils::utils::{submit_tx_with_backoff, SubmitTxError},
66
history::{RequestEntryState, RequestStatus},
77
keeper::block::ProcessParams,
88
},
99
anyhow::{anyhow, Result},
10+
ethers::{abi::AbiDecode, contract::ContractError},
11+
std::time::Duration,
1012
tracing,
1113
};
1214

@@ -74,12 +76,43 @@ pub async fn process_event_with_backoff(
7476
event.user_random_number,
7577
provider_revelation,
7678
);
79+
let error_mapper = |num_retries, e| {
80+
if let backoff::Error::Transient {
81+
err: SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)),
82+
..
83+
} = &e
84+
{
85+
if let Ok(PythRandomErrorsErrors::NoSuchRequest(_)) =
86+
PythRandomErrorsErrors::decode(revert)
87+
{
88+
let err =
89+
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert.clone()));
90+
// Slow down the retries if the request is not found.
91+
// This probably means that the request is already fulfilled via another process.
92+
// After 5 retries, we return the error permanently.
93+
if num_retries >= 5 {
94+
return backoff::Error::Permanent(err);
95+
}
96+
let retry_after_seconds = match num_retries {
97+
0 => 5,
98+
1 => 10,
99+
_ => 60,
100+
};
101+
return backoff::Error::Transient {
102+
err,
103+
retry_after: Some(Duration::from_secs(retry_after_seconds)),
104+
};
105+
}
106+
}
107+
e
108+
};
77109

78110
let success = submit_tx_with_backoff(
79111
contract.client(),
80112
contract_call,
81113
gas_limit,
82114
escalation_policy,
115+
Some(error_mapper),
83116
)
84117
.await;
85118

@@ -160,16 +193,45 @@ pub async fn process_event_with_backoff(
160193
.get_request(event.provider_address, event.sequence_number)
161194
.await;
162195

163-
tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);
164-
165196
// We only count failures for cases where we are completely certain that the callback failed.
166-
if req.is_ok_and(|x| x.is_some()) {
197+
if req.as_ref().is_ok_and(|x| x.is_some()) {
198+
tracing::error!("Failed to process event: {}. Request: {:?}", e, req);
167199
metrics
168200
.requests_processed_failure
169201
.get_or_create(&account_label)
170202
.inc();
203+
// Do not expose the internal error in the stored failure reason; it might include RPC details.
204+
let reason = match e {
205+
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)) => {
206+
format!("Reverted: {}", revert)
207+
}
208+
SubmitTxError::GasLimitExceeded { limit, estimate } => format!(
209+
"Gas limit exceeded: limit = {}, estimate = {}",
210+
limit, estimate
211+
),
212+
SubmitTxError::GasUsageEstimateError(_) => {
213+
"Unable to estimate gas usage".to_string()
214+
}
215+
SubmitTxError::GasPriceEstimateError(_) => {
216+
"Unable to estimate gas price".to_string()
217+
}
218+
SubmitTxError::SubmissionError(_, _) => {
219+
"Error submitting the transaction on-chain".to_string()
220+
}
221+
SubmitTxError::ConfirmationTimeout(tx) => format!(
222+
"Transaction was submitted, but never confirmed. Hash: {}",
223+
tx.sighash()
224+
),
225+
SubmitTxError::ConfirmationError(tx, _) => format!(
226+
"Transaction was submitted, but never confirmed. Hash: {}",
227+
tx.sighash()
228+
),
229+
SubmitTxError::ReceiptError(tx, _) => {
230+
format!("Reveal transaction failed on-chain. Hash: {}", tx.sighash())
231+
}
232+
};
171233
status.state = RequestEntryState::Failed {
172-
reason: format!("Error revealing: {:?}", e),
234+
reason,
173235
provider_random_number: Some(provider_revelation),
174236
};
175237
history.add(&status);

0 commit comments

Comments
 (0)