Commit 2de3451

Rate limiting backfill sync (#3936)
## Issue Addressed

#3212

## Proposed Changes

- Introduce a new `rate_limiting_backfill_queue` - any new inbound backfill work events get immediately sent to this FIFO queue **without any processing**
- Spawn a `backfill_scheduler` routine that pops a backfill event from the FIFO queue at specified intervals (currently halfway through a slot, or at 6s after slot start for 12s slots) and sends the event to `BeaconProcessor` via a `scheduled_backfill_work_tx` channel
- This channel gets polled last in the `InboundEvents`, and any work event received is wrapped in an `InboundEvent::ScheduledBackfillWork` enum variant, which gets processed immediately or queued by the `BeaconProcessor` (existing logic applies from here)

Diagram comparing backfill processing with / without rate-limiting: #3212 (comment)

See this comment for @paulhauner's explanation and solution: #3212 (comment)

## Additional Info

I've compared this branch (with backfill processing rate-limited to 1 and 3 batches per slot) against the latest stable version. The CPU usage during backfill sync is reduced by ~5%-20%, more details on this page: https://hackmd.io/@jimmygchen/SJuVpJL3j

The above testing was done on Goerli (as I don't currently have hardware for Mainnet); I'm guessing the differences are likely to be bigger on Mainnet due to block size.

### TODOs

- [x] Experiment with processing multiple batches per slot. (need to think about how to do this for different slot durations)
- [x] Add option to disable rate-limiting, enabled by default.
- [x] (No longer required now we're reusing the reprocessing queue) Complete the `backfill_scheduler` task when backfill sync is completed or not required
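For illustration, here is a minimal, self-contained sketch of the rate-limiting pattern described above, written as a standalone tokio task with a hypothetical `BackfillBatch` type. Note this is the originally proposed design; per the last TODO, the merged code reuses the existing reprocessing queue rather than spawning a dedicated `backfill_scheduler` task.

```rust
use std::collections::VecDeque;
use std::time::Duration;
use tokio::sync::mpsc;

/// Hypothetical stand-in for a backfill work event; the real type in
/// Lighthouse is a `WorkEvent<T>` carrying a batch of blocks.
struct BackfillBatch(u64);

/// Spawn a scheduler that drains `queue_rx` into `work_tx` at a fixed rate:
/// at most one batch per `interval` (e.g. half a slot, so 6s for 12s slots).
fn spawn_backfill_scheduler(
    mut queue_rx: mpsc::Receiver<BackfillBatch>,
    work_tx: mpsc::Sender<BackfillBatch>,
    interval: Duration,
) {
    tokio::spawn(async move {
        let mut fifo = VecDeque::new();
        let mut ticker = tokio::time::interval(interval);
        loop {
            tokio::select! {
                // New inbound batches are buffered without any processing.
                maybe_batch = queue_rx.recv() => match maybe_batch {
                    Some(batch) => fifo.push_back(batch),
                    None => return, // sender dropped; shut down
                },
                // On each tick, release at most one batch to the processor.
                _ = ticker.tick() => {
                    if let Some(batch) = fifo.pop_front() {
                        if work_tx.send(batch).await.is_err() {
                            return; // processor gone; shut down
                        }
                    }
                }
            }
        }
    });
}
```

The key property is that inbound batches are buffered untouched and released at most one per tick, smoothing CPU usage across the slot instead of processing every batch on arrival.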
1 parent c5383e3 commit 2de3451

10 files changed: +380 -24 lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 2 additions & 2 deletions
@@ -2893,7 +2893,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_BLOCK_TIMES);
         let block_delay = self
             .slot_clock
-            .seconds_from_current_slot_start(self.spec.seconds_per_slot)
+            .seconds_from_current_slot_start()
             .ok_or(Error::UnableToComputeTimeAtSlot)?;
 
         fork_choice
@@ -3746,7 +3746,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
         let slot_delay = self
             .slot_clock
-            .seconds_from_current_slot_start(self.spec.seconds_per_slot)
+            .seconds_from_current_slot_start()
             .or_else(|| {
                 warn!(
                     self.log,

beacon_node/beacon_chain/src/chain_config.rs

Lines changed: 3 additions & 0 deletions
@@ -68,6 +68,8 @@ pub struct ChainConfig {
     ///
     /// This is useful for block builders and testing.
     pub always_prepare_payload: bool,
+    /// Whether backfill sync processing should be rate-limited.
+    pub enable_backfill_rate_limiting: bool,
 }
 
 impl Default for ChainConfig {
@@ -94,6 +96,7 @@ impl Default for ChainConfig {
             optimistic_finalized_sync: true,
             shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
             always_prepare_payload: false,
+            enable_backfill_rate_limiting: true,
         }
     }
 }
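As a quick sketch of how the new flag is consumed (field name and default taken from the diff above; the tests further down use the same struct-update syntax), rate-limiting can be opted out of when constructing a `ChainConfig`:

```rust
use beacon_chain::ChainConfig;

fn main() {
    // Rate-limiting defaults to enabled; opt out explicitly, e.g. for tests
    // or nodes that prioritise backfill speed over steady CPU usage.
    let chain_config = ChainConfig {
        enable_backfill_rate_limiting: false,
        ..ChainConfig::default()
    };
    assert!(!chain_config.enable_backfill_rate_limiting);
}
```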

beacon_node/network/src/beacon_processor/mod.rs

Lines changed: 69 additions & 15 deletions
@@ -61,6 +61,7 @@ use std::time::Duration;
 use std::{cmp, collections::HashSet};
 use task_executor::TaskExecutor;
 use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
 use types::{
     Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
     ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange,
@@ -77,7 +78,9 @@ mod tests;
 mod work_reprocessing_queue;
 mod worker;
 
-use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
+use crate::beacon_processor::work_reprocessing_queue::{
+    QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
+};
 pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
 
 /// The maximum size of the channel for work events to the `BeaconProcessor`.
@@ -218,6 +221,7 @@ pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_upd
 pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
 pub const RPC_BLOCK: &str = "rpc_block";
 pub const CHAIN_SEGMENT: &str = "chain_segment";
+pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
 pub const STATUS_PROCESSING: &str = "status_processing";
 pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
 pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
@@ -738,6 +742,9 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
                     seen_timestamp,
                 },
             },
+            ReadyWork::BackfillSync(QueuedBackfillBatch { process_id, blocks }) => {
+                WorkEvent::chain_segment(process_id, blocks)
+            }
         }
     }
 }
@@ -893,6 +900,10 @@ impl<T: BeaconChainTypes> Work<T> {
            Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
            Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
            Work::RpcBlock { .. } => RPC_BLOCK,
+            Work::ChainSegment {
+                process_id: ChainSegmentProcessId::BackSyncBatchId { .. },
+                ..
+            } => CHAIN_SEGMENT_BACKFILL,
            Work::ChainSegment { .. } => CHAIN_SEGMENT,
            Work::Status { .. } => STATUS_PROCESSING,
            Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
@@ -1054,23 +1065,23 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
             FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
 
         let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
+
+        let chain = match self.beacon_chain.upgrade() {
+            Some(chain) => chain,
+            // No need to proceed any further if the beacon chain has been dropped, the client
+            // is shutting down.
+            None => return,
+        };
+
         // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
         // receive them back once they are ready (`ready_work_rx`).
         let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
-        let work_reprocessing_tx = {
-            if let Some(chain) = self.beacon_chain.upgrade() {
-                spawn_reprocess_scheduler(
-                    ready_work_tx,
-                    &self.executor,
-                    chain.slot_clock.clone(),
-                    self.log.clone(),
-                )
-            } else {
-                // No need to proceed any further if the beacon chain has been dropped, the client
-                // is shutting down.
-                return;
-            }
-        };
+        let work_reprocessing_tx = spawn_reprocess_scheduler(
+            ready_work_tx,
+            &self.executor,
+            chain.slot_clock.clone(),
+            self.log.clone(),
+        );
 
         let executor = self.executor.clone();
 
@@ -1083,12 +1094,55 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
             reprocess_work_rx: ready_work_rx,
         };
 
+        let enable_backfill_rate_limiting = chain.config.enable_backfill_rate_limiting;
+
         loop {
             let work_event = match inbound_events.next().await {
                 Some(InboundEvent::WorkerIdle) => {
                     self.current_workers = self.current_workers.saturating_sub(1);
                     None
                 }
+                Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
+                    match QueuedBackfillBatch::try_from(event) {
+                        Ok(backfill_batch) => {
+                            match work_reprocessing_tx
+                                .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
+                            {
+                                Err(e) => {
+                                    warn!(
+                                        self.log,
+                                        "Unable to queue backfill work event. Will try to process now.";
+                                        "error" => %e
+                                    );
+                                    match e {
+                                        TrySendError::Full(reprocess_queue_message)
+                                        | TrySendError::Closed(reprocess_queue_message) => {
+                                            match reprocess_queue_message {
+                                                ReprocessQueueMessage::BackfillSync(
+                                                    backfill_batch,
+                                                ) => Some(backfill_batch.into()),
+                                                other => {
+                                                    crit!(
+                                                        self.log,
+                                                        "Unexpected queue message type";
+                                                        "message_type" => other.as_ref()
+                                                    );
+                                                    // This is an unhandled exception, drop the message.
+                                                    continue;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                                Ok(..) => {
+                                    // backfill work sent to "reprocessing" queue. Process the next event.
+                                    continue;
+                                }
+                            }
+                        }
+                        Err(event) => Some(event),
+                    }
+                }
                 Some(InboundEvent::WorkEvent(event))
                 | Some(InboundEvent::ReprocessingWork(event)) => Some(event),
                 None => {
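The interception above leans on two ownership-preserving error paths: `QueuedBackfillBatch::try_from` hands the original event back when it isn't backfill work, and tokio's `TrySendError` returns the message when the queue is full or closed, so the event can be processed immediately instead of being dropped. Below is a minimal sketch of that pattern using hypothetical `Event`/`BackfillBatch` stand-ins, not the actual Lighthouse types or signatures.

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

/// Hypothetical stand-ins for `WorkEvent<T>` and `QueuedBackfillBatch<T>`.
#[derive(Debug)]
enum Event {
    BackfillBatch(Vec<u64>),
    Other(&'static str),
}

#[derive(Debug)]
struct BackfillBatch(Vec<u64>);

impl TryFrom<Event> for BackfillBatch {
    // On failure, the error variant hands the original event back, so
    // non-backfill events pass through the interception untouched.
    type Error = Event;

    fn try_from(event: Event) -> Result<Self, Self::Error> {
        match event {
            Event::BackfillBatch(blocks) => Ok(BackfillBatch(blocks)),
            other => Err(other),
        }
    }
}

/// Intercept backfill batches into the rate-limiting queue. If the queue is
/// full or closed, recover the batch from `TrySendError` (both variants
/// return ownership of the message) and fall back to immediate processing.
fn intercept(event: Event, queue_tx: &mpsc::Sender<BackfillBatch>) -> Option<Event> {
    match BackfillBatch::try_from(event) {
        Ok(batch) => match queue_tx.try_send(batch) {
            Ok(()) => None, // queued; the scheduler will release it later
            Err(TrySendError::Full(batch)) | Err(TrySendError::Closed(batch)) => {
                Some(Event::BackfillBatch(batch.0)) // process now instead
            }
        },
        Err(other) => Some(other), // not backfill work; process as usual
    }
}
```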

beacon_node/network/src/beacon_processor/tests.rs

Lines changed: 62 additions & 3 deletions
@@ -9,7 +9,7 @@ use crate::{service::NetworkMessage, sync::SyncMessage};
 use beacon_chain::test_utils::{
     AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
 };
-use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
+use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
 use lighthouse_network::{
     discv5::enr::{CombinedKey, EnrBuilder},
     rpc::methods::{MetaData, MetaDataV2},
@@ -23,8 +23,8 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
 use types::{
-    Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock,
-    SignedVoluntaryExit, SubnetId,
+    Attestation, AttesterSlashing, Epoch, EthSpec, MainnetEthSpec, ProposerSlashing,
+    SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
 };
 
 type E = MainnetEthSpec;
@@ -70,6 +70,10 @@ impl Drop for TestRig {
 
 impl TestRig {
     pub async fn new(chain_length: u64) -> Self {
+        Self::new_with_chain_config(chain_length, ChainConfig::default()).await
+    }
+
+    pub async fn new_with_chain_config(chain_length: u64, chain_config: ChainConfig) -> Self {
         // This allows for testing voluntary exits without building out a massive chain.
         let mut spec = E::default_spec();
         spec.shard_committee_period = 2;
@@ -78,6 +82,7 @@ impl TestRig {
             .spec(spec)
             .deterministic_keypairs(VALIDATOR_COUNT)
             .fresh_ephemeral_store()
+            .chain_config(chain_config)
             .build();
 
         harness.advance_slot();
@@ -261,6 +266,14 @@ impl TestRig {
         self.beacon_processor_tx.try_send(event).unwrap();
     }
 
+    pub fn enqueue_backfill_batch(&self) {
+        let event = WorkEvent::chain_segment(
+            ChainSegmentProcessId::BackSyncBatchId(Epoch::default()),
+            Vec::default(),
+        );
+        self.beacon_processor_tx.try_send(event).unwrap();
+    }
+
     pub fn enqueue_unaggregated_attestation(&self) {
         let (attestation, subnet_id) = self.attestations.first().unwrap().clone();
         self.beacon_processor_tx
@@ -873,3 +886,49 @@ async fn test_rpc_block_reprocessing() {
     // cache handle was dropped.
     assert_eq!(next_block_root, rig.head_root());
 }
+
+/// Ensure that backfill batches get rate-limited and processing is scheduled at specified intervals.
+#[tokio::test]
+async fn test_backfill_sync_processing() {
+    let mut rig = TestRig::new(SMALL_CHAIN).await;
+    // Note: to verify the exact event times in an integration test is not straight forward here
+    // (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code)
+    // and makes the test very slow, hence timing calculation is unit tested separately in
+    // `work_reprocessing_queue`.
+    for _ in 0..1 {
+        rig.enqueue_backfill_batch();
+        // ensure queued batch is not processed until later
+        rig.assert_no_events_for(Duration::from_millis(100)).await;
+        // A new batch should be processed within a slot.
+        rig.assert_event_journal_with_timeout(
+            &[CHAIN_SEGMENT_BACKFILL, WORKER_FREED, NOTHING_TO_DO],
+            rig.chain.slot_clock.slot_duration(),
+        )
+        .await;
+    }
+}
+
+/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
+#[tokio::test]
+async fn test_backfill_sync_processing_rate_limiting_disabled() {
+    let chain_config = ChainConfig {
+        enable_backfill_rate_limiting: false,
+        ..Default::default()
+    };
+    let mut rig = TestRig::new_with_chain_config(SMALL_CHAIN, chain_config).await;
+
+    for _ in 0..3 {
+        rig.enqueue_backfill_batch();
+    }
+
+    // ensure all batches are processed
+    rig.assert_event_journal_with_timeout(
+        &[
+            CHAIN_SEGMENT_BACKFILL,
+            CHAIN_SEGMENT_BACKFILL,
+            CHAIN_SEGMENT_BACKFILL,
+        ],
+        Duration::from_millis(100),
+    )
+    .await;
+}
