Skip to content

Commit 5fdbf28

Browse files
v1.14: generalizes deduper to work with any hashable type (backport of #30753) (#30856)
* generalizes deduper to work with any hashable type (#30753)

  generalizes Deduper to work with any hashable type

  Current Deduper is hard-coded only for Packet type. In order to use
  Deduper in retransmit-stage, we need to dedup types other than Packet.
  The commit generalizes Deduper to any hashable type.

  (cherry picked from commit 9ad7748)

  # Conflicts:
  #   core/src/sigverify_stage.rs
  #   perf/benches/dedup.rs
  #   perf/src/deduper.rs

* resolves mergify merge conflicts

---------

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
1 parent 6f28fcf commit 5fdbf28

File tree

4 files changed

+71
-56
lines changed

4 files changed

+71
-56
lines changed

core/src/shred_fetch_stage.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl ShredFetchStage {
4848
) {
4949
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
5050
let mut rng = rand::thread_rng();
51-
let mut deduper = Deduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
51+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
5252
let mut last_updated = Instant::now();
5353
let mut keypair = repair_context
5454
.as_ref()
@@ -252,7 +252,7 @@ fn should_discard_packet<const K: usize>(
252252
root: Slot,
253253
max_slot: Slot, // Max slot to ingest shreds for.
254254
shred_version: u16,
255-
deduper: &Deduper<K>,
255+
deduper: &Deduper<K, [u8]>,
256256
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
257257
stats: &mut ShredFetchStats,
258258
) -> bool {
@@ -266,7 +266,11 @@ fn should_discard_packet<const K: usize>(
266266
) {
267267
return true;
268268
}
269-
if deduper.dedup_packet(packet) {
269+
if packet
270+
.data(..)
271+
.map(|data| deduper.dedup(data))
272+
.unwrap_or(true)
273+
{
270274
stats.duplicate_shred += 1;
271275
true
272276
} else {
@@ -301,7 +305,7 @@ mod tests {
301305
fn test_data_code_same_index() {
302306
solana_logger::setup();
303307
let mut rng = rand::thread_rng();
304-
let deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
308+
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
305309
let mut packet = Packet::default();
306310
let mut stats = ShredFetchStats::default();
307311

@@ -353,7 +357,7 @@ mod tests {
353357
fn test_shred_filter() {
354358
solana_logger::setup();
355359
let mut rng = rand::thread_rng();
356-
let deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
360+
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
357361
let mut packet = Packet::default();
358362
let mut stats = ShredFetchStats::default();
359363
let last_root = 0;

core/src/sigverify_stage.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use {
1414
solana_perf::{
1515
packet::{Packet, PacketBatch},
1616
sigverify::{
17-
count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches,
18-
Deduper,
17+
count_discarded_packets, count_packets_in_batches, count_valid_packets,
18+
dedup_packets_and_count_discards, shrink_batches, Deduper,
1919
},
2020
},
2121
solana_sdk::timing,
@@ -291,8 +291,8 @@ impl SigVerifyStage {
291291
(shrink_time.as_us(), shrink_total)
292292
}
293293

294-
fn verifier<T: SigVerifier>(
295-
deduper: &Deduper<2>,
294+
fn verifier<const K: usize, T: SigVerifier>(
295+
deduper: &Deduper<K, [u8]>,
296296
recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
297297
verifier: &mut T,
298298
stats: &mut SigVerifierStats,
@@ -316,7 +316,8 @@ impl SigVerifyStage {
316316
discard_random_time.stop();
317317

318318
let mut dedup_time = Measure::start("sigverify_dedup_time");
319-
let discard_or_dedup_fail = deduper.dedup_packets_and_count_discards(
319+
let discard_or_dedup_fail = dedup_packets_and_count_discards(
320+
deduper,
320321
&mut batches,
321322
#[inline(always)]
322323
|received_packet, removed_before_sigverify_stage, is_dup| {
@@ -418,7 +419,7 @@ impl SigVerifyStage {
418419
.name("solSigVerifier".to_string())
419420
.spawn(move || {
420421
let mut rng = rand::thread_rng();
421-
let mut deduper = Deduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
422+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
422423
loop {
423424
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, MAX_DEDUPER_AGE) {
424425
stats.num_deduper_saturations += 1;

perf/benches/dedup.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use {
77
rand::prelude::*,
88
solana_perf::{
99
packet::{to_packet_batches, PacketBatch},
10-
sigverify::Deduper,
10+
sigverify::{self, Deduper},
1111
},
1212
std::time::Duration,
1313
test::Bencher,
@@ -25,9 +25,10 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
2525
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
2626
// verify packets
2727
let mut rng = rand::thread_rng();
28-
let mut deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
28+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
2929
bencher.iter(|| {
30-
let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ());
30+
let _ans =
31+
sigverify::dedup_packets_and_count_discards(&deduper, &mut batches, |_, _, _| ());
3132
deduper.maybe_reset(
3233
&mut rng,
3334
0.001, // false_positive_rate
@@ -118,7 +119,7 @@ fn bench_dedup_baseline(bencher: &mut Bencher) {
118119
#[ignore]
119120
fn bench_dedup_reset(bencher: &mut Bencher) {
120121
let mut rng = rand::thread_rng();
121-
let mut deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
122+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
122123
bencher.iter(|| {
123124
deduper.maybe_reset(
124125
&mut rng,

perf/src/sigverify.rs

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ use {
2424
},
2525
std::{
2626
convert::TryFrom,
27-
hash::{Hash as _, Hasher},
27+
hash::Hasher,
2828
iter::repeat_with,
29+
marker::PhantomData,
2930
mem::size_of,
3031
sync::atomic::{AtomicU64, Ordering},
3132
time::{Duration, Instant},
@@ -495,15 +496,16 @@ pub fn generate_offsets(
495496
)
496497
}
497498

498-
pub struct Deduper<const K: usize> {
499+
pub struct Deduper<const K: usize, T: ?Sized> {
499500
num_bits: u64,
500501
bits: Vec<AtomicU64>,
501502
seeds: [(u128, u128); K],
502503
clock: Instant,
503504
popcount: AtomicU64, // Number of one bits in self.bits.
505+
_phantom: PhantomData<T>,
504506
}
505507

506-
impl<const K: usize> Deduper<K> {
508+
impl<const K: usize, T: ?Sized + std::hash::Hash> Deduper<K, T> {
507509
pub fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
508510
let size = num_bits.checked_add(63).unwrap() / 64;
509511
let size = usize::try_from(size).unwrap();
@@ -513,6 +515,7 @@ impl<const K: usize> Deduper<K> {
513515
clock: Instant::now(),
514516
bits: repeat_with(AtomicU64::default).take(size).collect(),
515517
popcount: AtomicU64::default(),
518+
_phantom: PhantomData::<T>::default(),
516519
}
517520
}
518521

@@ -542,16 +545,14 @@ impl<const K: usize> Deduper<K> {
542545
saturated
543546
}
544547

545-
// Returns true if the packet is duplicate.
548+
// Returns true if the data is duplicate.
546549
#[must_use]
547550
#[allow(clippy::integer_arithmetic)]
548-
pub fn dedup_packet(&self, packet: &Packet) -> bool {
549-
// Should not dedup packet if already discarded.
550-
debug_assert!(!packet.meta.discard());
551+
pub fn dedup(&self, data: &T) -> bool {
551552
let mut out = true;
552553
for seed in self.seeds {
553554
let mut hasher = AHasher::new_with_keys(seed.0, seed.1);
554-
packet.data(..).unwrap_or_default().hash(&mut hasher);
555+
data.hash(&mut hasher);
555556
let hash: u64 = hasher.finish() % self.num_bits;
556557
let index = (hash >> 6) as usize;
557558
let mask: u64 = 1u64 << (hash & 63);
@@ -563,28 +564,32 @@ impl<const K: usize> Deduper<K> {
563564
}
564565
out
565566
}
567+
}
566568

567-
pub fn dedup_packets_and_count_discards(
568-
&self,
569-
batches: &mut [PacketBatch],
570-
mut process_received_packet: impl FnMut(&mut Packet, bool, bool),
571-
) -> u64 {
572-
batches
573-
.iter_mut()
574-
.flat_map(PacketBatch::iter_mut)
575-
.map(|packet| {
576-
if packet.meta.discard() {
577-
process_received_packet(packet, true, false);
578-
} else if self.dedup_packet(packet) {
579-
packet.meta.set_discard(true);
580-
process_received_packet(packet, false, true);
581-
} else {
582-
process_received_packet(packet, false, false);
583-
}
584-
u64::from(packet.meta.discard())
585-
})
586-
.sum()
587-
}
569+
pub fn dedup_packets_and_count_discards<const K: usize>(
570+
deduper: &Deduper<K, [u8]>,
571+
batches: &mut [PacketBatch],
572+
mut process_received_packet: impl FnMut(&mut Packet, bool, bool),
573+
) -> u64 {
574+
batches
575+
.iter_mut()
576+
.flat_map(PacketBatch::iter_mut)
577+
.map(|packet| {
578+
if packet.meta.discard() {
579+
process_received_packet(packet, true, false);
580+
} else if packet
581+
.data(..)
582+
.map(|data| deduper.dedup(data))
583+
.unwrap_or(true)
584+
{
585+
packet.meta.set_discard(true);
586+
process_received_packet(packet, false, true);
587+
} else {
588+
process_received_packet(packet, false, false);
589+
}
590+
u64::from(packet.meta.discard())
591+
})
592+
.sum()
588593
}
589594

590595
//inplace shrink a batch of packets
@@ -1606,9 +1611,10 @@ mod tests {
16061611
to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
16071612
let packet_count = sigverify::count_packets_in_batches(&batches);
16081613
let mut rng = rand::thread_rng();
1609-
let filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
1614+
let filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
16101615
let mut num_deduped = 0;
1611-
let discard = filter.dedup_packets_and_count_discards(
1616+
let discard = dedup_packets_and_count_discards(
1617+
&filter,
16121618
&mut batches,
16131619
|_deduped_packet, _removed_before_sigverify_stage, _is_dup| {
16141620
num_deduped += 1;
@@ -1621,9 +1627,10 @@ mod tests {
16211627
#[test]
16221628
fn test_dedup_diff() {
16231629
let mut rng = rand::thread_rng();
1624-
let mut filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
1630+
let mut filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
16251631
let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
1626-
let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
1632+
let discard =
1633+
dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize;
16271634
// because dedup uses a threadpool, there maybe up to N threads of txs that go through
16281635
assert_eq!(discard, 0);
16291636
assert!(!filter.maybe_reset(
@@ -1646,14 +1653,15 @@ mod tests {
16461653
const NUM_BITS: u64 = 63_999_979;
16471654
const FALSE_POSITIVE_RATE: f64 = 0.001;
16481655
let mut rng = rand::thread_rng();
1649-
let mut filter = Deduper::<2>::new(&mut rng, NUM_BITS);
1656+
let mut filter = Deduper::<2, [u8]>::new(&mut rng, NUM_BITS);
16501657
let capacity = get_capacity::<2>(NUM_BITS, FALSE_POSITIVE_RATE);
16511658
let mut discard = 0;
16521659
assert!(filter.popcount.load(Ordering::Relaxed) < capacity);
16531660
for i in 0..1000 {
16541661
let mut batches =
16551662
to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128);
1656-
discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
1663+
discard +=
1664+
dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize;
16571665
trace!("{} {}", i, discard);
16581666
if filter.popcount.load(Ordering::Relaxed) > capacity {
16591667
break;
@@ -1671,12 +1679,13 @@ mod tests {
16711679
#[test]
16721680
fn test_dedup_false_positive() {
16731681
let mut rng = rand::thread_rng();
1674-
let filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
1682+
let filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
16751683
let mut discard = 0;
16761684
for i in 0..10 {
16771685
let mut batches =
16781686
to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
1679-
discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
1687+
discard +=
1688+
dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize;
16801689
debug!("false positive rate: {}/{}", discard, i * 1024);
16811690
}
16821691
//allow for 1 false positive even if extremely unlikely
@@ -1697,7 +1706,7 @@ mod tests {
16971706
fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) {
16981707
let mut rng = rand::thread_rng();
16991708
assert_eq!(get_capacity::<2>(num_bits, false_positive_rate), capacity);
1700-
let mut deduper = Deduper::<2>::new(&mut rng, num_bits);
1709+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, num_bits);
17011710
assert_eq!(deduper.false_positive_rate(), 0.0);
17021711
deduper.popcount.store(capacity, Ordering::Relaxed);
17031712
assert!(deduper.false_positive_rate() < false_positive_rate);
@@ -1726,18 +1735,18 @@ mod tests {
17261735
) {
17271736
const FALSE_POSITIVE_RATE: f64 = 0.001;
17281737
let mut rng = ChaChaRng::from_seed(seed);
1729-
let mut deduper = Deduper::<2>::new(&mut rng, num_bits);
1738+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, num_bits);
17301739
assert_eq!(get_capacity::<2>(num_bits, FALSE_POSITIVE_RATE), capacity);
17311740
let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default());
17321741
let mut dup_count = 0usize;
17331742
for _ in 0..num_packets {
17341743
let size = rng.gen_range(0, PACKET_DATA_SIZE);
17351744
packet.meta.size = size;
17361745
rng.fill(&mut packet.buffer_mut()[0..size]);
1737-
if deduper.dedup_packet(&packet) {
1746+
if deduper.dedup(packet.data(..).unwrap()) {
17381747
dup_count += 1;
17391748
}
1740-
assert!(deduper.dedup_packet(&packet));
1749+
assert!(deduper.dedup(packet.data(..).unwrap()));
17411750
}
17421751
assert_eq!(dup_count, num_dups);
17431752
assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount);

0 commit comments

Comments
 (0)