Skip to content

Commit b5e7363

Browse files
refactor: removed mutexed indexer from forester, instead use rpc.indexer() (#1864)
* refactor: removed mutexed indexer from forester, instead use rpc.indexer() * refactor: simplify generic constraints in instruction stream functions * Remove unused imports in tests * ignore legacy e2e test (photon needs to support custom trees before we can re-enable it) * Potential fix for code scanning alert no. 102: Workflow does not contain permissions Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
1 parent 86f703c commit b5e7363

File tree

21 files changed

+192
-264
lines changed

21 files changed

+192
-264
lines changed

.github/workflows/forester-tests.yml

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
name: forester-tests
22

3+
permissions:
4+
contents: read
5+
36
on:
47
push:
58
branches: [main]
@@ -38,26 +41,9 @@ env:
3841

3942
jobs:
4043
test:
41-
strategy:
42-
matrix:
43-
test-name:
44-
[
45-
{
46-
name: "e2e (legacy)",
47-
command: "test_e2e_v1",
48-
timeout: 60,
49-
needs-test-program: false,
50-
},
51-
{
52-
name: "e2e",
53-
command: "test_e2e_v2",
54-
timeout: 60,
55-
needs-test-program: true,
56-
},
57-
]
58-
name: test-${{ matrix.test-name.name }}
44+
name: Forester e2e test
5945
runs-on: warp-ubuntu-latest-x64-4x
60-
timeout-minutes: ${{ matrix.test-name.timeout }}
46+
timeout-minutes: 30
6147

6248
services:
6349
redis:
@@ -75,31 +61,35 @@ jobs:
7561

7662
steps:
7763
- uses: actions/checkout@v4
64+
7865
- name: Setup and build
7966
uses: ./.github/actions/setup-and-build
8067
with:
8168
skip-components: "redis"
69+
8270
- name: Clean build artifacts before tests
8371
shell: bash
8472
run: |
8573
cargo clean
8674
rm -rf target/debug/deps/*
75+
8776
- name: Check available disk space
8877
shell: bash
8978
run: |
9079
df -h /
9180
du -sh /home/runner/work/* | sort -hr | head -n 10
81+
9282
- name: Build CLI
9383
run: |
9484
source ./scripts/devenv.sh
9585
npx nx build @lightprotocol/zk-compression-cli
86+
9687
- name: Build test program
97-
if: ${{ matrix.test-name.needs-test-program }}
9888
run: |
9989
source ./scripts/devenv.sh
10090
cargo test-sbf -p create-address-test-program
10191
102-
- name: Run ${{ matrix.test-name.name }} tests
92+
- name: Test
10393
run: |
10494
source ./scripts/devenv.sh
105-
cargo test --package forester ${{ matrix.test-name.command }} -- --nocapture
95+
cargo test --package forester e2e_test -- --nocapture

forester-utils/src/error.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use light_batched_merkle_tree::errors::BatchedMerkleTreeError;
2+
use light_client::rpc::RpcError;
23
use light_hasher::HasherError;
34
use thiserror::Error;
45

@@ -10,8 +11,8 @@ pub enum ForesterUtilsError {
1011
Parse(String),
1112
#[error("prover error: {0:?}")]
1213
Prover(String),
13-
#[error("rpc error: {0:?}")]
14-
Rpc(String),
14+
#[error("rpc error")]
15+
Rpc(#[from] RpcError),
1516
#[error("indexer error: {0:?}")]
1617
Indexer(String),
1718
#[error("invalid slot number")]
@@ -22,9 +23,6 @@ pub enum ForesterUtilsError {
2223
#[error("Account zero-copy error: {0}")]
2324
AccountZeroCopy(String),
2425

25-
#[error("light client error: {0}")]
26-
LightClient(#[from] light_client::rpc::RpcError),
27-
2826
#[error("batched merkle tree error: {0}")]
2927
BatchedMerkleTree(#[from] BatchedMerkleTreeError),
3028

forester-utils/src/instructions/address_batch_update.rs

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,14 @@ use light_prover_client::{
1919
proof_types::batch_address_append::get_batch_address_append_circuit_inputs,
2020
};
2121
use light_sparse_merkle_tree::SparseMerkleTree;
22-
use tokio::sync::Mutex;
2322
use tracing::{debug, error, info, warn};
2423

25-
use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer};
24+
use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool};
2625

2726
const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 500;
2827

29-
pub struct AddressUpdateConfig<R, I>
30-
where
31-
R: Rpc + Send + Sync,
32-
I: Indexer + Send,
33-
{
28+
pub struct AddressUpdateConfig<R: Rpc> {
3429
pub rpc_pool: Arc<SolanaRpcPool<R>>,
35-
pub indexer: Arc<Mutex<I>>,
3630
pub merkle_tree_pubkey: Pubkey,
3731
pub prover_url: String,
3832
pub polling_interval: Duration,
@@ -41,8 +35,8 @@ where
4135
}
4236

4337
#[allow(clippy::too_many_arguments)]
44-
fn stream_instruction_data<'a, I>(
45-
indexer: Arc<Mutex<I>>,
38+
async fn stream_instruction_data<'a, R: Rpc>(
39+
rpc_pool: Arc<SolanaRpcPool<R>>,
4640
merkle_tree_pubkey: Pubkey,
4741
prover_url: String,
4842
polling_interval: Duration,
@@ -53,24 +47,24 @@ fn stream_instruction_data<'a, I>(
5347
mut current_root: [u8; 32],
5448
yield_batch_size: usize,
5549
) -> impl Stream<Item = Result<Vec<InstructionDataAddressAppendInputs>, ForesterUtilsError>> + Send + 'a
56-
where
57-
I: Indexer + Send + 'a,
5850
{
5951
stream! {
6052
let proof_client = Arc::new(ProofClient::with_config(prover_url, polling_interval, max_wait_time));
6153
let max_zkp_batches_per_call = calculate_max_zkp_batches_per_call(zkp_batch_size);
6254
let total_chunks = leaves_hash_chains.len().div_ceil(max_zkp_batches_per_call);
6355

6456
for chunk_idx in 0..total_chunks {
65-
let mut indexer_guard = indexer.lock().await;
6657
let chunk_start = chunk_idx * max_zkp_batches_per_call;
6758
let chunk_end = std::cmp::min(chunk_start + max_zkp_batches_per_call, leaves_hash_chains.len());
6859
let chunk_hash_chains = &leaves_hash_chains[chunk_start..chunk_end];
6960

7061
let elements_for_chunk = chunk_hash_chains.len() * zkp_batch_size as usize;
7162
let processed_items_offset = chunk_start * zkp_batch_size as usize;
7263

73-
let indexer_update_info = match indexer_guard
64+
let indexer_update_info = {
65+
let mut connection = rpc_pool.get_connection().await?;
66+
let indexer = connection.indexer_mut()?;
67+
match indexer
7468
.get_address_queue_with_proofs(
7569
&merkle_tree_pubkey,
7670
elements_for_chunk as u16,
@@ -83,7 +77,8 @@ where
8377
yield Err(ForesterUtilsError::Indexer(format!("Failed to get address queue with proofs: {}", e)));
8478
return;
8579
}
86-
};
80+
}
81+
};
8782

8883
if chunk_idx == 0 {
8984
if let Some(first_proof) = indexer_update_info.value.non_inclusion_proofs.first() {
@@ -296,8 +291,8 @@ fn get_all_circuit_inputs_for_chunk(
296291
Ok((all_inputs, current_root))
297292
}
298293

299-
pub async fn get_address_update_instruction_stream<'a, R, I>(
300-
config: AddressUpdateConfig<R, I>,
294+
pub async fn get_address_update_instruction_stream<'a, R: Rpc>(
295+
config: AddressUpdateConfig<R>,
301296
merkle_tree_data: crate::ParsedMerkleTreeData,
302297
) -> Result<
303298
(
@@ -312,17 +307,7 @@ pub async fn get_address_update_instruction_stream<'a, R, I>(
312307
u16,
313308
),
314309
ForesterUtilsError,
315-
>
316-
where
317-
R: Rpc + Send + Sync + 'a,
318-
I: Indexer + Send + 'a,
319-
{
320-
let rpc = config.rpc_pool.get_connection().await?;
321-
let indexer_guard = config.indexer.lock().await;
322-
wait_for_indexer(&*rpc, &*indexer_guard).await?;
323-
drop(rpc);
324-
drop(indexer_guard);
325-
310+
> {
326311
let (current_root, leaves_hash_chains, start_index, zkp_batch_size) = (
327312
merkle_tree_data.current_root,
328313
merkle_tree_data.leaves_hash_chains,
@@ -336,7 +321,7 @@ where
336321
}
337322

338323
let stream = stream_instruction_data(
339-
config.indexer,
324+
config.rpc_pool,
340325
config.merkle_tree_pubkey,
341326
config.prover_url,
342327
config.polling_interval,
@@ -346,7 +331,8 @@ where
346331
zkp_batch_size,
347332
current_root,
348333
config.ixs_per_tx,
349-
);
334+
)
335+
.await;
350336

351337
Ok((Box::pin(stream), zkp_batch_size))
352338
}

forester-utils/src/instructions/state_batch_append.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use light_prover_client::{
1818
proof_types::batch_append::{get_batch_append_inputs, BatchAppendsCircuitInputs},
1919
};
2020
use light_sparse_merkle_tree::changelog::ChangelogEntry;
21-
use tokio::sync::Mutex;
2221
use tracing::trace;
2322

2423
use crate::{
@@ -45,9 +44,8 @@ async fn generate_zkp_proof(
4544
}
4645

4746
#[allow(clippy::too_many_arguments)]
48-
pub async fn get_append_instruction_stream<'a, R, I>(
47+
pub async fn get_append_instruction_stream<'a, R: Rpc>(
4948
rpc_pool: Arc<SolanaRpcPool<R>>,
50-
indexer: Arc<Mutex<I>>,
5149
merkle_tree_pubkey: Pubkey,
5250
prover_url: String,
5351
polling_interval: Duration,
@@ -67,16 +65,8 @@ pub async fn get_append_instruction_stream<'a, R, I>(
6765
u16,
6866
),
6967
ForesterUtilsError,
70-
>
71-
where
72-
R: Rpc + Send + Sync + 'a,
73-
I: Indexer + Send + 'a,
74-
{
68+
> {
7569
trace!("Initializing append batch instruction stream with parsed data");
76-
77-
let (indexer_guard, rpc_result) = tokio::join!(indexer.lock(), rpc_pool.get_connection());
78-
let rpc = rpc_result?;
79-
8070
let (merkle_tree_next_index, mut current_root, _) = (
8171
merkle_tree_data.next_index,
8272
merkle_tree_data.current_root,
@@ -91,19 +81,18 @@ where
9181
trace!("No hash chains to process, returning empty stream.");
9282
return Ok((Box::pin(futures::stream::empty()), zkp_batch_size));
9383
}
94-
95-
wait_for_indexer(&*rpc, &*indexer_guard).await?;
84+
let rpc = rpc_pool.get_connection().await?;
85+
wait_for_indexer(&*rpc).await?;
9686
drop(rpc);
97-
drop(indexer_guard);
9887

9988
let stream = stream! {
10089
let total_elements = zkp_batch_size as usize * leaves_hash_chains.len();
10190
let offset = merkle_tree_next_index;
10291

10392
let queue_elements = {
104-
let mut indexer_guard = indexer.lock().await;
105-
106-
match indexer_guard
93+
let mut connection = rpc_pool.get_connection().await?;
94+
let indexer = connection.indexer_mut()?;
95+
match indexer
10796
.get_queue_elements(
10897
merkle_tree_pubkey.to_bytes(),
10998
QueueType::OutputStateV2,

forester-utils/src/instructions/state_batch_nullify.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use light_prover_client::{
1717
proof_client::ProofClient,
1818
proof_types::batch_update::{get_batch_update_inputs, BatchUpdateCircuitInputs},
1919
};
20-
use tokio::sync::Mutex;
2120
use tracing::{debug, trace};
2221

2322
use crate::{
@@ -44,9 +43,8 @@ async fn generate_nullify_zkp_proof(
4443
}
4544

4645
#[allow(clippy::too_many_arguments)]
47-
pub async fn get_nullify_instruction_stream<'a, R, I>(
46+
pub async fn get_nullify_instruction_stream<'a, R: Rpc>(
4847
rpc_pool: Arc<SolanaRpcPool<R>>,
49-
indexer: Arc<Mutex<I>>,
5048
merkle_tree_pubkey: Pubkey,
5149
prover_url: String,
5250
polling_interval: Duration,
@@ -66,13 +64,7 @@ pub async fn get_nullify_instruction_stream<'a, R, I>(
6664
u16,
6765
),
6866
ForesterUtilsError,
69-
>
70-
where
71-
R: Rpc + Send + Sync + 'a,
72-
I: Indexer + Send + 'a,
73-
{
74-
let rpc = rpc_pool.get_connection().await?;
75-
67+
> {
7668
let (mut current_root, leaves_hash_chains, num_inserted_zkps, zkp_batch_size) = (
7769
merkle_tree_data.current_root,
7870
merkle_tree_data.leaves_hash_chains,
@@ -85,10 +77,9 @@ where
8577
return Ok((Box::pin(futures::stream::empty()), zkp_batch_size));
8678
}
8779

88-
let indexer_guard = indexer.lock().await;
89-
wait_for_indexer(&*rpc, &*indexer_guard).await?;
80+
let rpc = rpc_pool.get_connection().await?;
81+
wait_for_indexer(&*rpc).await?;
9082
drop(rpc);
91-
drop(indexer_guard);
9283

9384
let stream = stream! {
9485
let total_elements = zkp_batch_size as usize * leaves_hash_chains.len();
@@ -97,9 +88,9 @@ where
9788
trace!("Requesting {} total elements with offset {}", total_elements, offset);
9889

9990
let all_queue_elements = {
100-
let mut indexer_guard = indexer.lock().await;
101-
indexer_guard
102-
.get_queue_elements(
91+
let mut connection = rpc_pool.get_connection().await?;
92+
let indexer = connection.indexer_mut()?;
93+
indexer.get_queue_elements(
10394
merkle_tree_pubkey.to_bytes(),
10495
QueueType::InputStateV2,
10596
total_elements as u16,

forester-utils/src/utils.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,10 @@ pub async fn airdrop_lamports<R: Rpc>(
2828
Ok(())
2929
}
3030

31-
pub async fn wait_for_indexer<R: Rpc, I: Indexer>(
32-
rpc: &R,
33-
indexer: &I,
34-
) -> Result<(), ForesterUtilsError> {
35-
let rpc_slot = rpc
36-
.get_slot()
37-
.await
38-
.map_err(|_| ForesterUtilsError::Rpc("Failed to get rpc slot".into()))?;
31+
pub async fn wait_for_indexer<R: Rpc>(rpc: &R) -> Result<(), ForesterUtilsError> {
32+
let rpc_slot = rpc.get_slot().await?;
3933

40-
let indexer_slot = indexer.get_indexer_slot(None).await;
34+
let indexer_slot = rpc.indexer()?.get_indexer_slot(None).await;
4135

4236
let mut indexer_slot = match indexer_slot {
4337
Ok(slot) => slot,
@@ -66,7 +60,7 @@ pub async fn wait_for_indexer<R: Rpc, I: Indexer>(
6660

6761
tokio::task::yield_now().await;
6862
sleep(std::time::Duration::from_millis(500)).await;
69-
indexer_slot = indexer.get_indexer_slot(None).await.map_err(|e| {
63+
indexer_slot = rpc.indexer()?.get_indexer_slot(None).await.map_err(|e| {
7064
error!("failed to get indexer slot from indexer: {:?}", e);
7165
ForesterUtilsError::Indexer("Failed to get indexer slot".into())
7266
})?;

forester/package.json

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@
44
"license": "GPL-3.0",
55
"scripts": {
66
"build": "cargo build",
7-
"test": "RUSTFLAGS=\"--cfg tokio_unstable -D warnings\" cargo test --package forester -- --nocapture",
8-
"e2e-test": "source .env && RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_e2e_v2 -- --nocapture",
9-
"test-address-batched-local": "RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_address_batched -- --nocapture",
10-
"e2e-v1": "RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_e2e_v1 -- --nocapture",
11-
"test-address-v2": "RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_create_v2_address -- --nocapture",
12-
"test-state-batched-indexer-async": "RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_state_indexer_async_batched -- --nocapture",
7+
"test": "source .env && RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_e2e_v2 -- --nocapture",
138
"docker:build": "docker build --tag forester -f Dockerfile .."
149
},
1510
"devDependencies": {

0 commit comments

Comments
 (0)