Skip to content

Commit f1ccc41

Browse files
v1.14: moves shreds deduper to shred-sigverify stage (backport of #30786) (#30861)
* moves shreds deduper to shred-sigverify stage (#30786)

Shreds arriving at tvu/tvu_forward/repair sockets are each processed in a separate thread, and since each thread has its own deduper, the duplicates across these sockets are not filtered out.

Using a common deduper across these threads will require an RwLock wrapper and may introduce lock contention.

The commit instead moves the shred-deduper to shred-sigverify-stage where all these shreds arrive through the same channel.

(cherry picked from commit 25b7811)

# Conflicts:
# core/src/shred_fetch_stage.rs
# core/src/sigverify_shreds.rs

* resolves mergify merge conflicts

---------

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
1 parent 8c5f685 commit f1ccc41

File tree

3 files changed: 47 additions (+47) and 89 deletions (-89).

core/src/shred_fetch_stage.rs

Lines changed: 12 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -5,10 +5,7 @@ use {
55
crossbeam_channel::{unbounded, Sender},
66
solana_gossip::cluster_info::ClusterInfo,
77
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
8-
solana_perf::{
9-
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
10-
sigverify::Deduper,
11-
},
8+
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
129
solana_runtime::{bank::Bank, bank_forks::BankForks},
1310
solana_sdk::{
1411
clock::{Slot, DEFAULT_MS_PER_SLOT},
@@ -26,10 +23,6 @@ use {
2623
},
2724
};
2825

29-
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
30-
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
31-
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
32-
3326
pub(crate) struct ShredFetchStage {
3427
thread_hdls: Vec<JoinHandle<()>>,
3528
}
@@ -47,8 +40,6 @@ impl ShredFetchStage {
4740
turbine_disabled: Arc<AtomicBool>,
4841
) {
4942
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
50-
let mut rng = rand::thread_rng();
51-
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
5243
let mut last_updated = Instant::now();
5344
let mut keypair = repair_context
5445
.as_ref()
@@ -63,9 +54,6 @@ impl ShredFetchStage {
6354
let mut stats = ShredFetchStats::default();
6455

6556
for mut packet_batch in recvr {
66-
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
67-
stats.num_deduper_saturations += 1;
68-
}
6957
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
7058
last_updated = Instant::now();
7159
{
@@ -102,12 +90,11 @@ impl ShredFetchStage {
10290
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
10391
for packet in packet_batch.iter_mut() {
10492
if turbine_disabled
105-
|| should_discard_packet(
93+
|| should_discard_shred(
10694
packet,
10795
last_root,
10896
max_slot,
10997
shred_version,
110-
&deduper,
11198
should_drop_merkle_shreds,
11299
&mut stats,
113100
)
@@ -245,39 +232,6 @@ impl ShredFetchStage {
245232
}
246233
}
247234

248-
// Returns true if the packet should be marked as discard.
249-
#[must_use]
250-
fn should_discard_packet<const K: usize>(
251-
packet: &Packet,
252-
root: Slot,
253-
max_slot: Slot, // Max slot to ingest shreds for.
254-
shred_version: u16,
255-
deduper: &Deduper<K, [u8]>,
256-
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
257-
stats: &mut ShredFetchStats,
258-
) -> bool {
259-
if should_discard_shred(
260-
packet,
261-
root,
262-
max_slot,
263-
shred_version,
264-
should_drop_merkle_shreds,
265-
stats,
266-
) {
267-
return true;
268-
}
269-
if packet
270-
.data(..)
271-
.map(|data| deduper.dedup(data))
272-
.unwrap_or(true)
273-
{
274-
stats.duplicate_shred += 1;
275-
true
276-
} else {
277-
false
278-
}
279-
}
280-
281235
#[must_use]
282236
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
283237
check_feature_activation(
@@ -299,13 +253,12 @@ mod tests {
299253
blockstore::MAX_DATA_SHREDS_PER_SLOT,
300254
shred::{ReedSolomonCache, Shred, ShredFlags},
301255
},
256+
solana_sdk::packet::Packet,
302257
};
303258

304259
#[test]
305260
fn test_data_code_same_index() {
306261
solana_logger::setup();
307-
let mut rng = rand::thread_rng();
308-
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
309262
let mut packet = Packet::default();
310263
let mut stats = ShredFetchStats::default();
311264

@@ -327,12 +280,11 @@ mod tests {
327280
let last_slot = 100;
328281
let slots_per_epoch = 10;
329282
let max_slot = last_slot + 2 * slots_per_epoch;
330-
assert!(!should_discard_packet(
283+
assert!(!should_discard_shred(
331284
&packet,
332285
last_root,
333286
max_slot,
334287
shred_version,
335-
&deduper,
336288
|_| false, // should_drop_merkle_shreds
337289
&mut stats,
338290
));
@@ -342,12 +294,11 @@ mod tests {
342294
&ReedSolomonCache::default(),
343295
);
344296
coding[0].copy_to_packet(&mut packet);
345-
assert!(!should_discard_packet(
297+
assert!(!should_discard_shred(
346298
&packet,
347299
last_root,
348300
max_slot,
349301
shred_version,
350-
&deduper,
351302
|_| false, // should_drop_merkle_shreds
352303
&mut stats,
353304
));
@@ -356,8 +307,6 @@ mod tests {
356307
#[test]
357308
fn test_shred_filter() {
358309
solana_logger::setup();
359-
let mut rng = rand::thread_rng();
360-
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
361310
let mut packet = Packet::default();
362311
let mut stats = ShredFetchStats::default();
363312
let last_root = 0;
@@ -367,12 +316,11 @@ mod tests {
367316
let max_slot = last_slot + 2 * slots_per_epoch;
368317

369318
// packet size is 0, so cannot get index
370-
assert!(should_discard_packet(
319+
assert!(should_discard_shred(
371320
&packet,
372321
last_root,
373322
max_slot,
374323
shred_version,
375-
&deduper,
376324
|_| false, // should_drop_merkle_shreds
377325
&mut stats,
378326
));
@@ -390,50 +338,35 @@ mod tests {
390338
shred.copy_to_packet(&mut packet);
391339

392340
// rejected slot is 2, root is 3
393-
assert!(should_discard_packet(
341+
assert!(should_discard_shred(
394342
&packet,
395343
3,
396344
max_slot,
397345
shred_version,
398-
&deduper,
399346
|_| false, // should_drop_merkle_shreds
400347
&mut stats,
401348
));
402349
assert_eq!(stats.slot_out_of_range, 1);
403350

404-
assert!(should_discard_packet(
351+
assert!(should_discard_shred(
405352
&packet,
406353
last_root,
407354
max_slot,
408-
345, // shred_version
409-
&deduper,
355+
345, // shred_version
410356
|_| false, // should_drop_merkle_shreds
411357
&mut stats,
412358
));
413359
assert_eq!(stats.shred_version_mismatch, 1);
414360

415361
// Accepted for 1,3
416-
assert!(!should_discard_packet(
417-
&packet,
418-
last_root,
419-
max_slot,
420-
shred_version,
421-
&deduper,
422-
|_| false, // should_drop_merkle_shreds
423-
&mut stats,
424-
));
425-
426-
// deduper should filter duplicate
427-
assert!(should_discard_packet(
362+
assert!(!should_discard_shred(
428363
&packet,
429364
last_root,
430365
max_slot,
431366
shred_version,
432-
&deduper,
433367
|_| false, // should_drop_merkle_shreds
434368
&mut stats,
435369
));
436-
assert_eq!(stats.duplicate_shred, 1);
437370

438371
let shred = Shred::new_from_data(
439372
1_000_000,
@@ -448,25 +381,23 @@ mod tests {
448381
shred.copy_to_packet(&mut packet);
449382

450383
// Slot 1 million is too high
451-
assert!(should_discard_packet(
384+
assert!(should_discard_shred(
452385
&packet,
453386
last_root,
454387
max_slot,
455388
shred_version,
456-
&deduper,
457389
|_| false, // should_drop_merkle_shreds
458390
&mut stats,
459391
));
460392

461393
let index = MAX_DATA_SHREDS_PER_SLOT as u32;
462394
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
463395
shred.copy_to_packet(&mut packet);
464-
assert!(should_discard_packet(
396+
assert!(should_discard_shred(
465397
&packet,
466398
last_root,
467399
max_slot,
468400
shred_version,
469-
&deduper,
470401
|_| false, // should_drop_merkle_shreds
471402
&mut stats,
472403
));

core/src/sigverify_shreds.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use {
22
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
3-
rayon::{ThreadPool, ThreadPoolBuilder},
3+
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
44
solana_gossip::cluster_info::ClusterInfo,
55
solana_ledger::{
66
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
77
},
8-
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
8+
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache, sigverify::Deduper},
99
solana_rayon_threadlimit::get_thread_count,
1010
solana_runtime::{bank::Bank, bank_forks::BankForks},
1111
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signer},
@@ -17,6 +17,10 @@ use {
1717
},
1818
};
1919

20+
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
21+
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
22+
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
23+
2024
#[allow(clippy::enum_variant_names)]
2125
enum Error {
2226
RecvDisconnected,
@@ -40,7 +44,12 @@ pub(crate) fn spawn_shred_sigverify(
4044
.build()
4145
.unwrap();
4246
let run_shred_sigverify = move || {
47+
let mut rng = rand::thread_rng();
48+
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
4349
loop {
50+
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
51+
stats.num_deduper_saturations += 1;
52+
}
4453
// We can't store the pubkey outside the loop
4554
// because the identity might be hot swapped.
4655
let self_pubkey = cluster_info.keypair().pubkey();
@@ -50,6 +59,7 @@ pub(crate) fn spawn_shred_sigverify(
5059
&bank_forks,
5160
&leader_schedule_cache,
5261
&recycler_cache,
62+
&deduper,
5363
&shred_fetch_receiver,
5464
&retransmit_sender,
5565
&verified_sender,
@@ -70,12 +80,13 @@ pub(crate) fn spawn_shred_sigverify(
7080
}
7181

7282
#[allow(clippy::too_many_arguments)]
73-
fn run_shred_sigverify(
83+
fn run_shred_sigverify<const K: usize>(
7484
thread_pool: &ThreadPool,
7585
self_pubkey: &Pubkey,
7686
bank_forks: &RwLock<BankForks>,
7787
leader_schedule_cache: &LeaderScheduleCache,
7888
recycler_cache: &RecyclerCache,
89+
deduper: &Deduper<K, [u8]>,
7990
shred_fetch_receiver: &Receiver<PacketBatch>,
8091
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
8192
verified_sender: &Sender<Vec<PacketBatch>>,
@@ -90,6 +101,20 @@ fn run_shred_sigverify(
90101
stats.num_iters += 1;
91102
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
92103
stats.num_discards_pre += count_discards(&packets);
104+
stats.num_duplicates += thread_pool.install(|| {
105+
packets
106+
.par_iter_mut()
107+
.flatten()
108+
.filter(|packet| {
109+
!packet.meta.discard()
110+
&& packet
111+
.data(..)
112+
.map(|data| deduper.dedup(data))
113+
.unwrap_or(true)
114+
})
115+
.map(|packet| packet.meta.set_discard(true))
116+
.count()
117+
});
93118
verify_packets(
94119
thread_pool,
95120
self_pubkey,
@@ -198,8 +223,10 @@ struct ShredSigVerifyStats {
198223
since: Instant,
199224
num_iters: usize,
200225
num_packets: usize,
201-
num_discards_pre: usize,
226+
num_deduper_saturations: usize,
202227
num_discards_post: usize,
228+
num_discards_pre: usize,
229+
num_duplicates: usize,
203230
num_retransmit_shreds: usize,
204231
elapsed_micros: u64,
205232
}
@@ -213,7 +240,9 @@ impl ShredSigVerifyStats {
213240
num_iters: 0usize,
214241
num_packets: 0usize,
215242
num_discards_pre: 0usize,
243+
num_deduper_saturations: 0usize,
216244
num_discards_post: 0usize,
245+
num_duplicates: 0usize,
217246
num_retransmit_shreds: 0usize,
218247
elapsed_micros: 0u64,
219248
}
@@ -228,7 +257,9 @@ impl ShredSigVerifyStats {
228257
("num_iters", self.num_iters, i64),
229258
("num_packets", self.num_packets, i64),
230259
("num_discards_pre", self.num_discards_pre, i64),
260+
("num_deduper_saturations", self.num_deduper_saturations, i64),
231261
("num_discards_post", self.num_discards_post, i64),
262+
("num_duplicates", self.num_duplicates, i64),
232263
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
233264
("elapsed_micros", self.elapsed_micros, i64),
234265
);

ledger/src/shred/stats.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,13 @@ pub struct ProcessShredsStats {
3232
pub struct ShredFetchStats {
3333
pub index_overrun: usize,
3434
pub shred_count: usize,
35-
pub num_deduper_saturations: usize,
3635
pub(crate) num_shreds_merkle_code: usize,
3736
pub(crate) num_shreds_merkle_data: usize,
3837
pub ping_count: usize,
3938
pub ping_err_verify_count: usize,
4039
pub(crate) index_bad_deserialize: usize,
4140
pub(crate) index_out_of_bounds: usize,
4241
pub(crate) slot_bad_deserialize: usize,
43-
pub duplicate_shred: usize,
4442
pub slot_out_of_range: usize,
4543
pub(crate) bad_shred_type: usize,
4644
pub shred_version_mismatch: usize,
@@ -118,7 +116,6 @@ impl ShredFetchStats {
118116
name,
119117
("index_overrun", self.index_overrun, i64),
120118
("shred_count", self.shred_count, i64),
121-
("num_deduper_saturations", self.num_deduper_saturations, i64),
122119
("num_shreds_merkle_code", self.num_shreds_merkle_code, i64),
123120
("num_shreds_merkle_data", self.num_shreds_merkle_data, i64),
124121
("ping_count", self.ping_count, i64),
@@ -127,7 +124,6 @@ impl ShredFetchStats {
127124
("index_bad_deserialize", self.index_bad_deserialize, i64),
128125
("index_out_of_bounds", self.index_out_of_bounds, i64),
129126
("slot_out_of_range", self.slot_out_of_range, i64),
130-
("duplicate_shred", self.duplicate_shred, i64),
131127
("bad_shred_type", self.bad_shred_type, i64),
132128
("shred_version_mismatch", self.shred_version_mismatch, i64),
133129
("bad_parent_offset", self.bad_parent_offset, i64),

0 commit comments

Comments
 (0)