Skip to content

Commit edf1954

Browse files
authored
Faster dedup v1.8 (solana-labs#22619)
* Faster dedup
1 parent 59eee75 commit edf1954

File tree

6 files changed

+197
-93
lines changed

6 files changed

+197
-93
lines changed

Cargo.lock

Lines changed: 19 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/sigverify_stage.rs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@
77
88
use {
99
crate::sigverify,
10-
core::time::Duration,
1110
crossbeam_channel::{SendError, Sender as CrossbeamSender},
1211
itertools::Itertools,
13-
solana_bloom::bloom::{AtomicBloom, Bloom},
1412
solana_measure::measure::Measure,
1513
solana_perf::packet::PacketBatch,
16-
solana_perf::sigverify::dedup_packets,
14+
solana_perf::sigverify::Deduper,
1715
solana_sdk::timing,
1816
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
1917
std::{
@@ -215,7 +213,7 @@ impl SigVerifyStage {
215213
}
216214

217215
fn verifier<T: SigVerifier>(
218-
bloom: &AtomicBloom<&[u8]>,
216+
deduper: &Deduper,
219217
recvr: &PacketBatchReceiver,
220218
sendr: &CrossbeamSender<Vec<PacketBatch>>,
221219
verifier: &T,
@@ -231,15 +229,15 @@ impl SigVerifyStage {
231229
);
232230

233231
let mut dedup_time = Measure::start("sigverify_dedup_time");
234-
let dedup_fail = dedup_packets(bloom, &mut batches) as usize;
232+
let dedup_fail = deduper.dedup_packets(&mut batches) as usize;
235233
dedup_time.stop();
236-
let valid_packets = num_packets.saturating_sub(dedup_fail);
234+
let num_unique = num_packets.saturating_sub(dedup_fail);
237235

238236
let mut discard_time = Measure::start("sigverify_discard_time");
239-
if valid_packets > MAX_SIGVERIFY_BATCH {
237+
if num_unique > MAX_SIGVERIFY_BATCH {
240238
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH)
241239
};
242-
let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH);
240+
let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH);
243241
discard_time.stop();
244242

245243
let mut verify_batch_time = Measure::start("sigverify_batch_time");
@@ -298,25 +296,16 @@ impl SigVerifyStage {
298296
let verifier = verifier.clone();
299297
let mut stats = SigVerifierStats::default();
300298
let mut last_print = Instant::now();
301-
const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000);
302-
const MAX_BLOOM_ITEMS: usize = 1_000_000;
303-
const MAX_BLOOM_FAIL: f64 = 0.0001;
304-
const MAX_BLOOM_BITS: usize = 8 << 22;
299+
const MAX_DEDUPER_AGE_MS: u64 = 2_000;
300+
const MAX_DEDUPER_ITEMS: u32 = 1_000_000;
305301
Builder::new()
306302
.name("solana-verifier".to_string())
307303
.spawn(move || {
308-
let mut bloom =
309-
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
310-
let mut bloom_age = Instant::now();
304+
let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE_MS);
311305
loop {
312-
let now = Instant::now();
313-
if now.duration_since(bloom_age) > MAX_BLOOM_AGE {
314-
bloom =
315-
Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into();
316-
bloom_age = now;
317-
}
306+
deduper.reset();
318307
if let Err(e) = Self::verifier(
319-
&bloom,
308+
&deduper,
320309
&packet_receiver,
321310
&verified_sender,
322311
&verifier,

perf/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ edition = "2018"
1212
[dependencies]
1313
bincode = "1.3.1"
1414
curve25519-dalek = { version = "2" }
15+
ahash = "0.7.6"
1516
dlopen = "0.1.8"
1617
dlopen_derive = "0.1.4"
1718
lazy_static = "1.4.0"

perf/benches/dedup.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ extern crate test;
55

66
use {
77
rand::prelude::*,
8-
solana_bloom::bloom::{AtomicBloom, Bloom},
98
solana_perf::{
109
packet::{to_packet_batches, PacketBatch},
1110
sigverify,
1211
},
1312
test::Bencher,
1413
};
1514

15+
const NUM: usize = 4096;
16+
1617
fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
1718
// subtract 8 bytes because the length will get serialized as well
1819
(0..size.checked_sub(8).unwrap())
@@ -22,19 +23,14 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
2223

2324
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
2425
// verify packets
25-
let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
26+
let mut deduper = sigverify::Deduper::new(1_000_000, 2_000);
2627
bencher.iter(|| {
27-
// bench
28-
sigverify::dedup_packets(&bloom, &mut batches);
29-
30-
// reset
31-
bloom.clear_for_tests();
32-
batches.iter_mut().for_each(|batch| {
33-
batch.packets.iter_mut().for_each(|p| {
34-
p.meta.discard = false;
35-
})
36-
});
37-
})
28+
let _ans = deduper.dedup_packets(&mut batches);
29+
deduper.reset();
30+
batches
31+
.iter_mut()
32+
.for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.discard = false));
33+
});
3834
}
3935

4036
#[bench]
@@ -45,7 +41,7 @@ fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
4541

4642
let batches = to_packet_batches(
4743
&std::iter::repeat(small_packet)
48-
.take(4096)
44+
.take(NUM)
4945
.collect::<Vec<_>>(),
5046
128,
5147
);
@@ -60,7 +56,7 @@ fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
6056
let big_packet = test_packet_with_size(1024, &mut rng);
6157

6258
let batches = to_packet_batches(
63-
&std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
59+
&std::iter::repeat(big_packet).take(NUM).collect::<Vec<_>>(),
6460
128,
6561
);
6662

@@ -73,7 +69,7 @@ fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
7369
let mut rng = rand::thread_rng();
7470

7571
let batches = to_packet_batches(
76-
&(0..4096)
72+
&(0..NUM)
7773
.map(|_| test_packet_with_size(128, &mut rng))
7874
.collect::<Vec<_>>(),
7975
128,
@@ -88,11 +84,35 @@ fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
8884
let mut rng = rand::thread_rng();
8985

9086
let batches = to_packet_batches(
91-
&(0..4096)
87+
&(0..NUM)
9288
.map(|_| test_packet_with_size(1024, &mut rng))
9389
.collect::<Vec<_>>(),
9490
128,
9591
);
9692

9793
do_bench_dedup_packets(bencher, batches);
9894
}
95+
96+
#[bench]
97+
#[ignore]
98+
fn bench_dedup_baseline(bencher: &mut Bencher) {
99+
let mut rng = rand::thread_rng();
100+
101+
let batches = to_packet_batches(
102+
&(0..0)
103+
.map(|_| test_packet_with_size(128, &mut rng))
104+
.collect::<Vec<_>>(),
105+
128,
106+
);
107+
108+
do_bench_dedup_packets(bencher, batches);
109+
}
110+
111+
#[bench]
112+
#[ignore]
113+
fn bench_dedup_reset(bencher: &mut Bencher) {
114+
let mut deduper = sigverify::Deduper::new(1_000_000, 0);
115+
bencher.iter(|| {
116+
deduper.reset();
117+
});
118+
}

0 commit comments

Comments
 (0)