Commit 09b69df

feat: implement batch ops cache with configurable TTL for batch ops (#1833)
* feat: implement transaction cache with configurable TTL for batch processing
* refactor: rename transaction cache TTL to operations cache TTL
* restore tx cache TTL configuration for v1 updates
* fix: reduce transaction cache TTL from 180 to 15 seconds
* fix: correct ops_cache reference and adjust tx_cache TTL in tests
* refactor: ops cache cleanup
1 parent 6eaaeb5 commit 09b69df

11 files changed: 154 additions, 13 deletions.

forester/.env.example

Lines changed: 3 additions & 1 deletion
@@ -4,4 +4,6 @@ export FORESTER_INDEXER_URL="http://localhost:8784"
 export FORESTER_PROVER_URL="http://localhost:3001"
 export FORESTER_PUSH_GATEWAY_URL="http://localhost:9092/metrics/job/forester"
 export FORESTER_PAYER=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64]
-export PHOTON_API_KEY="00000000-0000-0000-0000-000000000000"
+export PHOTON_API_KEY="00000000-0000-0000-0000-000000000000"
+export FORESTER_OPS_CACHE_TTL_SECONDS=300
+export FORESTER_TX_CACHE_TTL_SECONDS=15

forester/src/cli.rs

Lines changed: 16 additions & 0 deletions
@@ -66,6 +66,22 @@ pub struct StartArgs {
     )]
     pub transaction_max_concurrent_batches: usize,

+    #[arg(
+        long,
+        env = "FORESTER_TX_CACHE_TTL_SECONDS",
+        default_value = "180",
+        help = "TTL in seconds to prevent duplicate transaction processing"
+    )]
+    pub tx_cache_ttl_seconds: u64,
+
+    #[arg(
+        long,
+        env = "FORESTER_OPS_CACHE_TTL_SECONDS",
+        default_value = "180",
+        help = "TTL in seconds to prevent duplicate batch operations processing"
+    )]
+    pub ops_cache_ttl_seconds: u64,
+
     #[arg(long, env = "FORESTER_CU_LIMIT", default_value = "1000000")]
     pub cu_limit: u32,

forester/src/config.rs

Lines changed: 6 additions & 0 deletions
@@ -64,6 +64,8 @@ pub struct TransactionConfig {
     pub max_concurrent_batches: usize,
     pub cu_limit: u32,
     pub enable_priority_fees: bool,
+    pub tx_cache_ttl_seconds: u64,
+    pub ops_cache_ttl_seconds: u64,
 }

 #[derive(Debug, Clone)]
@@ -155,6 +157,8 @@ impl Default for TransactionConfig {
             max_concurrent_batches: 20,
             cu_limit: 1_000_000,
             enable_priority_fees: false,
+            tx_cache_ttl_seconds: 15,
+            ops_cache_ttl_seconds: 180,
         }
     }
 }
@@ -234,6 +238,8 @@ impl ForesterConfig {
                 max_concurrent_batches: args.transaction_max_concurrent_batches,
                 cu_limit: args.cu_limit,
                 enable_priority_fees: args.enable_priority_fees,
+                tx_cache_ttl_seconds: args.tx_cache_ttl_seconds,
+                ops_cache_ttl_seconds: args.ops_cache_ttl_seconds,
             },
             general_config: GeneralConfig {
                 slot_update_interval_seconds: args.slot_update_interval_seconds,
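Note the layered defaults this leaves in place: both CLI flags above default to 180 seconds, while `TransactionConfig::default()` uses 15 seconds for the tx cache (the "reduce transaction cache TTL from 180 to 15 seconds" fix in the commit message) and 180 for the ops cache, and the example env file sets 15 / 300. A minimal sketch of overriding only the new fields in code, assuming the import path shown here and the `Default` impl from the diff above:

// Sketch only: the crate path is an assumption for illustration.
use forester::config::TransactionConfig;

fn main() {
    // Override just the new TTL fields; everything else comes from Default
    // (tx: 15 s, ops: 180 s per this commit).
    let config = TransactionConfig {
        tx_cache_ttl_seconds: 15,
        ops_cache_ttl_seconds: 300,
        ..Default::default()
    };
    println!("ops cache ttl = {}s", config.ops_cache_ttl_seconds);
}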

forester/src/epoch_manager.rs

Lines changed: 11 additions & 0 deletions
@@ -101,6 +101,7 @@ pub struct EpochManager<R: Rpc, I: Indexer> {
     processing_epochs: Arc<DashMap<u64, Arc<AtomicBool>>>,
     new_tree_sender: broadcast::Sender<TreeAccounts>,
     tx_cache: Arc<Mutex<ProcessedHashCache>>,
+    ops_cache: Arc<Mutex<ProcessedHashCache>>,
 }

 impl<R: Rpc, I: Indexer> Clone for EpochManager<R, I> {
@@ -117,6 +118,7 @@ impl<R: Rpc, I: Indexer> Clone for EpochManager<R, I> {
             processing_epochs: self.processing_epochs.clone(),
             new_tree_sender: self.new_tree_sender.clone(),
             tx_cache: self.tx_cache.clone(),
+            ops_cache: self.ops_cache.clone(),
         }
     }
 }
@@ -133,6 +135,7 @@ impl<R: Rpc, I: Indexer + IndexerType<R> + 'static> EpochManager<R, I> {
         slot_tracker: Arc<SlotTracker>,
         new_tree_sender: broadcast::Sender<TreeAccounts>,
         tx_cache: Arc<Mutex<ProcessedHashCache>>,
+        ops_cache: Arc<Mutex<ProcessedHashCache>>,
     ) -> Result<Self> {
         Ok(Self {
             config,
@@ -146,6 +149,7 @@ impl<R: Rpc, I: Indexer + IndexerType<R> + 'static> EpochManager<R, I> {
             processing_epochs: Arc::new(DashMap::new()),
             new_tree_sender,
             tx_cache,
+            ops_cache,
         })
     }

@@ -987,6 +991,10 @@ impl<R: Rpc, I: Indexer + IndexerType<R> + 'static> EpochManager<R, I> {
             }
         };

+        debug!(
+            "Processed {} items in slot {:?}",
+            items_processed_this_iteration, forester_slot_details.slot
+        );
         self.update_metrics_and_counts(
             epoch_info.epoch,
             items_processed_this_iteration,
@@ -1138,6 +1146,7 @@ impl<R: Rpc, I: Indexer + IndexerType<R> + 'static> EpochManager<R, I> {
                 .unwrap_or_else(|| "http://127.0.0.1:3001".to_string()),
             prover_polling_interval: Duration::from_secs(1),
             prover_max_wait_time: Duration::from_secs(120),
+            ops_cache: self.ops_cache.clone(),
         };

         process_batched_operations(batch_context, tree_accounts.tree_type)
@@ -1363,6 +1372,7 @@ pub async fn run_service<R: Rpc, I: Indexer + IndexerType<R> + 'static>(
     work_report_sender: mpsc::Sender<WorkReport>,
     slot_tracker: Arc<SlotTracker>,
     tx_cache: Arc<Mutex<ProcessedHashCache>>,
+    ops_cache: Arc<Mutex<ProcessedHashCache>>,
 ) -> Result<()> {
     info_span!("run_service", forester = %config.payer_keypair.pubkey())
         .in_scope(|| async {
@@ -1406,6 +1416,7 @@ pub async fn run_service<R: Rpc, I: Indexer + IndexerType<R> + 'static>(
                 slot_tracker.clone(),
                 new_tree_sender.clone(),
                 tx_cache.clone(),
+                ops_cache.clone(),
             )
             .await
             {

forester/src/lib.rs

Lines changed: 8 additions & 1 deletion
@@ -166,7 +166,13 @@ pub async fn run_pipeline<R: Rpc, I: Indexer + IndexerType<R> + 'static>(
         SlotTracker::run(arc_slot_tracker_clone, &mut *rpc).await;
     });

-    let tx_cache = Arc::new(Mutex::new(ProcessedHashCache::new(15)));
+    let tx_cache = Arc::new(Mutex::new(ProcessedHashCache::new(
+        config.transaction_config.tx_cache_ttl_seconds,
+    )));
+
+    let ops_cache = Arc::new(Mutex::new(ProcessedHashCache::new(
+        config.transaction_config.ops_cache_ttl_seconds,
+    )));

     debug!("Starting Forester pipeline");
     run_service(
@@ -178,6 +184,7 @@ pub async fn run_pipeline<R: Rpc, I: Indexer + IndexerType<R> + 'static>(
         work_report_sender,
         arc_slot_tracker,
         tx_cache,
+        ops_cache,
     )
     .await?;
     Ok(())

forester/src/processor/tx_cache.rs

Lines changed: 4 additions & 0 deletions
@@ -30,4 +30,8 @@ impl ProcessedHashCache {
         self.entries
             .retain(|_, timestamp| now.duration_since(*timestamp) < self.ttl);
     }
+
+    pub fn cleanup_by_key(&mut self, key: &str) {
+        self.entries.remove(key);
+    }
 }
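For context, a hedged reconstruction of `ProcessedHashCache` as a whole, inferred from the calls this commit relies on (`new`, `add`, `contains`, `cleanup`, and the new `cleanup_by_key`); the real struct lives in this file and may differ in details, for example whether `contains` prunes expired entries itself:

use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

// Sketch only: the field names `entries` and `ttl` come from the diff above;
// the rest is reconstructed from usage elsewhere in this commit.
pub struct ProcessedHashCache {
    entries: HashMap<String, Instant>,
    ttl: Duration,
}

impl ProcessedHashCache {
    pub fn new(ttl_seconds: u64) -> Self {
        Self {
            entries: HashMap::new(),
            ttl: Duration::from_secs(ttl_seconds),
        }
    }

    pub fn add(&mut self, key: &str) {
        self.entries.insert(key.to_string(), Instant::now());
    }

    // Assumption: expired entries are pruned before the lookup, so a stale
    // claim cannot block reprocessing past the TTL.
    pub fn contains(&mut self, key: &str) -> bool {
        self.cleanup();
        self.entries.contains_key(key)
    }

    // Matches the retain shown in the diff: drop everything older than ttl.
    pub fn cleanup(&mut self) {
        let now = Instant::now();
        self.entries
            .retain(|_, timestamp| now.duration_since(*timestamp) < self.ttl);
    }

    // New in this commit: evict a single key once its operation completes.
    pub fn cleanup_by_key(&mut self, key: &str) {
        self.entries.remove(key);
    }
}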

forester/src/processor/v2/address.rs

Lines changed: 6 additions & 3 deletions
@@ -4,7 +4,7 @@ use light_client::{indexer::Indexer, rpc::Rpc};
 use light_merkle_tree_metadata::events::MerkleTreeEvent;
 use light_registry::account_compression_cpi::sdk::create_batch_update_address_tree_instruction;
 use solana_sdk::signer::Signer;
-use tracing::{debug, info, instrument, log::error};
+use tracing::{debug, info, instrument, log::error, trace};

 use super::{
     common::BatchContext,
@@ -16,7 +16,8 @@ use crate::indexer_type::{finalize_batch_address_tree_update, IndexerType};
 pub(crate) async fn process_batch<R: Rpc, I: Indexer + IndexerType<R>>(
     context: &BatchContext<R, I>,
 ) -> Result<usize> {
-    info!("Processing address batch operation");
+    trace!("Processing address batch operation");
+
     let mut rpc = context.rpc_pool.get_connection().await?;

     let (instruction_data_vec, zkp_batch_size) = create_batch_update_address_tree_instruction_data(
@@ -37,7 +38,9 @@ pub(crate) async fn process_batch<R: Rpc, I: Indexer + IndexerType<R>>(
     })?;

     if instruction_data_vec.is_empty() {
-        debug!("No ZKP batches to process for address tree");
+        trace!("No ZKP batches to process for address tree");
+        let mut cache = context.ops_cache.lock().await;
+        cache.cleanup();
         return Ok(0);
     }

forester/src/processor/v2/common.rs

Lines changed: 88 additions & 3 deletions
@@ -11,10 +11,10 @@ use light_compressed_account::TreeType;
 use solana_program::pubkey::Pubkey;
 use solana_sdk::signature::Keypair;
 use tokio::sync::Mutex;
-use tracing::{error, trace};
+use tracing::{debug, error, trace};

 use super::{address, error::Result, state, BatchProcessError};
-use crate::indexer_type::IndexerType;
+use crate::{indexer_type::IndexerType, processor::tx_cache::ProcessedHashCache};

 #[derive(Debug)]
 pub struct BatchContext<R: Rpc, I: Indexer> {
@@ -29,6 +29,7 @@ pub struct BatchContext<R: Rpc, I: Indexer> {
     pub prover_url: String,
     pub prover_polling_interval: Duration,
     pub prover_max_wait_time: Duration,
+    pub ops_cache: Arc<Mutex<ProcessedHashCache>>,
 }

 #[derive(Debug)]
@@ -58,7 +59,40 @@ impl<R: Rpc, I: Indexer + IndexerType<R>> BatchProcessor<R, I> {

         match state {
             BatchReadyState::ReadyForAppend => match self.tree_type {
-                TreeType::AddressV2 => address::process_batch(&self.context).await,
+                TreeType::AddressV2 => {
+                    trace!(
+                        "Processing address append for tree: {}",
+                        self.context.merkle_tree
+                    );
+
+                    let batch_hash = format!(
+                        "address_batch_{}_{}",
+                        self.context.merkle_tree, self.context.epoch
+                    );
+                    {
+                        let mut cache = self.context.ops_cache.lock().await;
+                        if cache.contains(&batch_hash) {
+                            debug!("Skipping already processed address batch: {}", batch_hash);
+                            return Ok(0);
+                        }
+                        cache.add(&batch_hash);
+                    }
+
+                    let result = address::process_batch(&self.context).await;
+
+                    if let Err(ref e) = result {
+                        error!(
+                            "Address append failed for tree {}: {:?}",
+                            self.context.merkle_tree, e
+                        );
+                    }
+
+                    let mut cache = self.context.ops_cache.lock().await;
+                    cache.cleanup_by_key(&batch_hash);
+                    trace!("Cache cleaned up for batch: {}", batch_hash);
+
+                    result
+                }
                 TreeType::StateV2 => {
                     trace!(
                         "Process state append for tree: {}",
@@ -218,7 +252,31 @@ impl<R: Rpc, I: Indexer + IndexerType<R>> BatchProcessor<R, I> {
     async fn process_state_append(&self) -> Result<usize> {
         let mut rpc = self.context.rpc_pool.get_connection().await?;
         let (_, zkp_batch_size) = self.get_num_inserted_zkps(&mut rpc).await?;
+
+        let batch_hash = format!(
+            "state_append_{}_{}",
+            self.context.merkle_tree, self.context.epoch
+        );
+        {
+            let mut cache = self.context.ops_cache.lock().await;
+            if cache.contains(&batch_hash) {
+                trace!(
+                    "Skipping already processed state append batch: {}",
+                    batch_hash
+                );
+                return Ok(0);
+            }
+            cache.add(&batch_hash);
+        }
         state::perform_append(&self.context, &mut rpc).await?;
+        trace!(
+            "State append operation completed for tree: {}",
+            self.context.merkle_tree
+        );
+
+        let mut cache = self.context.ops_cache.lock().await;
+        cache.cleanup_by_key(&batch_hash);
+
         Ok(zkp_batch_size)
     }

@@ -231,7 +289,34 @@ impl<R: Rpc, I: Indexer + IndexerType<R>> BatchProcessor<R, I> {
             zkp_batch_size,
             inserted_zkps_count
         );
+
+        let batch_hash = format!(
+            "state_nullify_{}_{}",
+            self.context.merkle_tree, self.context.epoch
+        );
+
+        {
+            let mut cache = self.context.ops_cache.lock().await;
+            if cache.contains(&batch_hash) {
+                trace!(
+                    "Skipping already processed state nullify batch: {}",
+                    batch_hash
+                );
+                return Ok(0);
+            }
+            cache.add(&batch_hash);
+        }
+
         state::perform_nullify(&self.context, &mut rpc).await?;
+
+        trace!(
+            "State nullify operation completed for tree: {}",
+            self.context.merkle_tree
+        );
+        let mut cache = self.context.ops_cache.lock().await;
+        cache.cleanup_by_key(&batch_hash);
+        trace!("Cache cleaned up for batch: {}", batch_hash);
+
         Ok(zkp_batch_size)
     }
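The same claim-process-evict pattern now appears three times in this file (address append, state append, state nullify). A distilled sketch of that guard follows, assuming the `ProcessedHashCache` shape sketched earlier and `anyhow::Result` standing in for the module's own `Result`. It is modeled on the address arm, which evicts on success and failure alike; the two state paths use `?` and so return early on error, leaving a failed batch's entry to expire via the TTL instead.

use std::sync::Arc;
use tokio::sync::Mutex;

// Distilled from the three call sites above; `run_operation` stands in for the
// real append/nullify future, and the key shape "<op>_<tree>_<epoch>" matches
// the format! calls in the diff.
async fn guarded<Fut>(
    ops_cache: Arc<Mutex<ProcessedHashCache>>,
    batch_hash: String,
    run_operation: impl FnOnce() -> Fut,
) -> anyhow::Result<usize>
where
    Fut: std::future::Future<Output = anyhow::Result<usize>>,
{
    {
        // Claim the batch under the lock, then release it before awaiting.
        let mut cache = ops_cache.lock().await;
        if cache.contains(&batch_hash) {
            return Ok(0); // another in-flight task already owns this batch
        }
        cache.add(&batch_hash);
    }

    let result = run_operation().await;

    // Evict eagerly on completion so the batch can be retried before the
    // TTL would have expired the entry on its own.
    ops_cache.lock().await.cleanup_by_key(&batch_hash);
    result
}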

forester/src/processor/v2/state.rs

Lines changed: 7 additions & 3 deletions
@@ -8,7 +8,7 @@ use light_registry::account_compression_cpi::sdk::{
     create_batch_append_instruction, create_batch_nullify_instruction,
 };
 use solana_sdk::signer::Signer;
-use tracing::{debug, info, instrument, log::error};
+use tracing::{debug, info, instrument, log::error, trace};

 use super::{
     common::BatchContext,
@@ -47,7 +47,9 @@ pub(crate) async fn perform_append<R: Rpc, I: Indexer + IndexerType<R>>(
     })?;

     if instruction_data_vec.is_empty() {
-        debug!("No zkp batches to append");
+        trace!("No zkp batches to append");
+        let mut cache = context.ops_cache.lock().await;
+        cache.cleanup();
         return Ok(());
     }

@@ -159,7 +161,9 @@ pub(crate) async fn perform_nullify<R: Rpc, I: Indexer + IndexerType<R>>(
     })?;

     if instruction_data_vec.is_empty() {
-        debug!("No zkp batches to nullify");
+        trace!("No zkp batches to nullify");
+        let mut cache = context.ops_cache.lock().await;
+        cache.cleanup();
         return Ok(());
     }

forester/tests/batched_state_test.rs

Lines changed: 3 additions & 2 deletions
@@ -246,8 +246,8 @@ async fn test_state_batched() {
     let timeout_duration = Duration::from_secs(60 * 10);
     match timeout(timeout_duration, work_report_receiver.recv()).await {
         Ok(Some(report)) => {
-            info!("Received work report: {:?}", report);
-            info!(
+            println!("Received work report: {:?}", report);
+            println!(
                 "Work report debug:
                 reported_items: {}
                 batch_size: {}
@@ -284,6 +284,7 @@ async fn test_state_batched() {
     )
     .unwrap();

+    println!("Merkle tree metadata: {:?}", merkle_tree.get_metadata());
     assert!(
         merkle_tree.get_metadata().queue_batches.pending_batch_index > 0,
         "No batches were processed"
