Skip to content

Commit f67084a

Browse files
authored
Remove reprocess channel (#7437)
Partially addresses #6291. This PR removes the reprocess event channel from being externally exposed. All work events are now sent through the single `BeaconProcessorSend` channel. I've introduced a new `Work::Reprocess` enum variant, which we then use to schedule jobs for reprocessing. I've also created a new scheduler module, which will eventually house the different scheduler impls. This is all needed as an initial step to generalize the beacon processor. A "full" implementation of the generalized beacon processor can be found in #6448. I'm going to try to break up the full implementation into smaller PRs so it can actually be reviewed.
1 parent dd98534 commit f67084a

File tree

17 files changed

+186
-152
lines changed

17 files changed

+186
-152
lines changed

beacon_node/beacon_processor/src/lib.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
4747
use logging::crit;
4848
use logging::TimeLatch;
4949
use parking_lot::Mutex;
50+
pub use scheduler::work_reprocessing_queue;
5051
use serde::{Deserialize, Serialize};
5152
use slot_clock::SlotClock;
5253
use std::cmp;
@@ -73,7 +74,7 @@ use work_reprocessing_queue::{
7374
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};
7475

7576
mod metrics;
76-
pub mod work_reprocessing_queue;
77+
pub mod scheduler;
7778

7879
/// The maximum size of the channel for work events to the `BeaconProcessor`.
7980
///
@@ -264,22 +265,16 @@ impl Default for BeaconProcessorConfig {
264265
pub struct BeaconProcessorChannels<E: EthSpec> {
265266
pub beacon_processor_tx: BeaconProcessorSend<E>,
266267
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
267-
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
268-
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
269268
}
270269

271270
impl<E: EthSpec> BeaconProcessorChannels<E> {
272271
pub fn new(config: &BeaconProcessorConfig) -> Self {
273272
let (beacon_processor_tx, beacon_processor_rx) =
274273
mpsc::channel(config.max_work_event_queue_len);
275-
let (work_reprocessing_tx, work_reprocessing_rx) =
276-
mpsc::channel(config.max_scheduled_work_queue_len);
277274

278275
Self {
279276
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
280277
beacon_processor_rx,
281-
work_reprocessing_rx,
282-
work_reprocessing_tx,
283278
}
284279
}
285280
}
@@ -638,6 +633,7 @@ pub enum Work<E: EthSpec> {
638633
LightClientUpdatesByRangeRequest(BlockingFn),
639634
ApiRequestP0(BlockingOrAsync),
640635
ApiRequestP1(BlockingOrAsync),
636+
Reprocess(ReprocessQueueMessage),
641637
}
642638

643639
impl<E: EthSpec> fmt::Debug for Work<E> {
@@ -692,6 +688,7 @@ pub enum WorkType {
692688
LightClientUpdatesByRangeRequest,
693689
ApiRequestP0,
694690
ApiRequestP1,
691+
Reprocess,
695692
}
696693

697694
impl<E: EthSpec> Work<E> {
@@ -750,6 +747,7 @@ impl<E: EthSpec> Work<E> {
750747
}
751748
Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
752749
Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
750+
Work::Reprocess { .. } => WorkType::Reprocess,
753751
}
754752
}
755753
}
@@ -774,7 +772,7 @@ struct InboundEvents<E: EthSpec> {
774772
/// Used by upstream processes to send new work to the `BeaconProcessor`.
775773
event_rx: mpsc::Receiver<WorkEvent<E>>,
776774
/// Used internally for queuing work ready to be re-processed.
777-
reprocess_work_rx: mpsc::Receiver<ReadyWork>,
775+
ready_work_rx: mpsc::Receiver<ReadyWork>,
778776
}
779777

780778
impl<E: EthSpec> Stream for InboundEvents<E> {
@@ -795,7 +793,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
795793

796794
// Poll for delayed blocks before polling for new work. It might be the case that a delayed
797795
// block is required to successfully process some new work.
798-
match self.reprocess_work_rx.poll_recv(cx) {
796+
match self.ready_work_rx.poll_recv(cx) {
799797
Poll::Ready(Some(ready_work)) => {
800798
return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
801799
}
@@ -846,8 +844,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
846844
pub fn spawn_manager<S: SlotClock + 'static>(
847845
mut self,
848846
event_rx: mpsc::Receiver<WorkEvent<E>>,
849-
work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
850-
work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
851847
work_journal_tx: Option<mpsc::Sender<&'static str>>,
852848
slot_clock: S,
853849
maximum_gossip_clock_disparity: Duration,
@@ -935,9 +931,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
935931
// receive them back once they are ready (`ready_work_rx`).
936932
let (ready_work_tx, ready_work_rx) =
937933
mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
934+
935+
let (reprocess_work_tx, reprocess_work_rx) =
936+
mpsc::channel::<ReprocessQueueMessage>(self.config.max_scheduled_work_queue_len);
937+
938938
spawn_reprocess_scheduler(
939939
ready_work_tx,
940-
work_reprocessing_rx,
940+
reprocess_work_rx,
941941
&self.executor,
942942
Arc::new(slot_clock),
943943
maximum_gossip_clock_disparity,
@@ -951,7 +951,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
951951
let mut inbound_events = InboundEvents {
952952
idle_rx,
953953
event_rx,
954-
reprocess_work_rx: ready_work_rx,
954+
ready_work_rx,
955955
};
956956

957957
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
@@ -965,7 +965,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
965965
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
966966
match QueuedBackfillBatch::try_from(event) {
967967
Ok(backfill_batch) => {
968-
match work_reprocessing_tx
968+
match reprocess_work_tx
969969
.try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
970970
{
971971
Err(e) => {
@@ -1027,8 +1027,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
10271027
.unwrap_or(WORKER_FREED);
10281028

10291029
// We don't care if this message was successfully sent, we only use the journal
1030-
// during testing.
1031-
let _ = work_journal_tx.try_send(id);
1030+
// during testing. We also ignore reprocess messages to ensure our test cases can pass.
1031+
if id != "reprocess" {
1032+
let _ = work_journal_tx.try_send(id);
1033+
}
10321034
}
10331035

10341036
let can_spawn = self.current_workers < self.config.max_workers;
@@ -1318,6 +1320,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
13181320
let work_type = work.to_type();
13191321

13201322
match work {
1323+
Work::Reprocess(work_event) => {
1324+
if let Err(e) = reprocess_work_tx.try_send(work_event) {
1325+
error!(
1326+
error = ?e,
1327+
"Failed to reprocess work event"
1328+
)
1329+
}
1330+
}
13211331
_ if can_spawn => self.spawn_worker(work, idle_tx),
13221332
Work::GossipAttestation { .. } => attestation_queue.push(work),
13231333
// Attestation batches are formed internally within the
@@ -1488,6 +1498,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14881498
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
14891499
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
14901500
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
1501+
WorkType::Reprocess => 0,
14911502
};
14921503
metrics::observe_vec(
14931504
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
@@ -1639,6 +1650,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
16391650
| Work::LightClientUpdatesByRangeRequest(process_fn) => {
16401651
task_spawner.spawn_blocking(process_fn)
16411652
}
1653+
Work::Reprocess(_) => {}
16421654
};
16431655
}
16441656
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod work_reprocessing_queue;

beacon_node/client/src/builder.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,6 @@ where
546546
network_senders: None,
547547
network_globals: None,
548548
beacon_processor_send: None,
549-
beacon_processor_reprocess_send: None,
550549
eth1_service: Some(genesis_service.eth1_service.clone()),
551550
sse_logging_components: runtime_context.sse_logging_components.clone(),
552551
});
@@ -638,7 +637,6 @@ where
638637
context.executor,
639638
libp2p_registry.as_mut(),
640639
beacon_processor_channels.beacon_processor_tx.clone(),
641-
beacon_processor_channels.work_reprocessing_tx.clone(),
642640
)
643641
.await
644642
.map_err(|e| format!("Failed to start network: {:?}", e))?;
@@ -777,9 +775,6 @@ where
777775
network_globals: self.network_globals.clone(),
778776
eth1_service: self.eth1_service.clone(),
779777
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
780-
beacon_processor_reprocess_send: Some(
781-
beacon_processor_channels.work_reprocessing_tx.clone(),
782-
),
783778
sse_logging_components: runtime_context.sse_logging_components.clone(),
784779
});
785780

@@ -843,8 +838,6 @@ where
843838
}
844839
.spawn_manager(
845840
beacon_processor_channels.beacon_processor_rx,
846-
beacon_processor_channels.work_reprocessing_tx.clone(),
847-
beacon_processor_channels.work_reprocessing_rx,
848841
None,
849842
beacon_chain.slot_clock.clone(),
850843
beacon_chain.spec.maximum_gossip_clock_disparity(),
@@ -918,7 +911,7 @@ where
918911
compute_light_client_updates(
919912
&inner_chain,
920913
light_client_server_rv,
921-
beacon_processor_channels.work_reprocessing_tx,
914+
beacon_processor_channels.beacon_processor_tx,
922915
)
923916
.await
924917
},

beacon_node/client/src/compute_light_client_updates.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
22
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
3+
use beacon_processor::{BeaconProcessorSend, Work, WorkEvent};
34
use futures::channel::mpsc::Receiver;
45
use futures::StreamExt;
5-
use tokio::sync::mpsc::Sender;
66
use tracing::error;
77

88
// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
@@ -14,7 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
1414
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
1515
chain: &BeaconChain<T>,
1616
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
17-
reprocess_tx: Sender<ReprocessQueueMessage>,
17+
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
1818
) {
1919
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
2020
//
@@ -31,7 +31,13 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
3131
});
3232

3333
let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
34-
if reprocess_tx.try_send(msg).is_err() {
34+
if beacon_processor_send
35+
.try_send(WorkEvent {
36+
drop_during_sync: true,
37+
work: Work::Reprocess(msg),
38+
})
39+
.is_err()
40+
{
3541
error!(%parent_root,"Failed to inform light client update")
3642
};
3743
}

beacon_node/http_api/src/lib.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use beacon_chain::{
4040
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
4141
BeaconChainTypes, WhenSlotSkipped,
4242
};
43-
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
43+
use beacon_processor::BeaconProcessorSend;
4444
pub use block_id::BlockId;
4545
use builder_states::get_next_withdrawals;
4646
use bytes::Bytes;
@@ -130,7 +130,6 @@ pub struct Context<T: BeaconChainTypes> {
130130
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
131131
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
132132
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
133-
pub beacon_processor_reprocess_send: Option<Sender<ReprocessQueueMessage>>,
134133
pub eth1_service: Option<eth1::Service>,
135134
pub sse_logging_components: Option<SSELoggingComponents>,
136135
}
@@ -554,11 +553,6 @@ pub fn serve<T: BeaconChainTypes>(
554553
.filter(|_| config.enable_beacon_processor);
555554
let task_spawner_filter =
556555
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
557-
let beacon_processor_reprocess_send = ctx
558-
.beacon_processor_reprocess_send
559-
.clone()
560-
.filter(|_| config.enable_beacon_processor);
561-
let reprocess_send_filter = warp::any().map(move || beacon_processor_reprocess_send.clone());
562556

563557
let duplicate_block_status_code = ctx.config.duplicate_block_status_code;
564558

@@ -1986,20 +1980,18 @@ pub fn serve<T: BeaconChainTypes>(
19861980
.and(warp_utils::json::json::<Vec<SingleAttestation>>())
19871981
.and(optional_consensus_version_header_filter)
19881982
.and(network_tx_filter.clone())
1989-
.and(reprocess_send_filter.clone())
19901983
.then(
19911984
|task_spawner: TaskSpawner<T::EthSpec>,
19921985
chain: Arc<BeaconChain<T>>,
19931986
attestations: Vec<SingleAttestation>,
19941987
_fork_name: Option<ForkName>,
1995-
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
1996-
reprocess_tx: Option<Sender<ReprocessQueueMessage>>| async move {
1988+
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| async move {
19971989
let result = crate::publish_attestations::publish_attestations(
19981990
task_spawner,
19991991
chain,
20001992
attestations,
20011993
network_tx,
2002-
reprocess_tx,
1994+
true,
20031995
)
20041996
.await
20051997
.map(|()| warp::reply::json(&()));

beacon_node/http_api/src/publish_attestations.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,13 @@ use beacon_chain::{
4040
BeaconChainTypes,
4141
};
4242
use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage};
43+
use beacon_processor::{Work, WorkEvent};
4344
use eth2::types::Failure;
4445
use lighthouse_network::PubsubMessage;
4546
use network::NetworkMessage;
4647
use std::sync::Arc;
4748
use std::time::Duration;
48-
use tokio::sync::{
49-
mpsc::{Sender, UnboundedSender},
50-
oneshot,
51-
};
49+
use tokio::sync::{mpsc::UnboundedSender, oneshot};
5250
use tracing::{debug, error, warn};
5351
use types::SingleAttestation;
5452

@@ -130,7 +128,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
130128
chain: Arc<BeaconChain<T>>,
131129
attestations: Vec<SingleAttestation>,
132130
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
133-
reprocess_send: Option<Sender<ReprocessQueueMessage>>,
131+
allow_reprocess: bool,
134132
) -> Result<(), warp::Rejection> {
135133
// Collect metadata about attestations which we'll use to report failures. We need to
136134
// move the `attestations` vec into the blocking task, so this small overhead is unavoidable.
@@ -142,6 +140,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
142140
// Gossip validate and publish attestations that can be immediately processed.
143141
let seen_timestamp = timestamp_now();
144142
let mut prelim_results = task_spawner
143+
.clone()
145144
.blocking_task(Priority::P0, move || {
146145
Ok(attestations
147146
.into_iter()
@@ -156,7 +155,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
156155
Err(Error::Validation(AttestationError::UnknownHeadBlock {
157156
beacon_block_root,
158157
})) => {
159-
let Some(reprocess_tx) = &reprocess_send else {
158+
if !allow_reprocess {
160159
return PublishAttestationResult::Failure(Error::ReprocessDisabled);
161160
};
162161
// Re-process.
@@ -180,7 +179,13 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
180179
beacon_block_root,
181180
process_fn: Box::new(reprocess_fn),
182181
});
183-
if reprocess_tx.try_send(reprocess_msg).is_err() {
182+
if task_spawner
183+
.try_send(WorkEvent {
184+
drop_during_sync: false,
185+
work: Work::Reprocess(reprocess_msg),
186+
})
187+
.is_err()
188+
{
184189
PublishAttestationResult::Failure(Error::ReprocessFull)
185190
} else {
186191
PublishAttestationResult::Reprocessing(rx)

beacon_node/http_api/src/task_spawner.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ impl Priority {
3030
}
3131

3232
/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor.
33+
#[derive(Clone)]
3334
pub struct TaskSpawner<E: EthSpec> {
3435
/// Used to send tasks to the `BeaconProcessor`. The tokio executor will be
3536
/// used if this is `None`.
@@ -155,6 +156,32 @@ impl<E: EthSpec> TaskSpawner<E> {
155156
.and_then(|x| x)
156157
}
157158
}
159+
160+
pub fn try_send(&self, work_event: WorkEvent<E>) -> Result<(), warp::Rejection> {
161+
if let Some(beacon_processor_send) = &self.beacon_processor_send {
162+
let error_message = match beacon_processor_send.try_send(work_event) {
163+
Ok(()) => None,
164+
Err(TrySendError::Full(_)) => {
165+
Some("The task was dropped. The server is overloaded.")
166+
}
167+
Err(TrySendError::Closed(_)) => {
168+
Some("The task was dropped. The server is shutting down.")
169+
}
170+
};
171+
172+
if let Some(error_message) = error_message {
173+
return Err(warp_utils::reject::custom_server_error(
174+
error_message.to_string(),
175+
));
176+
};
177+
178+
Ok(())
179+
} else {
180+
Err(warp_utils::reject::custom_server_error(
181+
"The beacon processor is unavailable".to_string(),
182+
))
183+
}
184+
}
158185
}
159186

160187
/// Send a task to the beacon processor and await execution.

0 commit comments

Comments
 (0)