diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f9138420f461c6..099ea1a0f82418 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -12,7 +12,7 @@ use { }, crate::{ banking_stage::{ - consume_worker::ConsumeWorker, + consume_worker::{ConsumeWorker, CrossbeamConsumeWorkerChannels}, transaction_scheduler::{ prio_graph_scheduler::PrioGraphScheduler, scheduler_controller::{ @@ -485,14 +485,16 @@ impl BankingStage { let consume_worker = ConsumeWorker::new( id, exit.clone(), - work_receiver, + CrossbeamConsumeWorkerChannels { + receiver: work_receiver, + sender: finished_work_sender.clone(), + }, Consumer::new( context.committer.clone(), context.transaction_recorder.clone(), QosService::new(id), context.log_messages_bytes_limit, ), - finished_work_sender.clone(), context.poh_recorder.read().unwrap().shared_working_bank(), ); diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index 1fe91aa4ccc433..6a1de58016eab8 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -5,7 +5,6 @@ use { scheduler_messages::{ConsumeWork, FinishedConsumeWork}, }, crate::banking_stage::consumer::RetryableIndex, - crossbeam_channel::{Receiver, SendError, Sender, TryRecvError}, solana_poh::poh_recorder::SharedWorkingBank, solana_runtime::bank::Bank, solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, @@ -22,11 +21,11 @@ use { }; #[derive(Debug, Error)] -pub enum ConsumeWorkerError { - #[error("Failed to receive work from scheduler: {0}")] - Recv(#[from] TryRecvError), - #[error("Failed to send finalized consume work to scheduler: {0}")] - Send(#[from] SendError>), +pub enum ConsumeWorkerError { + #[error("Failed to receive work from scheduler")] + Recv(TryRecvError), + #[error("Failed to send finalized consume work to scheduler")] + Send(TrySendError), } enum ProcessingStatus { @@ -35,30 +34,27 @@ enum ProcessingStatus { CouldNotProcess(ConsumeWork), } -pub(crate) struct ConsumeWorker { +pub(crate) struct ConsumeWorker { exit: Arc, - consume_receiver: Receiver>, + channels: Ch, consumer: Consumer, - consumed_sender: Sender>, shared_working_bank: SharedWorkingBank, metrics: Arc, } -impl ConsumeWorker { +impl ConsumeWorker { pub fn new( id: u32, exit: Arc, - consume_receiver: Receiver>, + channels: Channels, consumer: Consumer, - consumed_sender: Sender>, shared_working_bank: SharedWorkingBank, ) -> Self { Self { exit, - consume_receiver, + channels, consumer, - consumed_sender, shared_working_bank, metrics: Arc::new(ConsumeWorkerMetrics::new(id)), } @@ -68,7 +64,7 @@ impl ConsumeWorker { self.metrics.clone() } - pub fn run(self) -> Result<(), ConsumeWorkerError> { + pub fn run(mut self) -> Result<(), ConsumeWorkerError> { const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250); let mut did_work = false; @@ -76,7 +72,7 @@ impl ConsumeWorker { let mut sleep_duration = STARTING_SLEEP_DURATION; while !self.exit.load(Ordering::Relaxed) { - match self.consume_receiver.try_recv() { + match self.channels.try_recv() { Ok(work) => { did_work = true; match self.consume(work)? { @@ -106,9 +102,9 @@ impl ConsumeWorker { } fn consume( - &self, - work: ConsumeWork, - ) -> Result, ConsumeWorkerError> { + &mut self, + work: ConsumeWork, + ) -> Result, ConsumeWorkerError> { let Some(bank) = self.active_working_bank_with_timeout() else { return Ok(ProcessingStatus::CouldNotProcess(work)); }; @@ -128,10 +124,10 @@ impl ConsumeWorker { /// Consume a single batch. fn consume_with_bank( - &self, + &mut self, bank: &Arc, - work: ConsumeWork, - ) -> Result<(), ConsumeWorkerError> { + work: ConsumeWork, + ) -> Result<(), ConsumeWorkerError> { let output = self.consumer.process_and_record_aged_transactions( bank, &work.transactions, @@ -141,12 +137,15 @@ impl ConsumeWorker { self.metrics.update_for_consume(&output); self.metrics.has_data.store(true, Ordering::Relaxed); - self.consumed_sender.send(FinishedConsumeWork { - work, - retryable_indexes: output - .execute_and_commit_transactions_output - .retryable_transaction_indexes, - })?; + self.channels + .try_send(FinishedConsumeWork { + work, + retryable_indexes: output + .execute_and_commit_transactions_output + .retryable_transaction_indexes, + }) + .map_err(ConsumeWorkerError::Send)?; + Ok(()) } @@ -189,18 +188,19 @@ impl ConsumeWorker { } /// Retry current batch and all outstanding batches. - fn retry_drain(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { - for work in try_drain_iter(work, &self.consume_receiver) { + fn retry_drain(&mut self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + self.retry(work)?; + while let Ok(work) = self.channels.try_recv() { + self.retry(work)?; if self.exit.load(Ordering::Relaxed) { return Ok(()); } - self.retry(work)?; } Ok(()) } /// Send transactions back to scheduler as retryable. - fn retry(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + fn retry(&mut self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { let retryable_indexes: Vec<_> = (0..work.transactions.len()) .map(|index| RetryableIndex { index, @@ -217,20 +217,16 @@ impl ConsumeWorker { .retryable_expired_bank_count .fetch_add(num_retryable, Ordering::Relaxed); self.metrics.has_data.store(true, Ordering::Relaxed); - self.consumed_sender.send(FinishedConsumeWork { - work, - retryable_indexes, - })?; + self.channels + .try_send(FinishedConsumeWork { + work, + retryable_indexes, + }) + .map_err(ConsumeWorkerError::Send)?; Ok(()) } } -/// Helper function to create an non-blocking iterator over work in the receiver, -/// starting with the given work item. -fn try_drain_iter(work: T, receiver: &Receiver) -> impl Iterator + '_ { - std::iter::once(work).chain(receiver.try_iter()) -} - fn backoff(idle_duration: Duration, sleep_duration: &mut Duration) { const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1); const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1); @@ -784,6 +780,55 @@ impl ConsumeWorkerTransactionErrorMetrics { } } +#[derive(Debug)] +pub enum TryRecvError { + Empty, + Disconnected, +} + +#[derive(Debug)] +pub enum TrySendError { + Full, + Disconnected, +} + +pub(crate) trait ConsumeWorkerChannels { + type Tx: TransactionWithMeta; + fn try_recv(&mut self) -> Result, TryRecvError>; + fn try_send( + &mut self, + finished_work: FinishedConsumeWork, + ) -> Result<(), TrySendError>; +} + +pub(crate) struct CrossbeamConsumeWorkerChannels { + pub receiver: crossbeam_channel::Receiver>, + pub sender: crossbeam_channel::Sender>, +} + +impl ConsumeWorkerChannels for CrossbeamConsumeWorkerChannels { + type Tx = Tx; + + fn try_recv(&mut self) -> Result, TryRecvError> { + self.receiver.try_recv().map_err(|err| match err { + crossbeam_channel::TryRecvError::Empty => TryRecvError::Empty, + crossbeam_channel::TryRecvError::Disconnected => TryRecvError::Disconnected, + }) + } + + fn try_send( + &mut self, + finished_work: FinishedConsumeWork, + ) -> Result<(), TrySendError> { + self.sender + .try_send(finished_work) + .map_err(|err| match err { + crossbeam_channel::TrySendError::Full(_) => TrySendError::Full, + crossbeam_channel::TrySendError::Disconnected(_) => TrySendError::Disconnected, + }) + } +} + #[cfg(test)] mod tests { use { @@ -794,7 +839,7 @@ mod tests { scheduler_messages::{MaxAge, TransactionBatchId}, tests::{create_slow_genesis_config, sanitize_transactions}, }, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver, Sender}, solana_clock::{Slot, MAX_PROCESSING_AGE}, solana_genesis_config::GenesisConfig, solana_keypair::Keypair, @@ -848,7 +893,7 @@ mod tests { relax_intrabatch_account_locks: bool, ) -> ( TestFrame, - ConsumeWorker>, + ConsumeWorker>>, ) { let GenesisConfigInfo { genesis_config, @@ -881,12 +926,15 @@ mod tests { let (consume_sender, consume_receiver) = unbounded(); let (consumed_sender, consumed_receiver) = unbounded(); + let channels = CrossbeamConsumeWorkerChannels { + receiver: consume_receiver, + sender: consumed_sender, + }; let worker = ConsumeWorker::new( 0, Arc::new(AtomicBool::new(false)), - consume_receiver, + channels, consumer, - consumed_sender, shared_working_bank.clone(), );