Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
},
crate::{
banking_stage::{
consume_worker::ConsumeWorker,
consume_worker::{ConsumeWorker, CrossbeamConsumeWorkerChannels},
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_controller::{
Expand Down Expand Up @@ -485,14 +485,16 @@ impl BankingStage {
let consume_worker = ConsumeWorker::new(
id,
exit.clone(),
work_receiver,
CrossbeamConsumeWorkerChannels {
receiver: work_receiver,
sender: finished_work_sender.clone(),
},
Consumer::new(
context.committer.clone(),
context.transaction_recorder.clone(),
QosService::new(id),
context.log_messages_bytes_limit,
),
finished_work_sender.clone(),
context.poh_recorder.read().unwrap().shared_working_bank(),
);

Expand Down
138 changes: 93 additions & 45 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use {
scheduler_messages::{ConsumeWork, FinishedConsumeWork},
},
crate::banking_stage::consumer::RetryableIndex,
crossbeam_channel::{Receiver, SendError, Sender, TryRecvError},
solana_poh::poh_recorder::SharedWorkingBank,
solana_runtime::bank::Bank,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
Expand All @@ -22,11 +21,11 @@ use {
};

#[derive(Debug, Error)]
pub enum ConsumeWorkerError<Tx> {
#[error("Failed to receive work from scheduler: {0}")]
Recv(#[from] TryRecvError),
#[error("Failed to send finalized consume work to scheduler: {0}")]
Send(#[from] SendError<FinishedConsumeWork<Tx>>),
pub enum ConsumeWorkerError {
#[error("Failed to receive work from scheduler")]
Recv(TryRecvError),
#[error("Failed to send finalized consume work to scheduler")]
Send(TrySendError),
}

enum ProcessingStatus<Tx> {
Expand All @@ -35,30 +34,27 @@ enum ProcessingStatus<Tx> {
CouldNotProcess(ConsumeWork<Tx>),
}

pub(crate) struct ConsumeWorker<Tx> {
pub(crate) struct ConsumeWorker<Ch> {
exit: Arc<AtomicBool>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
channels: Ch,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,

shared_working_bank: SharedWorkingBank,
metrics: Arc<ConsumeWorkerMetrics>,
}

impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
impl<Channels: ConsumeWorkerChannels> ConsumeWorker<Channels> {
pub fn new(
id: u32,
exit: Arc<AtomicBool>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
channels: Channels,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,
shared_working_bank: SharedWorkingBank,
) -> Self {
Self {
exit,
consume_receiver,
channels,
consumer,
consumed_sender,
shared_working_bank,
metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
}
Expand All @@ -68,15 +64,15 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
pub fn run(mut self) -> Result<(), ConsumeWorkerError> {
const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250);

let mut did_work = false;
let mut last_empty_time = Instant::now();
let mut sleep_duration = STARTING_SLEEP_DURATION;

while !self.exit.load(Ordering::Relaxed) {
match self.consume_receiver.try_recv() {
match self.channels.try_recv() {
Ok(work) => {
did_work = true;
match self.consume(work)? {
Expand Down Expand Up @@ -106,9 +102,9 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

fn consume(
&self,
work: ConsumeWork<Tx>,
) -> Result<ProcessingStatus<Tx>, ConsumeWorkerError<Tx>> {
&mut self,
work: ConsumeWork<Channels::Tx>,
) -> Result<ProcessingStatus<Channels::Tx>, ConsumeWorkerError> {
let Some(bank) = self.active_working_bank_with_timeout() else {
return Ok(ProcessingStatus::CouldNotProcess(work));
};
Expand All @@ -128,10 +124,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {

/// Consume a single batch.
fn consume_with_bank(
&self,
&mut self,
bank: &Arc<Bank>,
work: ConsumeWork<Tx>,
) -> Result<(), ConsumeWorkerError<Tx>> {
work: ConsumeWork<Channels::Tx>,
) -> Result<(), ConsumeWorkerError> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
Expand All @@ -141,12 +137,15 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.update_for_consume(&output);
self.metrics.has_data.store(true, Ordering::Relaxed);

self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes: output
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})?;
self.channels
.try_send(FinishedConsumeWork {
work,
retryable_indexes: output
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})
.map_err(ConsumeWorkerError::Send)?;

Ok(())
}

Expand Down Expand Up @@ -189,8 +188,9 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Retry current batch and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
for work in try_drain_iter(work, &self.consume_receiver) {
fn retry_drain(&mut self, work: ConsumeWork<Channels::Tx>) -> Result<(), ConsumeWorkerError> {
self.retry(work)?;
while let Ok(work) = self.channels.try_recv() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slight change of behavior - now work is retried without checking exit, while before exit is always checked before retry this work or anything in recv queue. Don't seem to be an issue, just want to point out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def slight change but just need to change the loop a bit because of the new mut

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually now that i'm looking at this it's probably better if we retry the work and THEN check exit...otherwise we just drop that work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushed a change - we'll now process the message before checking exit.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, since it's exiting, it doesn't really matter if it drops a message, better it exits quicker by not to processing messages. wdyt?

if self.exit.load(Ordering::Relaxed) {
return Ok(());
}
Expand All @@ -200,7 +200,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Send transactions back to scheduler as retryable.
fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
fn retry(&mut self, work: ConsumeWork<Channels::Tx>) -> Result<(), ConsumeWorkerError> {
let retryable_indexes: Vec<_> = (0..work.transactions.len())
.map(|index| RetryableIndex {
index,
Expand All @@ -217,20 +217,16 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.retryable_expired_bank_count
.fetch_add(num_retryable, Ordering::Relaxed);
self.metrics.has_data.store(true, Ordering::Relaxed);
self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes,
})?;
self.channels
.try_send(FinishedConsumeWork {
work,
retryable_indexes,
})
.map_err(ConsumeWorkerError::Send)?;
Ok(())
}
}

/// Helper function to create an non-blocking iterator over work in the receiver,
/// starting with the given work item.
fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T> + '_ {
std::iter::once(work).chain(receiver.try_iter())
}

fn backoff(idle_duration: Duration, sleep_duration: &mut Duration) {
const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1);
const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1);
Expand Down Expand Up @@ -784,6 +780,55 @@ impl ConsumeWorkerTransactionErrorMetrics {
}
}

#[derive(Debug)]
pub enum TryRecvError {
Empty,
Disconnected,
}

#[derive(Debug)]
pub enum TrySendError {
Full,
Disconnected,
}

pub(crate) trait ConsumeWorkerChannels {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wdyt to move the trait and impl to own modules?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was a bit on the fence about it, since it's really only relevant to the consume workers

type Tx: TransactionWithMeta;
fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError>;
fn try_send(
&mut self,
finished_work: FinishedConsumeWork<Self::Tx>,
) -> Result<(), TrySendError>;
}

pub(crate) struct CrossbeamConsumeWorkerChannels<Tx> {
pub receiver: crossbeam_channel::Receiver<ConsumeWork<Tx>>,
pub sender: crossbeam_channel::Sender<FinishedConsumeWork<Tx>>,
}

impl<Tx: TransactionWithMeta> ConsumeWorkerChannels for CrossbeamConsumeWorkerChannels<Tx> {
type Tx = Tx;

fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError> {
self.receiver.try_recv().map_err(|err| match err {
crossbeam_channel::TryRecvError::Empty => TryRecvError::Empty,
crossbeam_channel::TryRecvError::Disconnected => TryRecvError::Disconnected,
})
}

fn try_send(
&mut self,
finished_work: FinishedConsumeWork<Self::Tx>,
) -> Result<(), TrySendError> {
self.sender
.try_send(finished_work)
.map_err(|err| match err {
crossbeam_channel::TrySendError::Full(_) => TrySendError::Full,
crossbeam_channel::TrySendError::Disconnected(_) => TrySendError::Disconnected,
})
}
}

#[cfg(test)]
mod tests {
use {
Expand All @@ -794,7 +839,7 @@ mod tests {
scheduler_messages::{MaxAge, TransactionBatchId},
tests::{create_slow_genesis_config, sanitize_transactions},
},
crossbeam_channel::unbounded,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_clock::{Slot, MAX_PROCESSING_AGE},
solana_genesis_config::GenesisConfig,
solana_keypair::Keypair,
Expand Down Expand Up @@ -848,7 +893,7 @@ mod tests {
relax_intrabatch_account_locks: bool,
) -> (
TestFrame,
ConsumeWorker<RuntimeTransaction<SanitizedTransaction>>,
ConsumeWorker<CrossbeamConsumeWorkerChannels<RuntimeTransaction<SanitizedTransaction>>>,
) {
let GenesisConfigInfo {
genesis_config,
Expand Down Expand Up @@ -881,12 +926,15 @@ mod tests {

let (consume_sender, consume_receiver) = unbounded();
let (consumed_sender, consumed_receiver) = unbounded();
let channels = CrossbeamConsumeWorkerChannels {
receiver: consume_receiver,
sender: consumed_sender,
};
let worker = ConsumeWorker::new(
0,
Arc::new(AtomicBool::new(false)),
consume_receiver,
channels,
consumer,
consumed_sender,
shared_working_bank.clone(),
);

Expand Down
Loading