Commit 8c5f685

v1.14: increases retransmit-stage deduper capacity and reset-cycle (backport of #30758) (#30860)
* increases retransmit-stage deduper capacity and reset-cycle (#30758)

  For duplicate block detection, for each (slot, shred-index, shred-type) we
  need to allow 2 different shreds to be retransmitted. The commit implements
  this using two bloom-filter dedupers:
  * Shreds are deduplicated using the 1st deduper.
  * If a shred is not a duplicate, then we check that (slot, shred-index,
    shred-type, k) is not a duplicate for either k = 0 or k = 1 using the
    2nd deduper, and if so the shred is retransmitted.
  This achieves a larger capacity compared to the current LRU-cache.

  (cherry picked from commit 5d9aba5)

  # Conflicts:
  #   core/src/lib.rs
  #   core/src/retransmit_stage.rs

* resolves mergify merge conflicts

---------

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
1 parent 5fdbf28 commit 8c5f685
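
The dedup decision described above can be sketched in isolation. The following is a minimal, self-contained Rust model of the scheme, not the commit's code: BloomDedup is a hypothetical exact-set stand-in for the probabilistic solana_perf::sigverify::Deduper, the ShredId alias is simplified, and should_skip mirrors the shape of ShredDeduper::dedup from the diff below.

use std::collections::HashSet;
use std::hash::Hash;

// Hypothetical exact-set stand-in for the bloom-filter Deduper:
// dedup() records the value and returns true if it was already seen.
struct BloomDedup<T: Hash + Eq>(HashSet<T>);

impl<T: Hash + Eq> BloomDedup<T> {
    fn new() -> Self {
        Self(HashSet::new())
    }

    fn dedup(&mut self, value: T) -> bool {
        !self.0.insert(value)
    }
}

// Simplified (slot, shred-index, shred-type) key.
type ShredId = (u64, u32, u8);

// Skip retransmit if the exact payload was seen before, or if
// max_duplicate_count distinct payloads were already admitted for this
// key, i.e. every (key, k) slot in the 2nd filter is taken. `all`
// short-circuits, so (key, 1) is only claimed once (key, 0) is taken.
fn should_skip(
    payloads: &mut BloomDedup<Vec<u8>>,
    shred_ids: &mut BloomDedup<(ShredId, usize)>,
    key: ShredId,
    shred: &[u8],
    max_duplicate_count: usize,
) -> bool {
    payloads.dedup(shred.to_vec())
        || (0..max_duplicate_count).all(|k| shred_ids.dedup((key, k)))
}

fn main() {
    let (mut payloads, mut shred_ids) = (BloomDedup::new(), BloomDedup::new());
    let key: ShredId = (1, 5, 0);
    assert!(!should_skip(&mut payloads, &mut shred_ids, key, b"shred-a", 2)); // 1st distinct passes
    assert!(should_skip(&mut payloads, &mut shred_ids, key, b"shred-a", 2)); // exact repeat skipped
    assert!(!should_skip(&mut payloads, &mut shred_ids, key, b"shred-b", 2)); // 2nd distinct passes
    assert!(should_skip(&mut payloads, &mut shred_ids, key, b"shred-c", 2)); // 3rd distinct skipped
}

Under this model the first two distinct payloads per key pass and every later one is skipped, which is the MAX_DUPLICATE_COUNT = 2 behavior the retransmit stage relies on for cluster-wide duplicate block detection; the real implementation trades the exact set for two fixed-size bloom filters that are periodically reset.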

File tree (5 files changed, 60 insertions(+), 167 deletions(-)):

  core/src/lib.rs
  core/src/packet_hasher.rs (deleted)
  core/src/retransmit_stage.rs
  ledger/src/blockstore.rs
  ledger/src/shred.rs

core/src/lib.rs

Lines changed: 0 additions & 1 deletion
@@ -38,7 +38,6 @@ pub mod ledger_metric_report_service;
 pub mod multi_iterator_scanner;
 pub mod optimistic_confirmation_verifier;
 pub mod outstanding_requests;
-mod packet_hasher;
 pub mod packet_threshold;
 pub mod poh_timing_report_service;
 pub mod poh_timing_reporter;

core/src/packet_hasher.rs

Lines changed: 0 additions & 39 deletions
This file was deleted.

core/src/retransmit_stage.rs

Lines changed: 58 additions & 125 deletions
@@ -2,13 +2,11 @@
 #![allow(clippy::rc_buffer)]

 use {
-    crate::{
-        cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
-        packet_hasher::PacketHasher,
-    },
+    crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
     crossbeam_channel::{Receiver, RecvTimeoutError},
     itertools::{izip, Itertools},
     lru::LruCache,
+    rand::Rng,
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_client::rpc_response::SlotUpdate,
     solana_gossip::{
@@ -19,6 +17,7 @@ use {
         shred::{self, ShredId},
     },
     solana_measure::measure::Measure,
+    solana_perf::sigverify::Deduper,
     solana_rayon_threadlimit::get_thread_count,
     solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
     solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -42,7 +41,9 @@
 };

 const MAX_DUPLICATE_COUNT: usize = 2;
-const DEFAULT_LRU_SIZE: usize = 1 << 20;
+const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
+const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
+const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
 // Minimum number of shreds to use rayon parallel iterators.
 const PAR_ITER_MIN_NUM_SHREDS: usize = 2;
@@ -131,45 +132,36 @@ impl RetransmitStats {
     }
 }

-// Map of shred (slot, index, type) => list of hash values seen for that key.
-type ShredFilter = LruCache<ShredId, Vec<u64>>;
+struct ShredDeduper<const K: usize> {
+    deduper: Deduper<K, /*shred:*/ [u8]>,
+    shred_id_filter: Deduper<K, (ShredId, /*0..MAX_DUPLICATE_COUNT:*/ usize)>,
+}

-// Returns true if shred is already received and should skip retransmit.
-fn should_skip_retransmit(
-    key: ShredId,
-    shred: &[u8],
-    shreds_received: &mut ShredFilter,
-    packet_hasher: &PacketHasher,
-) -> bool {
-    match shreds_received.get_mut(&key) {
-        Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
-        Some(sent) => {
-            let hash = packet_hasher.hash_shred(shred);
-            if sent.contains(&hash) {
-                true
-            } else {
-                sent.push(hash);
-                false
-            }
-        }
-        None => {
-            let hash = packet_hasher.hash_shred(shred);
-            shreds_received.put(key, vec![hash]);
-            false
+impl<const K: usize> ShredDeduper<K> {
+    fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
+        Self {
+            deduper: Deduper::new(rng, num_bits),
+            shred_id_filter: Deduper::new(rng, num_bits),
         }
     }
-}

-fn maybe_reset_shreds_received_cache(
-    shreds_received: &mut ShredFilter,
-    packet_hasher: &mut PacketHasher,
-    hasher_reset_ts: &mut Instant,
-) {
-    const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
-    if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
-        *hasher_reset_ts = Instant::now();
-        shreds_received.clear();
-        packet_hasher.reset();
+    fn maybe_reset<R: Rng>(
+        &mut self,
+        rng: &mut R,
+        false_positive_rate: f64,
+        reset_cycle: Duration,
+    ) {
+        self.deduper
+            .maybe_reset(rng, false_positive_rate, reset_cycle);
+        self.shred_id_filter
+            .maybe_reset(rng, false_positive_rate, reset_cycle);
+    }
+
+    fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
+        // In order to detect duplicate blocks across cluster, we retransmit
+        // max_duplicate_count different shreds for each ShredId.
+        self.deduper.dedup(shred)
+            || (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
     }
 }
@@ -183,9 +175,7 @@ fn retransmit(
     sockets: &[UdpSocket],
     stats: &mut RetransmitStats,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
-    hasher_reset_ts: &mut Instant,
-    shreds_received: &mut ShredFilter,
-    packet_hasher: &mut PacketHasher,
+    shred_deduper: &mut ShredDeduper<2>,
     max_slots: &MaxSlots,
     rpc_subscriptions: Option<&RpcSubscriptions>,
 ) -> Result<(), RecvTimeoutError> {
@@ -205,15 +195,19 @@
     stats.epoch_fetch += epoch_fetch.as_us();

     let mut epoch_cache_update = Measure::start("retransmit_epoch_cache_update");
-    maybe_reset_shreds_received_cache(shreds_received, packet_hasher, hasher_reset_ts);
+    shred_deduper.maybe_reset(
+        &mut rand::thread_rng(),
+        DEDUPER_FALSE_POSITIVE_RATE,
+        DEDUPER_RESET_CYCLE,
+    );
     epoch_cache_update.stop();
     stats.epoch_cache_update += epoch_cache_update.as_us();
     // Lookup slot leader and cluster nodes for each slot.
     let shreds: Vec<_> = shreds
         .into_iter()
         .filter_map(|shred| {
             let key = shred::layout::get_shred_id(&shred)?;
-            if should_skip_retransmit(key, &shred, shreds_received, packet_hasher) {
+            if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
                 stats.num_shreds_skipped += 1;
                 None
             } else {
@@ -377,10 +371,9 @@ pub fn retransmitter(
         CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
         CLUSTER_NODES_CACHE_TTL,
     );
-    let mut hasher_reset_ts = Instant::now();
+    let mut rng = rand::thread_rng();
+    let mut shred_deduper = ShredDeduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
     let mut stats = RetransmitStats::new(Instant::now());
-    let mut shreds_received = LruCache::<ShredId, _>::new(DEFAULT_LRU_SIZE);
-    let mut packet_hasher = PacketHasher::default();
     let num_threads = get_thread_count().min(8).max(sockets.len());
     let thread_pool = ThreadPoolBuilder::new()
         .num_threads(num_threads)
@@ -399,9 +392,7 @@
                 &sockets,
                 &mut stats,
                 &cluster_nodes_cache,
-                &mut hasher_reset_ts,
-                &mut shreds_received,
-                &mut packet_hasher,
+                &mut shred_deduper,
                 &max_slots,
                 rpc_subscriptions.as_deref(),
             ) {
@@ -593,6 +584,8 @@ impl RetransmitSlotStats {
 mod tests {
     use {
         super::*,
+        rand::SeedableRng,
+        rand_chacha::ChaChaRng,
        solana_ledger::shred::{Shred, ShredFlags},
     };
@@ -611,22 +604,12 @@
             version,
             0,
         );
-        let mut shreds_received = LruCache::new(100);
-        let packet_hasher = PacketHasher::default();
+        let mut rng = ChaChaRng::from_seed([0xa5; 32]);
+        let shred_deduper = ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
         // unique shred for (1, 5) should pass
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // duplicate shred for (1, 5) blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

         let shred = Shred::new_from_data(
             slot,
@@ -639,19 +622,9 @@
             0,
         );
         // first duplicate shred for (1, 5) passed
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // then blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

         let shred = Shred::new_from_data(
             slot,
@@ -664,64 +637,24 @@
             0,
         );
         // 2nd duplicate shred for (1, 5) blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

         let shred = Shred::new_from_parity_shard(slot, index, &[], 0, 1, 1, 0, version);
         // Coding at (1, 5) passes
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // then blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

         let shred = Shred::new_from_parity_shard(slot, index, &[], 2, 1, 1, 0, version);
         // 2nd unique coding at (1, 5) passes
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // same again is blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

         let shred = Shred::new_from_parity_shard(slot, index, &[], 3, 1, 1, 0, version);
         // Another unique coding at (1, 5) always blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
     }
 }
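
A quick sanity check on the new DEDUPER_NUM_BITS constant (this arithmetic is not part of the commit): the dedupers are sized in bits, so the constant works out to roughly 76 MiB per filter, matching the // 76MB comment above; note that ShredDeduper allocates two filters of this size.

fn main() {
    const DEDUPER_NUM_BITS: u64 = 637_534_199;
    let bytes = DEDUPER_NUM_BITS / 8; // 79_691_774 bytes
    let mib = bytes as f64 / (1024.0 * 1024.0);
    println!("{bytes} bytes ≈ {mib:.1} MiB"); // prints ≈ 76.0 MiB
}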

ledger/src/blockstore.rs

Lines changed: 1 addition & 1 deletion
@@ -3160,7 +3160,7 @@ impl Blockstore {
     // given slot and index as this implies the leader generated two different shreds with
     // the same slot and index
     pub fn is_shred_duplicate(&self, shred: ShredId, payload: Vec<u8>) -> Option<Vec<u8>> {
-        let (slot, index, shred_type) = shred.unwrap();
+        let (slot, index, shred_type) = shred.unpack();
         let existing_shred = match shred_type {
             ShredType::Data => self.get_data_shred(slot, index as u64),
             ShredType::Code => self.get_coding_shred(slot, index as u64),

ledger/src/shred.rs

Lines changed: 1 addition & 1 deletion
@@ -258,7 +258,7 @@ impl ShredId {
         self.0
     }

-    pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
+    pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
         (self.0, self.1, self.2)
     }