Skip to content

Commit 09182ad

Browse files
refactor: forester(send_transaction): add timeout checks (#1631)
1 parent fa33116 commit 09182ad

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

forester/src/send_transaction.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
atomic::{AtomicBool, AtomicUsize, Ordering},
44
Arc,
55
},
6+
time::Duration,
67
vec,
78
};
89

@@ -144,7 +145,6 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
144145
return Ok(0);
145146
}
146147

147-
// Get priority fee and blockhash
148148
let (recent_blockhash, current_block_height) = {
149149
let mut rpc = pool.get_connection().await?;
150150
(
@@ -202,7 +202,6 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
202202
}
203203
});
204204

205-
// Process work items in chunks of `config.build_transaction_batch_config.batch_size`
206205
let work_items: Vec<WorkItem> = queue_item_data
207206
.into_iter()
208207
.map(|data| WorkItem {
@@ -211,10 +210,16 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
211210
})
212211
.collect();
213212

213+
let buffer_duration = Duration::from_secs(2);
214+
let adjusted_timeout = if config.retry_config.timeout > buffer_duration {
215+
config.retry_config.timeout - buffer_duration
216+
} else {
217+
return Ok(0);
218+
};
219+
let timeout_deadline = start_time + adjusted_timeout;
220+
214221
for work_chunk in work_items.chunks(config.build_transaction_batch_config.batch_size as usize) {
215-
if cancel_signal.load(Ordering::SeqCst)
216-
|| start_time.elapsed() >= config.retry_config.timeout
217-
{
222+
if cancel_signal.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline {
218223
break;
219224
}
220225

@@ -230,12 +235,22 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
230235
)
231236
.await?;
232237

233-
// Spawn transaction senders
238+
let now = Instant::now();
239+
if now >= timeout_deadline {
240+
break;
241+
}
242+
234243
for tx in transactions {
235244
if cancel_signal.load(Ordering::SeqCst) {
236245
break;
237246
}
238247

248+
let now = Instant::now();
249+
if now >= timeout_deadline {
250+
warn!("Reached timeout deadline, stopping batch processing");
251+
break;
252+
}
253+
239254
let tx_sender = tx_sender.clone();
240255
let pool_clone = pool.clone();
241256
let config = RpcSendTransactionConfig {
@@ -245,21 +260,25 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
245260
..Default::default()
246261
};
247262

263+
let cancel_signal_clone = cancel_signal.clone();
264+
let deadline = timeout_deadline;
265+
248266
tokio::spawn(async move {
267+
if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= deadline {
268+
return;
269+
}
270+
249271
if let Ok(mut rpc) = pool_clone.get_connection().await {
250272
let result = rpc.process_transaction_with_config(tx, config).await;
251-
let _ = tx_sender.send(result).await;
273+
if !cancel_signal_clone.load(Ordering::SeqCst) {
274+
let _ = tx_sender.send(result).await;
275+
}
252276
}
253277
});
254278
}
255279
}
256-
257-
// Drop sender to allow processor to complete
258280
drop(tx_sender);
259-
260-
// Wait for processor to complete
261281
processor_handle.await?;
262-
263282
Ok(num_sent_transactions.load(Ordering::SeqCst))
264283
}
265284

0 commit comments

Comments
 (0)