Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let pubkey = solana_pubkey::new_rand();
let keypair2 = Keypair::new();
Expand All @@ -1137,18 +1137,19 @@ mod tests {
system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(),
];

let summary = recorder.record_transactions(bank.slot(), txs.clone());
let summary = recorder.record_transactions(bank.bank_id(), txs.clone());
assert!(summary.result.is_ok());
assert_eq!(
record_receiver.try_recv().unwrap().transaction_batches,
vec![txs.clone()]
);
assert!(record_receiver.try_recv().is_err());

// Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions),
// Once bank is set to a new bank (setting bank id + 1 in record_transactions),
// record_transactions should throw MaxHeightReached
let next_slot = bank.slot() + 1;
let RecordTransactionsSummary { result, .. } = recorder.record_transactions(next_slot, txs);
let next_bank_id = bank.bank_id() + 1;
let RecordTransactionsSummary { result, .. } =
recorder.record_transactions(next_bank_id, txs);
assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
// Should receive nothing from PohRecorder b/c record failed
assert!(record_receiver.try_recv().is_err());
Expand Down
8 changes: 4 additions & 4 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ mod tests {
} = &mut test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
shared_working_bank.store(bank.clone());
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let pubkey1 = Pubkey::new_unique();

Expand Down Expand Up @@ -981,7 +981,7 @@ mod tests {
} = &mut test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
shared_working_bank.store(bank.clone());
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let pubkey1 = Pubkey::new_unique();
let pubkey2 = Pubkey::new_unique();
Expand Down Expand Up @@ -1041,7 +1041,7 @@ mod tests {
} = &mut test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
shared_working_bank.store(bank.clone());
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let pubkey1 = Pubkey::new_unique();
let pubkey2 = Pubkey::new_unique();
Expand Down Expand Up @@ -1115,7 +1115,7 @@ mod tests {
} = &mut test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
shared_working_bank.store(bank.clone());
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());
assert!(bank.slot() > 0);
assert!(bank.epoch() > 0);

Expand Down
20 changes: 10 additions & 10 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ impl Consumer {

let (record_transactions_summary, record_us) = measure_us!(self
.transaction_recorder
.record_transactions(bank.slot(), processed_transactions));
.record_transactions(bank.bank_id(), processed_transactions));
execute_and_commit_timings.record_us = record_us;

let RecordTransactionsSummary {
Expand Down Expand Up @@ -578,7 +578,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Expand Down Expand Up @@ -658,7 +658,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Expand Down Expand Up @@ -691,7 +691,7 @@ mod tests {
record_receiver.shutdown();

let record = record_receiver.drain().next().unwrap();
assert_eq!(record.slot, bank.slot());
assert_eq!(record.bank_id, bank.bank_id());
assert_eq!(record.transaction_batches.len(), 1);
let transaction_batch = record.transaction_batches[0].clone();
assert_eq!(transaction_batch.len(), 1);
Expand Down Expand Up @@ -777,7 +777,7 @@ mod tests {
let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);

record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

while bank.tick_height() != bank.max_tick_height() - 1 {
bank.register_default_tick_for_test();
Expand Down Expand Up @@ -840,7 +840,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Expand Down Expand Up @@ -901,7 +901,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Expand Down Expand Up @@ -1070,7 +1070,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Expand Down Expand Up @@ -1397,7 +1397,7 @@ mod tests {
let blockstore = Arc::new(blockstore);
let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let shreds = entries_to_test_shreds(
&entries,
Expand Down Expand Up @@ -1521,7 +1521,7 @@ mod tests {

let (record_sender, mut record_receiver) = record_channels(false);
let recorder = TransactionRecorder::new(record_sender);
record_receiver.restart(bank.slot());
record_receiver.restart(bank.bank_id());

let shreds = entries_to_test_shreds(
&entries,
Expand Down
2 changes: 1 addition & 1 deletion poh/benches/transaction_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn bench_record_transactions(c: &mut Criterion) {

let start = Instant::now();
for txs in tx_batches {
let summary = transaction_recorder.record_transactions(bank.slot(), txs);
let summary = transaction_recorder.record_transactions(bank.bank_id(), txs);
assert!(summary.result.is_ok());
}
let elapsed = start.elapsed();
Expand Down
19 changes: 11 additions & 8 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use {
arc_swap::ArcSwapOption,
crossbeam_channel::{unbounded, Receiver, SendError, Sender, TrySendError},
log::*,
solana_clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
solana_clock::{BankId, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
solana_entry::{
entry::Entry,
poh::{Poh, PohEntry},
Expand Down Expand Up @@ -76,19 +76,19 @@ pub struct RecordSummary {
pub struct Record {
pub mixins: Vec<Hash>,
pub transaction_batches: Vec<Vec<VersionedTransaction>>,
pub slot: Slot,
pub bank_id: BankId,
}

impl Record {
pub fn new(
mixins: Vec<Hash>,
transaction_batches: Vec<Vec<VersionedTransaction>>,
slot: Slot,
bank_id: BankId,
) -> Self {
Self {
mixins,
transaction_batches,
slot,
bank_id,
}
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ impl PohRecorder {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn record(
&mut self,
bank_slot: Slot,
bank_id: BankId,
mixins: Vec<Hash>,
transaction_batches: Vec<Vec<VersionedTransaction>>,
) -> Result<RecordSummary> {
Expand All @@ -321,8 +321,11 @@ impl PohRecorder {
"No transactions provided"
);

let ((), report_metrics_us) = measure_us!(self.metrics.report(bank_slot));
self.metrics.report_metrics_us += report_metrics_us;
if let Some(working_bank) = self.working_bank.as_ref() {
let ((), report_metrics_us) =
measure_us!(self.metrics.report(working_bank.bank.slot()));
self.metrics.report_metrics_us += report_metrics_us;
}

loop {
let (flush_cache_res, flush_cache_us) = measure_us!(self.flush_cache(false));
Expand All @@ -334,7 +337,7 @@ impl PohRecorder {
.working_bank
.as_mut()
.ok_or(PohRecorderError::MaxHeightReached)?;
if bank_slot != working_bank.bank.slot() {
if bank_id != working_bank.bank.bank_id() {
return Err(PohRecorderError::MaxHeightReached);
}

Expand Down
8 changes: 4 additions & 4 deletions poh/src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl PohService {
let record = record_receiver.recv_timeout(timeout);
if let Ok(record) = record {
match poh_recorder.write().unwrap().record(
record.slot,
record.bank_id,
record.mixins,
record.transaction_batches,
) {
Expand Down Expand Up @@ -400,7 +400,7 @@ impl PohService {
let mut record_time = Measure::start("record");
loop {
match poh_recorder_l.record(
record.slot,
record.bank_id,
record.mixins,
std::mem::take(&mut record.transaction_batches),
) {
Expand Down Expand Up @@ -594,13 +594,13 @@ impl PohService {
recorder.reset(reset_bank, next_leader_slot);
}
PohServiceMessage::SetBank { bank } => {
let slot = bank.slot();
let bank_id = bank.bank_id();
let bank_max_tick_height = bank.max_tick_height();
recorder.set_bank(bank);
let should_restart =
recorder.tick_height() < bank_max_tick_height.saturating_sub(1);
if should_restart {
record_receiver.restart(slot);
record_receiver.restart(bank_id);
}
}
}
Expand Down
Loading
Loading