Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
},
crate::{
banking_stage::{
consume_worker::ConsumeWorker,
consume_worker::{ConsumeWorker, CrossbeamConsumeWorkerChannels},
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_controller::{
Expand Down Expand Up @@ -485,14 +485,16 @@ impl BankingStage {
let consume_worker = ConsumeWorker::new(
id,
exit.clone(),
work_receiver,
CrossbeamConsumeWorkerChannels {
receiver: work_receiver,
sender: finished_work_sender.clone(),
},
Consumer::new(
context.committer.clone(),
context.transaction_recorder.clone(),
QosService::new(id),
context.log_messages_bytes_limit,
),
finished_work_sender.clone(),
context.poh_recorder.read().unwrap().shared_working_bank(),
);

Expand Down
138 changes: 93 additions & 45 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use {
scheduler_messages::{ConsumeWork, FinishedConsumeWork},
},
crate::banking_stage::consumer::RetryableIndex,
crossbeam_channel::{Receiver, SendError, Sender, TryRecvError},
solana_poh::poh_recorder::SharedWorkingBank,
solana_runtime::bank::Bank,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
Expand All @@ -22,11 +21,11 @@ use {
};

#[derive(Debug, Error)]
pub enum ConsumeWorkerError<Tx> {
#[error("Failed to receive work from scheduler: {0}")]
Recv(#[from] TryRecvError),
#[error("Failed to send finalized consume work to scheduler: {0}")]
Send(#[from] SendError<FinishedConsumeWork<Tx>>),
pub enum ConsumeWorkerError {
#[error("Failed to receive work from scheduler")]
Recv(TryRecvError),
#[error("Failed to send finalized consume work to scheduler")]
Send(TrySendError),
}

enum ProcessingStatus<Tx> {
Expand All @@ -35,30 +34,27 @@ enum ProcessingStatus<Tx> {
CouldNotProcess(ConsumeWork<Tx>),
}

pub(crate) struct ConsumeWorker<Tx> {
pub(crate) struct ConsumeWorker<Ch> {
exit: Arc<AtomicBool>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
channels: Ch,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,

shared_working_bank: SharedWorkingBank,
metrics: Arc<ConsumeWorkerMetrics>,
}

impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
impl<Channels: ConsumeWorkerChannels> ConsumeWorker<Channels> {
pub fn new(
id: u32,
exit: Arc<AtomicBool>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
channels: Channels,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,
shared_working_bank: SharedWorkingBank,
) -> Self {
Self {
exit,
consume_receiver,
channels,
consumer,
consumed_sender,
shared_working_bank,
metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
}
Expand All @@ -68,15 +64,15 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
pub fn run(mut self) -> Result<(), ConsumeWorkerError> {
const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250);

let mut did_work = false;
let mut last_empty_time = Instant::now();
let mut sleep_duration = STARTING_SLEEP_DURATION;

while !self.exit.load(Ordering::Relaxed) {
match self.consume_receiver.try_recv() {
match self.channels.try_recv() {
Ok(work) => {
did_work = true;
match self.consume(work)? {
Expand Down Expand Up @@ -106,9 +102,9 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

fn consume(
&self,
work: ConsumeWork<Tx>,
) -> Result<ProcessingStatus<Tx>, ConsumeWorkerError<Tx>> {
&mut self,
work: ConsumeWork<Channels::Tx>,
) -> Result<ProcessingStatus<Channels::Tx>, ConsumeWorkerError> {
let Some(bank) = self.active_working_bank_with_timeout() else {
return Ok(ProcessingStatus::CouldNotProcess(work));
};
Expand All @@ -128,10 +124,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {

/// Consume a single batch.
fn consume_with_bank(
&self,
&mut self,
bank: &Arc<Bank>,
work: ConsumeWork<Tx>,
) -> Result<(), ConsumeWorkerError<Tx>> {
work: ConsumeWork<Channels::Tx>,
) -> Result<(), ConsumeWorkerError> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
Expand All @@ -141,12 +137,15 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.update_for_consume(&output);
self.metrics.has_data.store(true, Ordering::Relaxed);

self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes: output
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})?;
self.channels
.try_send(FinishedConsumeWork {
work,
retryable_indexes: output
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})
.map_err(ConsumeWorkerError::Send)?;

Ok(())
}

Expand Down Expand Up @@ -189,8 +188,9 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Retry current batch and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
for work in try_drain_iter(work, &self.consume_receiver) {
fn retry_drain(&mut self, work: ConsumeWork<Channels::Tx>) -> Result<(), ConsumeWorkerError> {
self.retry(work)?;
while let Ok(work) = self.channels.try_recv() {
if self.exit.load(Ordering::Relaxed) {
return Ok(());
}
Expand All @@ -200,7 +200,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Send transactions back to scheduler as retryable.
fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
fn retry(&mut self, work: ConsumeWork<Channels::Tx>) -> Result<(), ConsumeWorkerError> {
let retryable_indexes: Vec<_> = (0..work.transactions.len())
.map(|index| RetryableIndex {
index,
Expand All @@ -217,20 +217,16 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.retryable_expired_bank_count
.fetch_add(num_retryable, Ordering::Relaxed);
self.metrics.has_data.store(true, Ordering::Relaxed);
self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes,
})?;
self.channels
.try_send(FinishedConsumeWork {
work,
retryable_indexes,
})
.map_err(ConsumeWorkerError::Send)?;
Ok(())
}
}

/// Helper function to create a non-blocking iterator over work in the receiver,
/// starting with the given work item.
///
/// The returned iterator yields `work` first and then whatever
/// `receiver.try_iter()` produces; it borrows `receiver` for its lifetime.
fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T> + '_ {
std::iter::once(work).chain(receiver.try_iter())
}

fn backoff(idle_duration: Duration, sleep_duration: &mut Duration) {
const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1);
const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1);
Expand Down Expand Up @@ -784,6 +780,55 @@ impl ConsumeWorkerTransactionErrorMetrics {
}
}

/// Reason a non-blocking receive of consume work did not yield an item.
///
/// This is a channel-agnostic error: channel implementations map their own
/// receive errors onto these two variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No work is currently available on the channel.
    Empty,
    /// The channel is disconnected, so no further work can arrive.
    Disconnected,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Empty => "receiving on an empty channel",
            Self::Disconnected => "receiving on a disconnected channel",
        })
    }
}

// Implementing `Error` lets this type be used as an error source and be boxed
// as `dyn Error` by callers.
impl std::error::Error for TryRecvError {}

/// Reason a non-blocking send of finished consume work failed.
///
/// This is a channel-agnostic error: channel implementations map their own
/// send errors onto these two variants. The message that could not be sent is
/// not carried by this error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError {
    /// The channel has no room for another message right now.
    Full,
    /// The channel is disconnected, so the message cannot be delivered.
    Disconnected,
}

impl std::fmt::Display for TrySendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Full => "sending on a full channel",
            Self::Disconnected => "sending on a disconnected channel",
        })
    }
}

// Implementing `Error` lets this type be used as an error source and be boxed
// as `dyn Error` by callers.
impl std::error::Error for TrySendError {}

pub(crate) trait ConsumeWorkerChannels {
type Tx: TransactionWithMeta;
fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError>;
fn try_send(
&mut self,
Comment on lines +797 to +799
Copy link
Author

@apfitzge apfitzge Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: with crossbeam, these methods do not need to take `&mut self`. With shaq, however, we update some local `usize` values internally.

When we get to the point of using shaq, we will likely need to add something like `sync`/`finalize`. Crossbeam does not need those either, but I am trying to keep this PR simple. For efficiency, we do not want to call them on every recv/send.

finished_work: FinishedConsumeWork<Self::Tx>,
) -> Result<(), TrySendError>;
}

/// Pair of crossbeam channel endpoints used by a consume worker: one to
/// receive `ConsumeWork` and one to send back `FinishedConsumeWork`.
pub(crate) struct CrossbeamConsumeWorkerChannels<Tx> {
/// Receiving end for incoming `ConsumeWork<Tx>` items.
pub receiver: crossbeam_channel::Receiver<ConsumeWork<Tx>>,
/// Sending end for `FinishedConsumeWork<Tx>` results.
pub sender: crossbeam_channel::Sender<FinishedConsumeWork<Tx>>,
}

/// Crossbeam-backed implementation of [`ConsumeWorkerChannels`].
///
/// Both methods delegate to the non-blocking crossbeam calls and translate the
/// crossbeam error into the channel-agnostic error enums of this module.
impl<Tx: TransactionWithMeta> ConsumeWorkerChannels for CrossbeamConsumeWorkerChannels<Tx> {
    type Tx = Tx;

    /// Attempts to take one `ConsumeWork` item from `receiver` without blocking.
    fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError> {
        match self.receiver.try_recv() {
            Ok(work) => Ok(work),
            Err(crossbeam_channel::TryRecvError::Empty) => Err(TryRecvError::Empty),
            Err(crossbeam_channel::TryRecvError::Disconnected) => Err(TryRecvError::Disconnected),
        }
    }

    /// Attempts to push `finished_work` onto `sender` without blocking.
    ///
    /// On failure the message handed back by crossbeam is discarded; only the
    /// failure reason is reported.
    fn try_send(
        &mut self,
        finished_work: FinishedConsumeWork<Self::Tx>,
    ) -> Result<(), TrySendError> {
        match self.sender.try_send(finished_work) {
            Ok(()) => Ok(()),
            Err(crossbeam_channel::TrySendError::Full(_)) => Err(TrySendError::Full),
            Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
                Err(TrySendError::Disconnected)
            }
        }
    }
}

#[cfg(test)]
mod tests {
use {
Expand All @@ -794,7 +839,7 @@ mod tests {
scheduler_messages::{MaxAge, TransactionBatchId},
tests::{create_slow_genesis_config, sanitize_transactions},
},
crossbeam_channel::unbounded,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_clock::{Slot, MAX_PROCESSING_AGE},
solana_genesis_config::GenesisConfig,
solana_keypair::Keypair,
Expand Down Expand Up @@ -848,7 +893,7 @@ mod tests {
relax_intrabatch_account_locks: bool,
) -> (
TestFrame,
ConsumeWorker<RuntimeTransaction<SanitizedTransaction>>,
ConsumeWorker<CrossbeamConsumeWorkerChannels<RuntimeTransaction<SanitizedTransaction>>>,
) {
let GenesisConfigInfo {
genesis_config,
Expand Down Expand Up @@ -881,12 +926,15 @@ mod tests {

let (consume_sender, consume_receiver) = unbounded();
let (consumed_sender, consumed_receiver) = unbounded();
let channels = CrossbeamConsumeWorkerChannels {
receiver: consume_receiver,
sender: consumed_sender,
};
let worker = ConsumeWorker::new(
0,
Arc::new(AtomicBool::new(false)),
consume_receiver,
channels,
consumer,
consumed_sender,
shared_working_bank.clone(),
);

Expand Down
Loading