ConsumeWorker: add abstraction on channels #8483
base: master
Changes from 1 commit
@@ -5,7 +5,6 @@ use {
scheduler_messages::{ConsumeWork, FinishedConsumeWork},
},
crate::banking_stage::consumer::RetryableIndex,
crossbeam_channel::{Receiver, SendError, Sender, TryRecvError},
solana_poh::poh_recorder::SharedWorkingBank,
solana_runtime::bank::Bank,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
@@ -22,11 +21,11 @@ use {
};

#[derive(Debug, Error)]
pub enum ConsumeWorkerError<Tx> {
#[error("Failed to receive work from scheduler: {0}")]
Recv(#[from] TryRecvError),
#[error("Failed to send finalized consume work to scheduler: {0}")]
Send(#[from] SendError<FinishedConsumeWork<Tx>>),
pub enum ConsumeWorkerError {
#[error("Failed to receive work from scheduler")]
Recv(TryRecvError),
#[error("Failed to send finalized consume work to scheduler")]
Send(TrySendError),
}

enum ProcessingStatus<Tx> {
@@ -35,30 +34,27 @@ enum ProcessingStatus<Tx> {
CouldNotProcess(ConsumeWork<Tx>),
}

pub(crate) struct ConsumeWorker<Tx> {
pub(crate) struct ConsumeWorker<Ch> {
exit: Arc<AtomicBool>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
channels: Ch,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,

shared_working_bank: SharedWorkingBank,
metrics: Arc<ConsumeWorkerMetrics>,
}

impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
impl<Channels: ConsumeWorkerChannels> ConsumeWorker<Channels> {
pub fn new(
id: u32,
exit: Arc<AtomicBool>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
channels: Channels,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,
shared_working_bank: SharedWorkingBank,
) -> Self {
Self {
exit,
consume_receiver,
channels,
consumer,
consumed_sender,
shared_working_bank,
metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
}
@@ -68,15 +64,15 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
pub fn run(mut self) -> Result<(), ConsumeWorkerError> {
const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250);

let mut did_work = false;
let mut last_empty_time = Instant::now();
let mut sleep_duration = STARTING_SLEEP_DURATION;

while !self.exit.load(Ordering::Relaxed) {
match self.consume_receiver.try_recv() {
match self.channels.try_recv() {
Ok(work) => {
did_work = true;
match self.consume(work)? {
@@ -106,9 +102,9 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

fn consume(
&self,
work: ConsumeWork<Tx>,
) -> Result<ProcessingStatus<Tx>, ConsumeWorkerError<Tx>> {
&mut self,
work: ConsumeWork<Channels::Tx>,
) -> Result<ProcessingStatus<Channels::Tx>, ConsumeWorkerError> {
let Some(bank) = self.active_working_bank_with_timeout() else {
return Ok(ProcessingStatus::CouldNotProcess(work));
};
@@ -128,10 +124,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {

/// Consume a single batch.
fn consume_with_bank(
&self,
&mut self,
bank: &Arc<Bank>,
work: ConsumeWork<Tx>,
) -> Result<(), ConsumeWorkerError<Tx>> {
work: ConsumeWork<Channels::Tx>,
) -> Result<(), ConsumeWorkerError> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
@@ -141,12 +137,15 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.update_for_consume(&output);
self.metrics.has_data.store(true, Ordering::Relaxed);

self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes: output
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})?;
self.channels
.try_send(FinishedConsumeWork {
work,
retryable_indexes: output
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})
.map_err(ConsumeWorkerError::Send)?;

Ok(())
}
@@ -189,8 +188,9 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Retry current batch and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
for work in try_drain_iter(work, &self.consume_receiver) {
fn retry_drain(&mut self, work: ConsumeWork<Channels::Tx>) -> Result<(), ConsumeWorkerError> {
self.retry(work)?;
while let Ok(work) = self.channels.try_recv() {
Review thread on this line:
- slight change of behavior - now
- def slight change but just need to change the loop a bit because of the new
- actually now that i'm looking at this it's probably better if we retry the work and THEN check exit... otherwise we just drop that work.
- pushed a change - we'll now process the message before checking exit.
- actually, since it's exiting, it doesn't really matter if it drops a message, better it exits quicker by not processing messages. wdyt?
The two orderings under discussion are sketched just after this hunk.
if self.exit.load(Ordering::Relaxed) {
return Ok(());
}
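The thread above debates whether the drained item should be retried before or after the exit check. The following is a minimal, self-contained sketch of the two orderings, not code from this PR: Work, Worker, incoming, retried and the two retry_drain_* methods are hypothetical stand-ins for the real ConsumeWork and channel machinery.

use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

// Hypothetical stand-in for a batch of work pulled off the channel.
struct Work(u32);

struct Worker {
    exit: Arc<AtomicBool>,
    incoming: VecDeque<Work>,
    retried: Vec<u32>,
}

impl Worker {
    fn try_recv(&mut self) -> Option<Work> {
        self.incoming.pop_front()
    }

    fn retry(&mut self, work: Work) {
        self.retried.push(work.0);
    }

    // Ordering the author says was pushed: retry the received item first, then
    // check `exit`, so nothing already pulled off the channel is dropped.
    fn retry_drain_then_check_exit(&mut self, first: Work) {
        self.retry(first);
        while let Some(work) = self.try_recv() {
            self.retry(work);
            if self.exit.load(Ordering::Relaxed) {
                return;
            }
        }
    }

    // Alternative raised in the last comment: check `exit` first for a faster
    // shutdown, accepting that the just-received item is dropped.
    fn retry_drain_check_exit_first(&mut self, first: Work) {
        self.retry(first);
        while let Some(work) = self.try_recv() {
            if self.exit.load(Ordering::Relaxed) {
                return; // `work` is dropped here
            }
            self.retry(work);
        }
    }
}

fn main() {
    // Pretend shutdown was requested before draining starts.
    let exit = Arc::new(AtomicBool::new(true));

    let mut a = Worker {
        exit: exit.clone(),
        incoming: VecDeque::from([Work(1), Work(2)]),
        retried: Vec::new(),
    };
    a.retry_drain_then_check_exit(Work(0));
    assert_eq!(a.retried, vec![0, 1]); // Work(1) still gets retried

    let mut b = Worker {
        exit,
        incoming: VecDeque::from([Work(1), Work(2)]),
        retried: Vec::new(),
    };
    b.retry_drain_check_exit_first(Work(0));
    assert_eq!(b.retried, vec![0]); // Work(1) is pulled off the queue and dropped
}

Either way the difference only matters during shutdown; the open question in the thread is whether a faster exit is worth dropping a batch that was already pulled off the channel.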
@@ -200,7 +200,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Send transactions back to scheduler as retryable.
fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
fn retry(&mut self, work: ConsumeWork<Channels::Tx>) -> Result<(), ConsumeWorkerError> {
let retryable_indexes: Vec<_> = (0..work.transactions.len())
.map(|index| RetryableIndex {
index,
@@ -217,20 +217,16 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.retryable_expired_bank_count
.fetch_add(num_retryable, Ordering::Relaxed);
self.metrics.has_data.store(true, Ordering::Relaxed);
self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes,
})?;
self.channels
.try_send(FinishedConsumeWork {
work,
retryable_indexes,
})
.map_err(ConsumeWorkerError::Send)?;
Ok(())
}
}

/// Helper function to create an non-blocking iterator over work in the receiver,
/// starting with the given work item.
fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T> + '_ {
std::iter::once(work).chain(receiver.try_iter())
}

fn backoff(idle_duration: Duration, sleep_duration: &mut Duration) {
const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1);
const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1);
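Only the constants of backoff() are visible in these hunks, so the following is a hedged sketch of how an idle backoff like this typically works; the doubling-and-cap policy is an assumption, not the PR's actual backoff() body, while STARTING_SLEEP_DURATION, IDLE_SLEEP_THRESHOLD and MAX_SLEEP_DURATION are the names that appear in the diff.

use std::time::Duration;

const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250);
const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1);
const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1);

// Assumed policy: once the worker has been idle past the threshold, sleep and
// grow the sleep duration geometrically, capped at MAX_SLEEP_DURATION, so an
// empty channel is not busy-polled.
fn backoff(idle_duration: Duration, sleep_duration: &mut Duration) {
    if idle_duration >= IDLE_SLEEP_THRESHOLD {
        std::thread::sleep(*sleep_duration);
        *sleep_duration = (*sleep_duration * 2).min(MAX_SLEEP_DURATION);
    }
}

fn main() {
    let mut sleep_duration = STARTING_SLEEP_DURATION;
    // Simulate a worker that has already been idle longer than the threshold.
    let idle_duration = Duration::from_millis(5);
    for _ in 0..4 {
        backoff(idle_duration, &mut sleep_duration);
    }
    // 250us -> 500us -> 1ms -> capped at 1ms
    assert_eq!(sleep_duration, MAX_SLEEP_DURATION);
}

In run(), the did_work / last_empty_time bookkeeping above presumably feeds idle_duration; the exact wiring is outside the hunks shown here.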
@@ -784,6 +780,55 @@ impl ConsumeWorkerTransactionErrorMetrics {
}
}

#[derive(Debug)]
pub enum TryRecvError {
Empty,
Disconnected,
}

#[derive(Debug)]
pub enum TrySendError {
Full,
Disconnected,
}

pub(crate) trait ConsumeWorkerChannels {
Review thread on this line:
- nit: wdyt to move the trait and impl to own modules?
- was a bit on the fence about it, since it's really only relevant to the consume workers
type Tx: TransactionWithMeta;
fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError>;
fn try_send(
&mut self,
finished_work: FinishedConsumeWork<Self::Tx>,
) -> Result<(), TrySendError>;
}

pub(crate) struct CrossbeamConsumeWorkerChannels<Tx> {
pub receiver: crossbeam_channel::Receiver<ConsumeWork<Tx>>,
pub sender: crossbeam_channel::Sender<FinishedConsumeWork<Tx>>,
}

impl<Tx: TransactionWithMeta> ConsumeWorkerChannels for CrossbeamConsumeWorkerChannels<Tx> {
type Tx = Tx;

fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError> {
self.receiver.try_recv().map_err(|err| match err {
crossbeam_channel::TryRecvError::Empty => TryRecvError::Empty,
crossbeam_channel::TryRecvError::Disconnected => TryRecvError::Disconnected,
})
}

fn try_send(
&mut self,
finished_work: FinishedConsumeWork<Self::Tx>,
) -> Result<(), TrySendError> {
self.sender
.try_send(finished_work)
.map_err(|err| match err {
crossbeam_channel::TrySendError::Full(_) => TrySendError::Full,
crossbeam_channel::TrySendError::Disconnected(_) => TrySendError::Disconnected,
})
}
}
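The new trait makes the worker generic over its channel backend, with the crossbeam adapter above as the first implementation. As a hedged illustration of what the abstraction permits (not code from this PR), here is a self-contained sketch of a second backend backed by plain in-process queues. LocalQueueChannels and its incoming / finished fields are hypothetical, the message types are simplified stand-ins for the crate's ConsumeWork / FinishedConsumeWork, and the TransactionWithMeta bound is omitted; only the shape of the trait mirrors the diff.

use std::collections::VecDeque;

// Simplified stand-ins for the crate's error and message types (illustration only).
#[derive(Debug)]
pub enum TryRecvError { Empty, Disconnected }

#[derive(Debug)]
pub enum TrySendError { Full, Disconnected }

pub struct ConsumeWork<Tx> { pub transactions: Vec<Tx> }
pub struct FinishedConsumeWork<Tx> { pub work: ConsumeWork<Tx>, pub retryable_indexes: Vec<usize> }

// Same shape as the trait in the diff, minus the TransactionWithMeta bound.
pub trait ConsumeWorkerChannels {
    type Tx;
    fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError>;
    fn try_send(&mut self, finished_work: FinishedConsumeWork<Self::Tx>) -> Result<(), TrySendError>;
}

// Hypothetical second backend: in-memory queues, handy for deterministic,
// single-threaded unit tests with no crossbeam channels involved.
pub struct LocalQueueChannels<Tx> {
    pub incoming: VecDeque<ConsumeWork<Tx>>,
    pub finished: Vec<FinishedConsumeWork<Tx>>,
}

impl<Tx> ConsumeWorkerChannels for LocalQueueChannels<Tx> {
    type Tx = Tx;

    fn try_recv(&mut self) -> Result<ConsumeWork<Self::Tx>, TryRecvError> {
        self.incoming.pop_front().ok_or(TryRecvError::Empty)
    }

    fn try_send(&mut self, finished_work: FinishedConsumeWork<Self::Tx>) -> Result<(), TrySendError> {
        self.finished.push(finished_work);
        Ok(())
    }
}

fn main() {
    let mut channels = LocalQueueChannels::<u64> {
        incoming: VecDeque::from([ConsumeWork { transactions: vec![1, 2, 3] }]),
        finished: Vec::new(),
    };
    let work = channels.try_recv().expect("one queued batch");
    channels
        .try_send(FinishedConsumeWork { work, retryable_indexes: Vec::new() })
        .expect("local queue never rejects");
    assert_eq!(channels.finished.len(), 1);
}

Either backend would plug into ConsumeWorker::new(id, exit, channels, consumer, shared_working_bank) unchanged, which is the point of the abstraction.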
#[cfg(test)]
mod tests {
use {

@@ -794,7 +839,7 @@ mod tests {
scheduler_messages::{MaxAge, TransactionBatchId},
tests::{create_slow_genesis_config, sanitize_transactions},
},
crossbeam_channel::unbounded,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_clock::{Slot, MAX_PROCESSING_AGE},
solana_genesis_config::GenesisConfig,
solana_keypair::Keypair,

@@ -848,7 +893,7 @@ mod tests {
relax_intrabatch_account_locks: bool,
) -> (
TestFrame,
ConsumeWorker<RuntimeTransaction<SanitizedTransaction>>,
ConsumeWorker<CrossbeamConsumeWorkerChannels<RuntimeTransaction<SanitizedTransaction>>>,
) {
let GenesisConfigInfo {
genesis_config,
@@ -881,12 +926,15 @@ mod tests {

let (consume_sender, consume_receiver) = unbounded();
let (consumed_sender, consumed_receiver) = unbounded();
let channels = CrossbeamConsumeWorkerChannels {
receiver: consume_receiver,
sender: consumed_sender,
};
let worker = ConsumeWorker::new(
0,
Arc::new(AtomicBool::new(false)),
consume_receiver,
channels,
consumer,
consumed_sender,
shared_working_bank.clone(),
);