@@ -500,35 +500,46 @@ pub struct Deduper<const K: usize> {
500
500
bits : Vec < AtomicU64 > ,
501
501
seeds : [ ( u128 , u128 ) ; K ] ,
502
502
clock : Instant ,
503
- // Maximum number of one bits before the false positive
504
- // rate exceeds the specified threshold.
505
- capacity : u64 ,
506
503
popcount : AtomicU64 , // Number of one bits in self.bits.
507
504
}
508
505
509
506
impl < const K : usize > Deduper < K > {
510
- pub fn new < R : Rng > ( rng : & mut R , false_positive_rate : f64 , num_bits : u64 ) -> Self {
511
- assert ! ( 0.0 < false_positive_rate && false_positive_rate < 1.0 ) ;
512
- let size = usize:: try_from ( num_bits. checked_add ( 63 ) . unwrap ( ) / 64 ) . unwrap ( ) ;
513
- let capacity = num_bits as f64 * false_positive_rate. powf ( 1f64 / K as f64 ) ;
507
+ pub fn new < R : Rng > ( rng : & mut R , num_bits : u64 ) -> Self {
508
+ let size = num_bits. checked_add ( 63 ) . unwrap ( ) / 64 ;
509
+ let size = usize:: try_from ( size) . unwrap ( ) ;
514
510
Self {
515
511
num_bits,
516
512
seeds : [ ( ) ; K ] . map ( |_| rng. gen ( ) ) ,
517
513
clock : Instant :: now ( ) ,
518
514
bits : repeat_with ( AtomicU64 :: default) . take ( size) . collect ( ) ,
519
- capacity : capacity as u64 ,
520
515
popcount : AtomicU64 :: default ( ) ,
521
516
}
522
517
}
523
518
524
- pub fn maybe_reset < R : Rng > ( & mut self , rng : & mut R , reset_cycle : & Duration ) {
519
+ fn false_positive_rate ( & self ) -> f64 {
525
520
let popcount = self . popcount . load ( Ordering :: Relaxed ) ;
526
- if popcount >= self . capacity || & self . clock . elapsed ( ) >= reset_cycle {
521
+ let ones_ratio = popcount. min ( self . num_bits ) as f64 / self . num_bits as f64 ;
522
+ ones_ratio. powi ( K as i32 )
523
+ }
524
+
525
+ /// Resets the Deduper if either it is older than the reset_cycle or it is
526
+ /// saturated enough that false positive rate exceeds specified threshold.
527
+ /// Returns true if the deduper was saturated.
528
+ pub fn maybe_reset < R : Rng > (
529
+ & mut self ,
530
+ rng : & mut R ,
531
+ false_positive_rate : f64 ,
532
+ reset_cycle : Duration ,
533
+ ) -> bool {
534
+ assert ! ( 0.0 < false_positive_rate && false_positive_rate < 1.0 ) ;
535
+ let saturated = self . false_positive_rate ( ) >= false_positive_rate;
536
+ if saturated || self . clock . elapsed ( ) >= reset_cycle {
527
537
self . seeds = [ ( ) ; K ] . map ( |_| rng. gen ( ) ) ;
528
538
self . clock = Instant :: now ( ) ;
529
539
self . bits . fill_with ( AtomicU64 :: default) ;
530
540
self . popcount = AtomicU64 :: default ( ) ;
531
541
}
542
+ saturated
532
543
}
533
544
534
545
// Returns true if the packet is duplicate.
@@ -1595,9 +1606,7 @@ mod tests {
1595
1606
to_packet_batches ( & std:: iter:: repeat ( tx) . take ( 1024 ) . collect :: < Vec < _ > > ( ) , 128 ) ;
1596
1607
let packet_count = sigverify:: count_packets_in_batches ( & batches) ;
1597
1608
let mut rng = rand:: thread_rng ( ) ;
1598
- let filter = Deduper :: < 2 > :: new (
1599
- & mut rng, /*false_positive_rate:*/ 0.001 , /*num_bits:*/ 63_999_979 ,
1600
- ) ;
1609
+ let filter = Deduper :: < 2 > :: new ( & mut rng, /*num_bits:*/ 63_999_979 ) ;
1601
1610
let mut num_deduped = 0 ;
1602
1611
let discard = filter. dedup_packets_and_count_discards (
1603
1612
& mut batches,
@@ -1612,46 +1621,57 @@ mod tests {
1612
1621
#[ test]
1613
1622
fn test_dedup_diff ( ) {
1614
1623
let mut rng = rand:: thread_rng ( ) ;
1615
- let mut filter = Deduper :: < 2 > :: new (
1616
- & mut rng, /*false_positive_rate:*/ 0.001 , /*num_bits:*/ 63_999_979 ,
1617
- ) ;
1624
+ let mut filter = Deduper :: < 2 > :: new ( & mut rng, /*num_bits:*/ 63_999_979 ) ;
1618
1625
let mut batches = to_packet_batches ( & ( 0 ..1024 ) . map ( |_| test_tx ( ) ) . collect :: < Vec < _ > > ( ) , 128 ) ;
1619
1626
let discard = filter. dedup_packets_and_count_discards ( & mut batches, |_, _, _| ( ) ) as usize ;
1620
1627
// because dedup uses a threadpool, there maybe up to N threads of txs that go through
1621
1628
assert_eq ! ( discard, 0 ) ;
1622
- filter. maybe_reset ( & mut rng, /*reset_cycle:*/ & Duration :: from_millis ( 0 ) ) ;
1629
+ assert ! ( !filter. maybe_reset(
1630
+ & mut rng,
1631
+ 0.001 , // false_positive_rate
1632
+ Duration :: from_millis( 0 ) , // reset_cycle
1633
+ ) ) ;
1623
1634
for i in filter. bits {
1624
1635
assert_eq ! ( i. load( Ordering :: Relaxed ) , 0 ) ;
1625
1636
}
1626
1637
}
1627
1638
1639
+ fn get_capacity < const K : usize > ( num_bits : u64 , false_positive_rate : f64 ) -> u64 {
1640
+ ( num_bits as f64 * false_positive_rate. powf ( 1f64 / K as f64 ) ) as u64
1641
+ }
1642
+
1628
1643
#[ test]
1629
1644
#[ ignore]
1630
1645
fn test_dedup_saturated ( ) {
1646
+ const NUM_BITS : u64 = 63_999_979 ;
1647
+ const FALSE_POSITIVE_RATE : f64 = 0.001 ;
1631
1648
let mut rng = rand:: thread_rng ( ) ;
1632
- let filter = Deduper :: < 2 > :: new (
1633
- & mut rng, /*false_positive_rate:*/ 0.001 , /*num_bits:*/ 63_999_979 ,
1634
- ) ;
1649
+ let mut filter = Deduper :: < 2 > :: new ( & mut rng, NUM_BITS ) ;
1650
+ let capacity = get_capacity :: < 2 > ( NUM_BITS , FALSE_POSITIVE_RATE ) ;
1635
1651
let mut discard = 0 ;
1636
- assert ! ( filter. popcount. load( Ordering :: Relaxed ) < filter . capacity) ;
1652
+ assert ! ( filter. popcount. load( Ordering :: Relaxed ) < capacity) ;
1637
1653
for i in 0 ..1000 {
1638
1654
let mut batches =
1639
1655
to_packet_batches ( & ( 0 ..1000 ) . map ( |_| test_tx ( ) ) . collect :: < Vec < _ > > ( ) , 128 ) ;
1640
1656
discard += filter. dedup_packets_and_count_discards ( & mut batches, |_, _, _| ( ) ) as usize ;
1641
1657
trace ! ( "{} {}" , i, discard) ;
1642
- if filter. popcount . load ( Ordering :: Relaxed ) >= filter . capacity {
1658
+ if filter. popcount . load ( Ordering :: Relaxed ) > capacity {
1643
1659
break ;
1644
1660
}
1645
1661
}
1646
- assert ! ( filter. popcount. load( Ordering :: Relaxed ) >= filter. capacity) ;
1662
+ assert ! ( filter. popcount. load( Ordering :: Relaxed ) > capacity) ;
1663
+ assert ! ( filter. false_positive_rate( ) >= FALSE_POSITIVE_RATE ) ;
1664
+ assert ! ( filter. maybe_reset(
1665
+ & mut rng,
1666
+ FALSE_POSITIVE_RATE ,
1667
+ Duration :: from_millis( 0 ) , // reset_cycle
1668
+ ) ) ;
1647
1669
}
1648
1670
1649
1671
#[ test]
1650
1672
fn test_dedup_false_positive ( ) {
1651
1673
let mut rng = rand:: thread_rng ( ) ;
1652
- let filter = Deduper :: < 2 > :: new (
1653
- & mut rng, /*false_positive_rate:*/ 0.001 , /*num_bits:*/ 63_999_979 ,
1654
- ) ;
1674
+ let filter = Deduper :: < 2 > :: new ( & mut rng, /*num_bits:*/ 63_999_979 ) ;
1655
1675
let mut discard = 0 ;
1656
1676
for i in 0 ..10 {
1657
1677
let mut batches =
@@ -1676,8 +1696,18 @@ mod tests {
1676
1696
#[ test_case( 637_534_199 , 0.0001 , 6_375_341 ) ]
1677
1697
fn test_dedup_capacity ( num_bits : u64 , false_positive_rate : f64 , capacity : u64 ) {
1678
1698
let mut rng = rand:: thread_rng ( ) ;
1679
- let deduper = Deduper :: < 2 > :: new ( & mut rng, false_positive_rate, num_bits) ;
1680
- assert_eq ! ( deduper. capacity, capacity) ;
1699
+ assert_eq ! ( get_capacity:: <2 >( num_bits, false_positive_rate) , capacity) ;
1700
+ let mut deduper = Deduper :: < 2 > :: new ( & mut rng, num_bits) ;
1701
+ assert_eq ! ( deduper. false_positive_rate( ) , 0.0 ) ;
1702
+ deduper. popcount . store ( capacity, Ordering :: Relaxed ) ;
1703
+ assert ! ( deduper. false_positive_rate( ) < false_positive_rate) ;
1704
+ deduper. popcount . store ( capacity + 1 , Ordering :: Relaxed ) ;
1705
+ assert ! ( deduper. false_positive_rate( ) >= false_positive_rate) ;
1706
+ assert ! ( deduper. maybe_reset(
1707
+ & mut rng,
1708
+ false_positive_rate,
1709
+ Duration :: from_millis( 0 ) , // reset_cycle
1710
+ ) ) ;
1681
1711
}
1682
1712
1683
1713
#[ test_case( [ 0xf9 ; 32 ] , 3_199_997 , 101_192 , 51_414 , 70 , 101_125 ) ]
@@ -1694,9 +1724,10 @@ mod tests {
1694
1724
num_dups : usize ,
1695
1725
popcount : u64 ,
1696
1726
) {
1727
+ const FALSE_POSITIVE_RATE : f64 = 0.001 ;
1697
1728
let mut rng = ChaChaRng :: from_seed ( seed) ;
1698
- let deduper = Deduper :: < 2 > :: new ( & mut rng, /*false_positive_rate:*/ 0.001 , num_bits) ;
1699
- assert_eq ! ( deduper . capacity , capacity) ;
1729
+ let mut deduper = Deduper :: < 2 > :: new ( & mut rng, num_bits) ;
1730
+ assert_eq ! ( get_capacity :: < 2 > ( num_bits , FALSE_POSITIVE_RATE ) , capacity) ;
1700
1731
let mut packet = Packet :: new ( [ 0u8 ; PACKET_DATA_SIZE ] , Meta :: default ( ) ) ;
1701
1732
let mut dup_count = 0usize ;
1702
1733
for _ in 0 ..num_packets {
@@ -1710,6 +1741,12 @@ mod tests {
1710
1741
}
1711
1742
assert_eq ! ( dup_count, num_dups) ;
1712
1743
assert_eq ! ( deduper. popcount. load( Ordering :: Relaxed ) , popcount) ;
1744
+ assert ! ( deduper. false_positive_rate( ) < FALSE_POSITIVE_RATE ) ;
1745
+ assert ! ( !deduper. maybe_reset(
1746
+ & mut rng,
1747
+ FALSE_POSITIVE_RATE ,
1748
+ Duration :: from_millis( 0 ) , // reset_cycle
1749
+ ) ) ;
1713
1750
}
1714
1751
1715
1752
#[ test]
0 commit comments