Skip to content

Commit cb803c0

Browse files
feat: optimize forester by parallelizing zkp proof generation for batch append and nullify (#1633)
* wip * parallel batch proof gen * Remove debug logs * remove unnecessary init_logger calls from tests * cleanup * cleanup * Add delays to stabilize async indexer batch tests * test_indexer: get_queue_elements with 0 offset * fix * Group transactions into chunks for batch processing. * Remove sleep delay in transaction processing. * Refactor chunking logic with a constant for chunk size. * add batch_ixs_per_tx configuration for transaction batching Replaces the fixed CHUNK_SIZE with a configurable batch_ixs_per_tx parameter, allowing dynamic control over instructions per transaction. * Change batch_ixs_per_tx to 1 in test configurations * format * switch to warp-ubuntu-latest-x64-4x for forester ci workflow
1 parent dbe09f3 commit cb803c0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1584
-1043
lines changed

Cargo.lock

Lines changed: 3 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@ members = [
4343

4444
resolver = "2"
4545

46+
[profile.dev]
47+
opt-level = 0
48+
debug = true
49+
strip = "none"
50+
4651
[profile.release]
4752
overflow-checks = true
4853

49-
[profile.test]
50-
opt-level = 2
51-
5254
[workspace.dependencies]
5355
# Solana
5456
solana-banks-client = "=1.18.22"

forester-utils/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ solana-sdk = { workspace = true }
3636
thiserror = { workspace = true }
3737

3838
# Logging
39-
log = { workspace = true }
39+
tracing = { workspace = true }
4040

4141
# Big numbers
4242
num-traits = { workspace = true }
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::{fmt, marker::PhantomData, mem, pin::Pin};
2+
3+
use account_compression::processor::initialize_address_merkle_tree::Pubkey;
4+
use light_client::rpc::RpcConnection;
5+
use light_concurrent_merkle_tree::copy::ConcurrentMerkleTreeCopy;
6+
use light_hash_set::HashSet;
7+
use light_hasher::Hasher;
8+
use light_indexed_merkle_tree::copy::IndexedMerkleTreeCopy;
9+
use num_traits::{CheckedAdd, CheckedSub, ToBytes, Unsigned};
10+
use solana_sdk::account::Account;
11+
12+
#[derive(Debug, Clone)]
13+
pub struct AccountZeroCopy<'a, T> {
14+
pub account: Pin<Box<Account>>,
15+
deserialized: *const T,
16+
_phantom_data: PhantomData<&'a T>,
17+
}
18+
19+
impl<'a, T> AccountZeroCopy<'a, T> {
20+
pub async fn new<R: RpcConnection>(rpc: &mut R, address: Pubkey) -> AccountZeroCopy<'a, T> {
21+
let account = Box::pin(rpc.get_account(address).await.unwrap().unwrap());
22+
let deserialized = account.data[8..].as_ptr() as *const T;
23+
24+
Self {
25+
account,
26+
deserialized,
27+
_phantom_data: PhantomData,
28+
}
29+
}
30+
31+
// Safe method to access `deserialized` ensuring the lifetime is respected
32+
pub fn deserialized(&self) -> &'a T {
33+
unsafe { &*self.deserialized }
34+
}
35+
}
36+
37+
/// Fetches the given account, then copies and serializes it as a `HashSet`.
38+
///
39+
/// # Safety
40+
///
41+
/// This is highly unsafe. Ensuring that:
42+
///
43+
/// * The correct account is used.
44+
/// * The account has enough space to be treated as a HashSet with specified
45+
/// parameters.
46+
/// * The account data is aligned.
47+
///
48+
/// Is the caller's responsibility.
49+
pub async unsafe fn get_hash_set<T, R: RpcConnection>(rpc: &mut R, pubkey: Pubkey) -> HashSet {
50+
let mut data = rpc.get_account(pubkey).await.unwrap().unwrap().data.clone();
51+
52+
HashSet::from_bytes_copy(&mut data[8 + mem::size_of::<T>()..]).unwrap()
53+
}
54+
55+
/// Fetches the given account, then copies and serializes it as a
56+
/// `ConcurrentMerkleTree`.
57+
pub async fn get_concurrent_merkle_tree<T, R, H, const HEIGHT: usize>(
58+
rpc: &mut R,
59+
pubkey: Pubkey,
60+
) -> ConcurrentMerkleTreeCopy<H, HEIGHT>
61+
where
62+
R: RpcConnection,
63+
H: Hasher,
64+
{
65+
let account = rpc.get_account(pubkey).await.unwrap().unwrap();
66+
67+
ConcurrentMerkleTreeCopy::from_bytes_copy(&account.data[8 + mem::size_of::<T>()..]).unwrap()
68+
}
69+
// TODO: do discriminator check
70+
/// Fetches the fiven account, then copies and serializes it as an
71+
/// `IndexedMerkleTree`.
72+
pub async fn get_indexed_merkle_tree<T, R, H, I, const HEIGHT: usize, const NET_HEIGHT: usize>(
73+
rpc: &mut R,
74+
pubkey: Pubkey,
75+
) -> IndexedMerkleTreeCopy<H, I, HEIGHT, NET_HEIGHT>
76+
where
77+
R: RpcConnection,
78+
H: Hasher,
79+
I: CheckedAdd
80+
+ CheckedSub
81+
+ Copy
82+
+ Clone
83+
+ fmt::Debug
84+
+ PartialOrd
85+
+ ToBytes
86+
+ TryFrom<usize>
87+
+ Unsigned,
88+
usize: From<I>,
89+
{
90+
let account = rpc.get_account(pubkey).await.unwrap().unwrap();
91+
92+
IndexedMerkleTreeCopy::from_bytes_copy(&account.data[8 + mem::size_of::<T>()..]).unwrap()
93+
}

forester-utils/src/address_merkle_tree_config.rs

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@ use light_hasher::Poseidon;
1313
use num_traits::Zero;
1414
use solana_sdk::pubkey::Pubkey;
1515

16-
use crate::{get_concurrent_merkle_tree, get_hash_set, get_indexed_merkle_tree, AccountZeroCopy};
16+
use crate::account_zero_copy::{
17+
get_concurrent_merkle_tree, get_hash_set, get_indexed_merkle_tree, AccountZeroCopy,
18+
};
1719

1820
pub async fn get_address_bundle_config<R: RpcConnection>(
1921
rpc: &mut R,
2022
address_bundle: AddressMerkleTreeAccounts,
2123
) -> (AddressMerkleTreeConfig, AddressQueueConfig) {
22-
let address_queue_meta_data =
23-
AccountZeroCopy::<account_compression::QueueAccount>::new(rpc, address_bundle.queue)
24-
.await
25-
.deserialized()
26-
.metadata;
24+
let address_queue_meta_data = AccountZeroCopy::<QueueAccount>::new(rpc, address_bundle.queue)
25+
.await
26+
.deserialized()
27+
.metadata;
2728
let address_queue = unsafe { get_hash_set::<QueueAccount, R>(rpc, address_bundle.queue).await };
2829
let queue_config = AddressQueueConfig {
2930
network_fee: Some(address_queue_meta_data.rollover_metadata.network_fee),
@@ -32,13 +33,10 @@ pub async fn get_address_bundle_config<R: RpcConnection>(
3233
sequence_threshold: address_queue.sequence_threshold as u64,
3334
};
3435
let address_tree_meta_data =
35-
AccountZeroCopy::<account_compression::AddressMerkleTreeAccount>::new(
36-
rpc,
37-
address_bundle.merkle_tree,
38-
)
39-
.await
40-
.deserialized()
41-
.metadata;
36+
AccountZeroCopy::<AddressMerkleTreeAccount>::new(rpc, address_bundle.merkle_tree)
37+
.await
38+
.deserialized()
39+
.metadata;
4240
let address_tree =
4341
get_indexed_merkle_tree::<AddressMerkleTreeAccount, R, Poseidon, usize, 26, 16>(
4442
rpc,
@@ -70,13 +68,11 @@ pub async fn get_state_bundle_config<R: RpcConnection>(
7068
rpc: &mut R,
7169
state_tree_bundle: StateMerkleTreeAccounts,
7270
) -> (StateMerkleTreeConfig, NullifierQueueConfig) {
73-
let address_queue_meta_data = AccountZeroCopy::<account_compression::QueueAccount>::new(
74-
rpc,
75-
state_tree_bundle.nullifier_queue,
76-
)
77-
.await
78-
.deserialized()
79-
.metadata;
71+
let address_queue_meta_data =
72+
AccountZeroCopy::<QueueAccount>::new(rpc, state_tree_bundle.nullifier_queue)
73+
.await
74+
.deserialized()
75+
.metadata;
8076
let address_queue =
8177
unsafe { get_hash_set::<QueueAccount, R>(rpc, state_tree_bundle.nullifier_queue).await };
8278
let queue_config = NullifierQueueConfig {
@@ -85,13 +81,10 @@ pub async fn get_state_bundle_config<R: RpcConnection>(
8581
sequence_threshold: address_queue.sequence_threshold as u64,
8682
};
8783
let address_tree_meta_data =
88-
AccountZeroCopy::<account_compression::StateMerkleTreeAccount>::new(
89-
rpc,
90-
state_tree_bundle.merkle_tree,
91-
)
92-
.await
93-
.deserialized()
94-
.metadata;
84+
AccountZeroCopy::<StateMerkleTreeAccount>::new(rpc, state_tree_bundle.merkle_tree)
85+
.await
86+
.deserialized()
87+
.metadata;
9588
let address_tree = get_concurrent_merkle_tree::<StateMerkleTreeAccount, R, Poseidon, 26>(
9689
rpc,
9790
state_tree_bundle.merkle_tree,
@@ -121,9 +114,7 @@ pub async fn address_tree_ready_for_rollover<R: RpcConnection>(
121114
rpc: &mut R,
122115
merkle_tree: Pubkey,
123116
) -> bool {
124-
let account =
125-
AccountZeroCopy::<account_compression::AddressMerkleTreeAccount>::new(rpc, merkle_tree)
126-
.await;
117+
let account = AccountZeroCopy::<AddressMerkleTreeAccount>::new(rpc, merkle_tree).await;
127118
let rent_exemption = rpc
128119
.get_minimum_balance_for_rent_exemption(account.account.data.len())
129120
.await

forester-utils/src/error.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use thiserror::Error;
2+
3+
#[derive(Error, Debug)]
4+
pub enum ForesterUtilsError {
5+
#[error("parse error: {0:?}")]
6+
Parse(String),
7+
#[error("prover error: {0:?}")]
8+
Prover(String),
9+
#[error("rpc error: {0:?}")]
10+
Rpc(String),
11+
#[error("indexer error: {0:?}")]
12+
Indexer(String),
13+
}

0 commit comments

Comments
 (0)