diff --git a/code/Cargo.toml b/code/Cargo.toml index 0220989ba..a33dde512 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -64,6 +64,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } [workspace.lints.clippy] disallowed_types = "deny" +doc_overindented_list_items = "allow" [workspace.dependencies] malachitebft-engine = { version = "0.0.1", package = "informalsystems-malachitebft-engine", path = "crates/engine" } diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index 9db83dc99..288ab8127 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -4,7 +4,7 @@ use malachitebft_core_types::*; use crate::input::RequestId; use crate::types::SignedConsensusMsg; -use crate::{ConsensusMsg, VoteExtensionError}; +use crate::{ConsensusMsg, VoteExtensionError, WalEntry}; /// Provides a way to construct the appropriate [`Resume`] value to /// resume execution after handling an [`Effect`]. @@ -182,15 +182,10 @@ where resume::Continue, ), - /// Append a consensus message to the Write-Ahead Log for crash recovery + /// Append an entry to the Write-Ahead Log for crash recovery /// /// Resume with: [`resume::Continue`]` - WalAppendMessage(SignedConsensusMsg, resume::Continue), - - /// Append a timeout to the Write-Ahead Log for crash recovery - /// - /// Resume with: [`resume::Continue`]` - WalAppendTimeout(Timeout, resume::Continue), + WalAppend(WalEntry, resume::Continue), /// Allows the application to extend the pre-commit vote with arbitrary data. /// diff --git a/code/crates/core-consensus/src/full_proposal.rs b/code/crates/core-consensus/src/full_proposal.rs index acefa3921..69a51f33b 100644 --- a/code/crates/core-consensus/src/full_proposal.rs +++ b/code/crates/core-consensus/src/full_proposal.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use derive_where::derive_where; -use malachitebft_core_types::{Context, Proposal, Round, SignedProposal, Validity, Value}; +use malachitebft_core_types::{Context, Proposal, Round, SignedProposal, Validity, Value, ValueId}; use crate::ProposedValue; @@ -54,6 +54,15 @@ impl Entry { fn full(value: Ctx::Value, validity: Validity, proposal: SignedProposal) -> Self { Entry::Full(FullProposal::new(value, validity, proposal)) } + + fn id(&self) -> Option> { + match self { + Entry::Full(p) => Some(p.builder_value.id()), + Entry::ProposalOnly(p) => Some(p.value().id()), + Entry::ValueOnly(v, _) => Some(v.id()), + Entry::Empty => None, + } + } } #[allow(clippy::derivable_impls)] @@ -113,7 +122,7 @@ impl FullProposalKeeper { Self::default() } - pub fn full_proposals_for_value( + pub fn proposals_for_value( &self, proposed_value: &ProposedValue, ) -> Vec> { @@ -157,7 +166,28 @@ impl FullProposalKeeper { None } - #[allow(clippy::type_complexity)] + pub fn full_proposal_at_round_and_proposer( + &self, + height: &Ctx::Height, + round: Round, + proposer: &Ctx::Address, + ) -> Option<&FullProposal> { + let entries = self + .keeper + .get(&(*height, round)) + .filter(|entries| !entries.is_empty())?; + + for entry in entries { + if let Entry::Full(p) = entry { + if p.proposal.validator_address() == proposer { + return Some(p); + } + } + } + + None + } + pub fn get_value<'a>( &self, height: &Ctx::Height, @@ -268,6 +298,15 @@ impl FullProposalKeeper { self.store_value_at_pol_round(new_value); } + pub fn value_exists(&self, value: &ProposedValue) -> bool { + match self.keeper.get(&(value.height, value.round)) { + None => false, + Some(entries) => entries + .iter() + .any(|entry| entry.id() == Some(value.value.id())), + } + } + fn store_value_at_value_round(&mut self, new_value: &ProposedValue) { let key = (new_value.height, new_value.round); let entries = self.keeper.get_mut(&key); diff --git a/code/crates/core-consensus/src/handle/driver.rs b/code/crates/core-consensus/src/handle/driver.rs index 40e5c69c8..61bb10602 100644 --- a/code/crates/core-consensus/src/handle/driver.rs +++ b/code/crates/core-consensus/src/handle/driver.rs @@ -8,8 +8,11 @@ use crate::handle::vote::on_vote; use crate::prelude::*; use crate::types::SignedConsensusMsg; use crate::util::pretty::PrettyVal; +use crate::LocallyProposedValue; use crate::VoteSyncMode; +use super::propose::on_propose; + #[async_recursion] pub async fn apply_driver_input( co: &Co, @@ -322,12 +325,26 @@ where } DriverOutput::GetValue(height, round, timeout) => { - info!(%height, %round, "Requesting value"); + if let Some(full_proposal) = + state.full_proposal_at_round_and_proposer(&height, round, state.address()) + { + info!(%height, %round, "Using already existing value"); + + let local_value = LocallyProposedValue { + height: full_proposal.proposal.height(), + round: full_proposal.proposal.round(), + value: full_proposal.builder_value.clone(), + }; - perform!( - co, - Effect::GetValue(height, round, timeout, Default::default()) - ); + on_propose(co, state, metrics, local_value).await?; + } else { + info!(%height, %round, "Requesting value from application"); + + perform!( + co, + Effect::GetValue(height, round, timeout, Default::default()) + ); + } Ok(()) } diff --git a/code/crates/core-consensus/src/handle/proposal.rs b/code/crates/core-consensus/src/handle/proposal.rs index c42e2f356..33908d0c2 100644 --- a/code/crates/core-consensus/src/handle/proposal.rs +++ b/code/crates/core-consensus/src/handle/proposal.rs @@ -3,10 +3,30 @@ use crate::handle::signature::verify_signature; use crate::handle::validator_set::get_validator_set; use crate::input::Input; use crate::prelude::*; -use crate::types::ConsensusMsg; +use crate::types::{ConsensusMsg, ProposedValue, SignedConsensusMsg, WalEntry}; use crate::util::pretty::PrettyProposal; -use crate::{ProposedValue, SignedConsensusMsg}; +/// Handles an incoming consensus proposal message. +/// +/// This handler processes proposals that can arrive from three sources: +/// 1. Network messages from other nodes +/// 2. Local proposals when this node is the proposer +/// 3. WAL replay during node restart +/// +/// When acting as proposer (2), consensus core interacts with the application to get a proposed value for the current height and round. +/// In this case the proposal message is sent out to the network but also back to the consensus core. +/// +/// # Arguments +/// * `co` - The context object containing configuration and external dependencies +/// * `state` - The current consensus state +/// * `metrics` - Metrics collection for monitoring +/// * `signed_proposal` - The signed proposal message to process +/// +/// # Flow +/// 1. Validates proposal height and signature +/// 2. Queues messages if not ready to process (wrong height/round) +/// 3. Stores valid proposals and updates WAL if needed +/// 4. Processes the proposal through the driver if a full proposal is available pub async fn on_proposal( co: &Co, state: &mut State, @@ -38,9 +58,10 @@ where } info!( - height = %consensus_height, - %proposal_height, - address = %proposer_address, + consensus.height = %consensus_height, + proposal.height = %proposal_height, + proposal.round = %proposal_round, + proposer = %proposer_address, message = %PrettyProposal::(&signed_proposal.message), "Received proposal" ); @@ -50,14 +71,14 @@ where // Drop all others. if state.driver.round() == Round::Nil { debug!("Received proposal at round -1, queuing for later"); - state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal)); + state.buffer_input(proposal_height, Input::Proposal(signed_proposal)); return Ok(()); } if proposal_height > consensus_height { - debug!("Received proposal for higher height, queuing for later"); - state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal)); + debug!("Received proposal for higher height {proposal_height}, queuing for later",); + state.buffer_input(proposal_height, Input::Proposal(signed_proposal)); return Ok(()); } @@ -72,8 +93,8 @@ where if state.params.value_payload.include_proposal() { perform!( co, - Effect::WalAppendMessage( - SignedConsensusMsg::Proposal(signed_proposal.clone()), + Effect::WalAppend( + WalEntry::ConsensusMsg(SignedConsensusMsg::Proposal(signed_proposal.clone())), Default::default() ) ); diff --git a/code/crates/core-consensus/src/handle/propose.rs b/code/crates/core-consensus/src/handle/propose.rs index 4e3c6c845..da364a628 100644 --- a/code/crates/core-consensus/src/handle/propose.rs +++ b/code/crates/core-consensus/src/handle/propose.rs @@ -1,52 +1,85 @@ use crate::prelude::*; use crate::handle::driver::apply_driver_input; -use crate::types::{LocallyProposedValue, ProposedValue}; +use crate::types::{LocallyProposedValue, ProposedValue, WalEntry}; +/// Handles a locally proposed value. +/// Called when the application has built a value to propose. +/// +/// This function processes a value proposed by the local node: +/// - Validates that the height and round match the current state +/// - Creates a ProposedValue with the local node as proposer +/// - Appends the value to the WAL if it hasn't been seen before +/// - Stores the value in the state +/// - Applies the proposal to the driver +/// +/// # Arguments +/// * `co` - Coordination object for handling effects +/// * `state` - Current consensus state +/// * `metrics` - Metrics collection object +/// * `local_value` - The value being proposed locally +/// +/// # Returns +/// `Result<(), Error>` - Ok if the proposal was processed successfully pub async fn on_propose( co: &Co, state: &mut State, metrics: &Metrics, - value: LocallyProposedValue, + local_value: LocallyProposedValue, ) -> Result<(), Error> where Ctx: Context, { - let LocallyProposedValue { - height, - round, - value, - } = value; - - if state.driver.height() != height { + if state.driver.height() != local_value.height { warn!( - "Ignoring proposal for height {height}, current height: {}", + "Ignoring value for height {}, current height: {}", + local_value.height, state.driver.height() ); return Ok(()); } - if state.driver.round() != round { + if state.driver.round() != local_value.round { warn!( - "Ignoring propose value for round {round}, current round: {}", + "Ignoring value for round {}, current round: {}", + local_value.round, state.driver.round() ); return Ok(()); } - #[cfg(feature = "metrics")] - metrics.consensus_start(); - - state.store_value(&ProposedValue { - height, - round, + let proposed_value = ProposedValue { + height: local_value.height, + round: local_value.round, valid_round: Round::Nil, proposer: state.address().clone(), - value: value.clone(), + value: local_value.value.clone(), validity: Validity::Valid, - }); + }; + + #[cfg(feature = "metrics")] + metrics.consensus_start(); + + // If this is the first time we see this value in the current round, append it to the WAL + if !state.value_exists(&proposed_value) { + perform!( + co, + Effect::WalAppend( + WalEntry::ProposedValue(proposed_value.clone()), + Default::default() + ) + ); + } + + state.store_value(&proposed_value); - apply_driver_input(co, state, metrics, DriverInput::ProposeValue(round, value)).await + apply_driver_input( + co, + state, + metrics, + DriverInput::ProposeValue(local_value.round, local_value.value), + ) + .await } diff --git a/code/crates/core-consensus/src/handle/proposed_value.rs b/code/crates/core-consensus/src/handle/proposed_value.rs index 9c4d4b282..9004640fe 100644 --- a/code/crates/core-consensus/src/handle/proposed_value.rs +++ b/code/crates/core-consensus/src/handle/proposed_value.rs @@ -1,11 +1,35 @@ use crate::prelude::*; use crate::handle::driver::apply_driver_input; -use crate::types::ProposedValue; +use crate::types::{ProposedValue, WalEntry}; use super::decide::try_decide; use super::signature::sign_proposal; +/// Handles a proposed value that can originate from multiple sources: +/// 1. Application layer: +/// - In 'parts-only' mode +/// - In 'proposal-and-parts' mode +/// 2. WAL (Write-Ahead Log) during node restart recovery +/// 3. Sync service during state synchronization +/// +/// This function processes proposed values based on their height and origin: +/// - Drops values from lower heights +/// - Queues values from higher heights for later processing +/// - For parts-only mode or values from Sync, generates and signs internal Proposal messages +/// - Stores the value and appends it to the WAL if new +/// - Applies any associated proposals to the driver +/// - Attempts immediate decision for values from Sync +/// +/// # Arguments +/// * `co` - Coordination object for async operations +/// * `state` - Current consensus state +/// * `metrics` - Metrics collection +/// * `proposed_value` - The proposed value to process +/// * `origin` - Origin of the proposed value (e.g., Sync, Network) +/// +/// # Returns +/// Result indicating success or failure of processing the proposed value pub async fn on_proposed_value( co: &Co, state: &mut State, @@ -32,8 +56,6 @@ where return Ok(()); } - state.store_value(&proposed_value); - // There are two cases where we need to generate an internal Proposal message for consensus to process the full proposal: // a) In parts-only mode, where we do not get a Proposal message but only the proposal parts // b) In any mode if the proposed value was provided by Sync, where we do net get a Proposal message but only the full value and the certificate @@ -54,7 +76,22 @@ where state.store_proposal(signed_proposal); } - let proposals = state.full_proposals_for_value(&proposed_value); + // If this is the first time we see this value, append it to the WAL, so it can be used for recovery. + if !state.value_exists(&proposed_value) { + perform!( + co, + Effect::WalAppend( + WalEntry::ProposedValue(proposed_value.clone()), + Default::default() + ) + ); + } + + state.store_value(&proposed_value); + + let validity = proposed_value.validity; + let proposals = state.proposals_for_value(&proposed_value); + for signed_proposal in proposals { debug!( proposal.height = %signed_proposal.height(), @@ -66,7 +103,7 @@ where co, state, metrics, - DriverInput::Proposal(signed_proposal, proposed_value.validity), + DriverInput::Proposal(signed_proposal, validity), ) .await?; } diff --git a/code/crates/core-consensus/src/handle/start_height.rs b/code/crates/core-consensus/src/handle/start_height.rs index f1afc63a9..5b8516ad3 100644 --- a/code/crates/core-consensus/src/handle/start_height.rs +++ b/code/crates/core-consensus/src/handle/start_height.rs @@ -24,10 +24,10 @@ where debug_assert_eq!(state.height(), height); debug_assert_eq!(state.round(), Round::Nil); - start_height(co, state, metrics, height).await + on_start_height(co, state, metrics, height).await } -pub async fn start_height( +async fn on_start_height( co: &Co, state: &mut State, metrics: &Metrics, @@ -36,6 +36,9 @@ pub async fn start_height( where Ctx: Context, { + debug_assert_eq!(state.height(), height); + debug_assert_eq!(state.round(), Round::Nil); + let round = Round::new(0); info!(%height, "Starting new height"); diff --git a/code/crates/core-consensus/src/handle/timeout.rs b/code/crates/core-consensus/src/handle/timeout.rs index a2efcec22..f09402ebf 100644 --- a/code/crates/core-consensus/src/handle/timeout.rs +++ b/code/crates/core-consensus/src/handle/timeout.rs @@ -3,6 +3,7 @@ use crate::handle::driver::apply_driver_input; use crate::handle::rebroadcast_timeout::on_rebroadcast_timeout; use crate::handle::step_timeout::on_step_limit_timeout; use crate::prelude::*; +use crate::types::WalEntry; pub async fn on_timeout_elapsed( co: &Co, @@ -41,7 +42,10 @@ where ) { // Persist the timeout in the Write-ahead Log. // Time-limit and rebroadcast timeouts are not persisted because they only occur when consensus is stuck. - perform!(co, Effect::WalAppendTimeout(timeout, Default::default())); + perform!( + co, + Effect::WalAppend(WalEntry::Timeout(timeout), Default::default()) + ); } apply_driver_input(co, state, metrics, DriverInput::TimeoutElapsed(timeout)).await?; diff --git a/code/crates/core-consensus/src/handle/vote.rs b/code/crates/core-consensus/src/handle/vote.rs index c25aad219..5b8a33bcf 100644 --- a/code/crates/core-consensus/src/handle/vote.rs +++ b/code/crates/core-consensus/src/handle/vote.rs @@ -5,9 +5,8 @@ use crate::handle::signature::verify_signature; use crate::handle::validator_set::get_validator_set; use crate::input::Input; use crate::prelude::*; -use crate::types::ConsensusMsg; +use crate::types::{ConsensusMsg, SignedConsensusMsg, WalEntry}; use crate::util::pretty::PrettyVote; -use crate::SignedConsensusMsg; pub async fn on_vote( co: &Co, @@ -82,8 +81,8 @@ where // Append the vote to the Write-ahead Log perform!( co, - Effect::WalAppendMessage( - SignedConsensusMsg::Vote(signed_vote.clone()), + Effect::WalAppend( + WalEntry::ConsensusMsg(SignedConsensusMsg::Vote(signed_vote.clone())), Default::default() ) ); diff --git a/code/crates/core-consensus/src/input.rs b/code/crates/core-consensus/src/input.rs index 515a7ae1d..884765628 100644 --- a/code/crates/core-consensus/src/input.rs +++ b/code/crates/core-consensus/src/input.rs @@ -15,7 +15,7 @@ pub enum Input where Ctx: Context, { - /// Start a new height with the given validator set. + /// Start consensus for the given height with the given validator set StartHeight(Ctx::Height, Ctx::ValidatorSet), /// Process a vote received over the network. diff --git a/code/crates/core-consensus/src/state.rs b/code/crates/core-consensus/src/state.rs index 5c583b18a..a014eba37 100644 --- a/code/crates/core-consensus/src/state.rs +++ b/code/crates/core-consensus/src/state.rs @@ -174,18 +174,32 @@ where .full_proposal_at_round_and_value(height, round, &value.id()) } - pub fn full_proposals_for_value( + pub fn full_proposal_at_round_and_proposer( + &self, + height: &Ctx::Height, + round: Round, + address: &Ctx::Address, + ) -> Option<&FullProposal> { + self.full_proposal_keeper + .full_proposal_at_round_and_proposer(height, round, address) + } + + pub fn proposals_for_value( &self, proposed_value: &ProposedValue, ) -> Vec> { self.full_proposal_keeper - .full_proposals_for_value(proposed_value) + .proposals_for_value(proposed_value) } pub fn store_proposal(&mut self, new_proposal: SignedProposal) { self.full_proposal_keeper.store_proposal(new_proposal) } + pub fn value_exists(&mut self, new_value: &ProposedValue) -> bool { + self.full_proposal_keeper.value_exists(new_value) + } + pub fn store_value(&mut self, new_value: &ProposedValue) { // Values for higher height should have been cached for future processing assert_eq!(new_value.height, self.driver.height()); diff --git a/code/crates/core-consensus/src/types.rs b/code/crates/core-consensus/src/types.rs index 02a0192c0..56184592f 100644 --- a/code/crates/core-consensus/src/types.rs +++ b/code/crates/core-consensus/src/types.rs @@ -2,7 +2,7 @@ use derive_where::derive_where; use thiserror::Error; use malachitebft_core_types::{ - Context, Proposal, Round, Signature, SignedProposal, SignedVote, Validity, Vote, + Context, Proposal, Round, Signature, SignedProposal, SignedVote, Timeout, Validity, Vote, }; pub use malachitebft_core_types::ValuePayload; @@ -77,6 +77,36 @@ pub struct ProposedValue { pub validity: Validity, } +#[derive_where(Clone, Debug)] +pub enum WalEntry { + ConsensusMsg(SignedConsensusMsg), + Timeout(Timeout), + ProposedValue(ProposedValue), +} + +impl WalEntry { + pub fn as_consensus_msg(&self) -> Option<&SignedConsensusMsg> { + match self { + WalEntry::ConsensusMsg(msg) => Some(msg), + _ => None, + } + } + + pub fn as_timeout(&self) -> Option<&Timeout> { + match self { + WalEntry::Timeout(timeout) => Some(timeout), + _ => None, + } + } + + pub fn as_proposed_value(&self) -> Option<&ProposedValue> { + match self { + WalEntry::ProposedValue(value) => Some(value), + _ => None, + } + } +} + #[derive(Clone, Debug, PartialEq, Eq, Error)] pub enum VoteExtensionError { #[error("Invalid vote extension signature")] diff --git a/code/crates/core-consensus/tests/full_proposal.rs b/code/crates/core-consensus/tests/full_proposal.rs index fe82cc66c..fcaf237e6 100644 --- a/code/crates/core-consensus/tests/full_proposal.rs +++ b/code/crates/core-consensus/tests/full_proposal.rs @@ -88,7 +88,7 @@ fn props_for_value( k: &FullProposalKeeper, v: &ProposedValue, ) -> Vec> { - k.full_proposals_for_value(v) + k.proposals_for_value(v) } // Used for full proposer keeper testing: diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 97a83cf1a..974dc63da 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -95,7 +95,7 @@ pub enum Msg { TimeoutElapsed(TimeoutElapsed), /// The proposal builder has built a value and can be used in a new proposal consensus message - ProposeValue(Ctx::Height, Round, Ctx::Value), + ProposeValue(LocallyProposedValue), /// Received and assembled the full value proposed by a validator ReceivedProposedValue(ProposedValue, ValueOrigin), @@ -207,6 +207,13 @@ where } } +struct HandlerState<'a, Ctx: Context> { + phase: Phase, + height: Ctx::Height, + timers: &'a mut Timers, + timeouts: &'a mut Timeouts, +} + impl Consensus where Ctx: Context, @@ -256,14 +263,14 @@ where state: &mut state.consensus, metrics: &self.metrics, with: effect => { - self.handle_effect( - myself, + let handler_state = HandlerState { + phase: state.phase, height, - &mut state.timers, - &mut state.timeouts, - state.phase, - effect - ).await + timers: &mut state.timers, + timeouts: &mut state.timeouts, + }; + + self.handle_effect(myself, handler_state, effect).await } ) } @@ -293,6 +300,17 @@ where ) -> Result<(), ActorProcessingErr> { match msg { Msg::StartHeight(height, validator_set) => { + self.tx_event.send(|| Event::StartedHeight(height)); + + // Fetch entries from the WAL + let wal_entries = self.wal_fetch(height).await?; + + if !wal_entries.is_empty() { + // Set the phase to `Recovering` while we replay the WAL + state.set_phase(Phase::Recovering); + } + + // Start consensus for the given height let result = self .process_input( &myself, @@ -305,6 +323,17 @@ where error!(%height, "Error when starting height: {e}"); } + if !wal_entries.is_empty() { + // Replay the entries from the WAL + self.replay_wal(&myself, state, height, wal_entries).await; + } + + // Set the phase to `Running` now that we have replayed the WAL + state.set_phase(Phase::Running); + + // Process any buffered messages, now that we are in the `Running` phase + self.process_buffered_msgs(&myself, state).await; + // Notify the sync actor that we have started a new height if let Some(sync) = &self.sync { if let Err(e) = sync.cast(SyncMsg::StartedHeight(height)) { @@ -312,40 +341,22 @@ where } } - self.tx_event.send(|| Event::StartedHeight(height)); - - if let Err(e) = self.check_and_replay_wal(&myself, state, height).await { - error!(%height, "Error when checking and replaying WAL: {e}"); - } - - state.set_phase(Phase::Running); - - self.process_buffered_msgs(&myself, state).await; - Ok(()) } - Msg::ProposeValue(height, round, value) => { - let value_to_propose = LocallyProposedValue { - height, - round, - value: value.clone(), - }; - + Msg::ProposeValue(value) => { let result = self - .process_input( - &myself, - state, - ConsensusInput::Propose(value_to_propose.clone()), - ) + .process_input(&myself, state, ConsensusInput::Propose(value.clone())) .await; if let Err(e) = result { - error!(%height, %round, "Error when processing ProposeValue message: {e}"); + error!( + height = %value.height, round = %value.round, + "Error when processing ProposeValue message: {e}" + ); } - self.tx_event - .send(|| Event::ProposedValue(value_to_propose)); + self.tx_event.send(|| Event::ProposedValue(value)); Ok(()) } @@ -549,25 +560,7 @@ where return Ok(()); }; - state.timeouts.increase_timeout(timeout.kind); - - if matches!( - timeout.kind, - TimeoutKind::Prevote - | TimeoutKind::Precommit - | TimeoutKind::PrevoteTimeLimit - | TimeoutKind::PrecommitTimeLimit - ) { - warn!(step = ?timeout.kind, "Timeout elapsed"); - - state.consensus.print_state(); - } - - let result = self - .process_input(&myself, state, ConsensusInput::TimeoutElapsed(timeout)) - .await; - - if let Err(e) = result { + if let Err(e) = self.timeout_elapsed(&myself, state, timeout).await { error!("Error when processing TimeoutElapsed message: {e:?}"); } @@ -604,7 +597,13 @@ where state.timeouts.increase_timeout(timeout.kind); // Print debug information if the timeout is for a prevote or precommit - if matches!(timeout.kind, TimeoutKind::Prevote | TimeoutKind::Precommit) { + if matches!( + timeout.kind, + TimeoutKind::Prevote + | TimeoutKind::Precommit + | TimeoutKind::PrevoteTimeLimit + | TimeoutKind::PrecommitTimeLimit + ) { warn!(step = ?timeout.kind, "Timeout elapsed"); state.consensus.print_state(); } @@ -616,91 +615,117 @@ where Ok(()) } - async fn check_and_replay_wal( + async fn wal_fetch( &self, - myself: &ActorRef>, - state: &mut State, height: Ctx::Height, - ) -> Result<(), ActorProcessingErr> { + ) -> Result>, ActorProcessingErr> { let result = ractor::call!(self.wal, WalMsg::StartedHeight, height)?; match result { Ok(None) => { // Nothing to replay debug!(%height, "No WAL entries to replay"); + Ok(Default::default()) } - Ok(Some(entries)) => { - info!("Found {} WAL entries to replay", entries.len()); - state.set_phase(Phase::Recovering); + Ok(Some(entries)) => { + info!("Found {} WAL entries", entries.len()); - if let Err(e) = self.replay_wal_entries(myself, state, entries).await { - error!(%height, "Failed to replay WAL entries: {e}"); - self.tx_event.send(|| Event::WalReplayError(Arc::new(e))); - } + Ok(entries) } + Err(e) => { error!(%height, "Error when notifying WAL of started height: {e}"); self.tx_event .send(|| Event::WalReplayError(Arc::new(e.into()))); + Ok(Default::default()) } } - - Ok(()) } - async fn replay_wal_entries( + async fn replay_wal( &self, myself: &ActorRef>, state: &mut State, + height: Ctx::Height, entries: Vec>, - ) -> Result<(), ActorProcessingErr> { + ) { use SignedConsensusMsg::*; - debug_assert!(!entries.is_empty()); + assert_eq!(state.phase, Phase::Recovering); + + info!("Replaying {} WAL entries", entries.len()); + + if entries.is_empty() { + return; + } self.tx_event - .send(|| Event::WalReplayBegin(state.height(), entries.len())); + .send(|| Event::WalReplayBegin(height, entries.len())); for entry in entries { + self.tx_event.send(|| Event::WalReplayEntry(entry.clone())); + match entry { WalEntry::ConsensusMsg(Vote(vote)) => { - self.tx_event - .send(|| Event::WalReplayConsensus(Vote(vote.clone()))); + info!("Replaying vote: {vote:?}"); if let Err(e) = self .process_input(myself, state, ConsensusInput::Vote(vote)) .await { - error!("Error when replaying Vote: {e}"); + error!("Error when replaying vote: {e}"); + + self.tx_event + .send(|| Event::WalReplayError(Arc::new(e.into()))); } } WalEntry::ConsensusMsg(Proposal(proposal)) => { - self.tx_event - .send(|| Event::WalReplayConsensus(Proposal(proposal.clone()))); + info!("Replaying proposal: {proposal:?}"); if let Err(e) = self .process_input(myself, state, ConsensusInput::Proposal(proposal)) .await { error!("Error when replaying Proposal: {e}"); + + self.tx_event + .send(|| Event::WalReplayError(Arc::new(e.into()))); } } WalEntry::Timeout(timeout) => { - self.tx_event.send(|| Event::WalReplayTimeout(timeout)); + info!("Replaying timeout: {timeout:?}"); if let Err(e) = self.timeout_elapsed(myself, state, timeout).await { error!("Error when replaying TimeoutElapsed: {e}"); + + self.tx_event.send(|| Event::WalReplayError(Arc::new(e))); + } + } + + WalEntry::ProposedValue(value) => { + info!("Replaying proposed value: {value:?}"); + + if let Err(e) = self + .process_input( + myself, + state, + ConsensusInput::ProposedValue(value, ValueOrigin::Consensus), + ) + .await + { + error!("Error when replaying LocallyProposedValue: {e}"); + + self.tx_event + .send(|| Event::WalReplayError(Arc::new(e.into()))); } } } } self.tx_event.send(|| Event::WalReplayDone(state.height())); - - Ok(()) } fn get_value( @@ -720,9 +745,7 @@ where reply_to, }, myself, - |proposed: LocallyProposedValue| { - Msg::::ProposeValue(proposed.height, proposed.round, proposed.value) - }, + Msg::::ProposeValue, None, )?; @@ -826,37 +849,34 @@ where async fn handle_effect( &self, myself: &ActorRef>, - height: Ctx::Height, - timers: &mut Timers, - timeouts: &mut Timeouts, - phase: Phase, + state: HandlerState<'_, Ctx>, effect: Effect, ) -> Result, ActorProcessingErr> { match effect { Effect::ResetTimeouts(r) => { - timeouts.reset(self.timeout_config); + state.timeouts.reset(self.timeout_config); Ok(r.resume_with(())) } Effect::CancelAllTimeouts(r) => { - timers.cancel_all(); + state.timers.cancel_all(); Ok(r.resume_with(())) } Effect::CancelTimeout(timeout, r) => { - timers.cancel(&timeout); + state.timers.cancel(&timeout); Ok(r.resume_with(())) } Effect::ScheduleTimeout(timeout, r) => { - let duration = timeouts.duration_for(timeout.kind); - timers.start_timer(timeout, duration); + let duration = state.timeouts.duration_for(timeout.kind); + state.timers.start_timer(timeout, duration); Ok(r.resume_with(())) } Effect::StartRound(height, round, proposer, r) => { - self.wal_flush(phase).await?; + self.wal_flush(state.phase).await?; self.host.cast(HostMsg::StartedRound { height, @@ -955,8 +975,8 @@ where Effect::Publish(msg, r) => { // Sync the WAL to disk before we broadcast the message - // NOTE: The message has already been append to the WAL by the `WalAppendMessage` effect. - self.wal_flush(phase).await?; + // NOTE: The message has already been append to the WAL by the `WalAppend` effect. + self.wal_flush(state.phase).await?; // Notify any subscribers that we are about to publish a message self.tx_event.send(|| Event::Published(msg.clone())); @@ -984,10 +1004,12 @@ where } Effect::GetValue(height, round, timeout, r) => { - let timeout_duration = timeouts.duration_for(timeout.kind); + let timeout_duration = state.timeouts.duration_for(timeout.kind); self.get_value(myself, height, round, timeout_duration) - .map_err(|e| eyre!("Error when asking for value to be built: {e:?}"))?; + .map_err(|e| { + eyre!("Error when asking application for value to propose: {e:?}") + })?; Ok(r.resume_with(())) } @@ -1019,7 +1041,7 @@ where Effect::Decide(certificate, extensions, r) => { assert!(!certificate.aggregated_signature.signatures.is_empty()); - self.wal_flush(phase).await?; + self.wal_flush(state.phase).await?; self.tx_event.send(|| Event::Decided(certificate.clone())); @@ -1100,17 +1122,8 @@ where Ok(r.resume_with(())) } - Effect::WalAppendMessage(msg, r) => { - self.wal_append(height, WalEntry::ConsensusMsg(msg), phase) - .await?; - - Ok(r.resume_with(())) - } - - Effect::WalAppendTimeout(timeout, r) => { - self.wal_append(height, WalEntry::Timeout(timeout), phase) - .await?; - + Effect::WalAppend(entry, r) => { + self.wal_append(state.height, entry, state.phase).await?; Ok(r.resume_with(())) } } diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index ef55f2a7e..07ababfd0 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -5,10 +5,10 @@ use derive_where::derive_where; use ractor::ActorProcessingErr; use tokio::sync::broadcast; -use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue, SignedConsensusMsg}; -use malachitebft_core_types::{ - CommitCertificate, Context, Round, SignedVote, Timeout, ValueOrigin, +use malachitebft_core_consensus::{ + LocallyProposedValue, ProposedValue, SignedConsensusMsg, WalEntry, }; +use malachitebft_core_types::{CommitCertificate, Context, Round, SignedVote, ValueOrigin}; pub type RxEvent = broadcast::Receiver>; @@ -52,8 +52,7 @@ pub enum Event { RequestedVoteSet(Ctx::Height, Round), SentVoteSetResponse(Ctx::Height, Round, usize, usize), WalReplayBegin(Ctx::Height, usize), - WalReplayConsensus(SignedConsensusMsg), - WalReplayTimeout(Timeout), + WalReplayEntry(WalEntry), WalReplayDone(Ctx::Height), WalReplayError(Arc), } @@ -87,8 +86,7 @@ impl fmt::Display for Event { Event::WalReplayBegin(height, count) => { write!(f, "WalReplayBegin(height: {height}, count: {count})") } - Event::WalReplayConsensus(msg) => write!(f, "WalReplayConsensus(msg: {msg:?})"), - Event::WalReplayTimeout(timeout) => write!(f, "WalReplayTimeout(timeout: {timeout:?})"), + Event::WalReplayEntry(entry) => write!(f, "WalReplayEntry(entry: {entry:?})"), Event::WalReplayDone(height) => write!(f, "WalReplayDone(height: {height})"), Event::WalReplayError(error) => write!(f, "WalReplayError({error})"), } diff --git a/code/crates/engine/src/wal/entry.rs b/code/crates/engine/src/wal/entry.rs index c6221a9c0..c255304cd 100644 --- a/code/crates/engine/src/wal/entry.rs +++ b/code/crates/engine/src/wal/entry.rs @@ -1,10 +1,9 @@ use std::io::{self, Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt, BE}; -use derive_where::derive_where; use malachitebft_codec::Codec; -use malachitebft_core_consensus::SignedConsensusMsg; +use malachitebft_core_consensus::{ProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{Context, Round, Timeout}; /// Codec for encoding and decoding WAL entries. @@ -15,6 +14,7 @@ pub trait WalCodec where Ctx: Context, Self: Codec>, + Self: Codec>, { } @@ -22,105 +22,97 @@ impl WalCodec for C where Ctx: Context, C: Codec>, + C: Codec>, { } -#[derive_where(Debug)] -pub enum WalEntry { - ConsensusMsg(SignedConsensusMsg), - Timeout(Timeout), -} +pub use malachitebft_core_consensus::WalEntry; + +const TAG_CONSENSUS: u8 = 0x01; +const TAG_TIMEOUT: u8 = 0x02; +const TAG_PROPOSED_VALUE: u8 = 0x04; -impl WalEntry +pub fn encode_entry(entry: &WalEntry, codec: &C, buf: W) -> io::Result<()> where Ctx: Context, + C: WalCodec, + W: Write, { - pub fn tpe(&self) -> &'static str { - match self { - Self::ConsensusMsg(msg) => match msg { - SignedConsensusMsg::Vote(_) => "Consensus(Vote)", - SignedConsensusMsg::Proposal(_) => "Consensus(Proposal)", - }, - Self::Timeout(_) => "Timeout", + match entry { + WalEntry::ConsensusMsg(msg) => encode_consensus_msg(TAG_CONSENSUS, msg, codec, buf), + WalEntry::Timeout(timeout) => encode_timeout(TAG_TIMEOUT, timeout, buf), + WalEntry::ProposedValue(value) => { + encode_proposed_value(TAG_PROPOSED_VALUE, value, codec, buf) } } } -impl WalEntry +pub fn decode_entry(codec: &C, mut buf: R) -> io::Result> where Ctx: Context, + C: WalCodec, + R: Read, { - const TAG_CONSENSUS: u8 = 0x01; - const TAG_TIMEOUT: u8 = 0x02; - - pub fn encode(&self, codec: &C, mut buf: W) -> io::Result<()> - where - C: WalCodec, - W: Write, - { - match self { - WalEntry::ConsensusMsg(msg) => { - let bytes = codec.encode(msg).map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("failed to encode consensus message: {e}"), - ) - })?; - - // Write tag - buf.write_u8(Self::TAG_CONSENSUS)?; - - // Write encoded length - buf.write_u64::(bytes.len() as u64)?; - - // Write encoded bytes - buf.write_all(&bytes)?; - - Ok(()) - } - - WalEntry::Timeout(timeout) => { - // Write tag and timeout if applicable - encode_timeout(Self::TAG_TIMEOUT, timeout, &mut buf)?; - - Ok(()) - } - } - } + let tag = buf.read_u8()?; - pub fn decode(codec: &C, mut buf: R) -> io::Result> - where - C: WalCodec, - R: Read, - { - let tag = buf.read_u8()?; - - match tag { - Self::TAG_CONSENSUS => { - let len = buf.read_u64::()?; - let mut bytes = vec![0; len as usize]; - buf.read_exact(&mut bytes)?; - - let msg = codec.decode(bytes.into()).map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("failed to decode consensus msg: {e}"), - ) - })?; - - Ok(WalEntry::ConsensusMsg(msg)) - } - - Self::TAG_TIMEOUT => { - let timeout = decode_timeout(&mut buf)?; - Ok(WalEntry::Timeout(timeout)) - } - - _ => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid tag")), - } + match tag { + TAG_CONSENSUS => decode_consensus_msg(codec, buf).map(WalEntry::ConsensusMsg), + TAG_TIMEOUT => decode_timeout(buf).map(WalEntry::Timeout), + TAG_PROPOSED_VALUE => decode_proposed_value(codec, buf).map(WalEntry::ProposedValue), + _ => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid tag")), } } +// Consensus message helpers +fn encode_consensus_msg( + tag: u8, + msg: &SignedConsensusMsg, + codec: &C, + mut buf: W, +) -> io::Result<()> +where + Ctx: Context, + C: WalCodec, + W: Write, +{ + let bytes = codec.encode(msg).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to encode consensus message: {e}"), + ) + })?; + + // Write tag + buf.write_u8(tag)?; + + // Write encoded length + buf.write_u64::(bytes.len() as u64)?; + + // Write encoded bytes + buf.write_all(&bytes)?; + + Ok(()) +} + +fn decode_consensus_msg(codec: &C, mut buf: R) -> io::Result> +where + Ctx: Context, + C: WalCodec, + R: Read, +{ + let len = buf.read_u64::()?; + let mut bytes = vec![0; len as usize]; + buf.read_exact(&mut bytes)?; + + codec.decode(bytes.into()).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to decode consensus msg: {e}"), + ) + }) +} + +// Timeout helpers fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result<()> { use malachitebft_core_types::TimeoutKind; @@ -130,7 +122,7 @@ fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result TimeoutKind::Precommit => 3, TimeoutKind::Commit => 4, - // Consensus will typically not want to store these two timeouts in the WAL, + // Consensus will typically not want to store these timeouts in the WAL, // but we still need to handle them here. TimeoutKind::PrevoteTimeLimit => 5, TimeoutKind::PrecommitTimeLimit => 6, @@ -153,8 +145,13 @@ fn decode_timeout(mut buf: impl Read) -> io::Result { 2 => TimeoutKind::Prevote, 3 => TimeoutKind::Precommit, 4 => TimeoutKind::Commit, + + // Consensus will typically not want to store these timeouts in the WAL, + // but we still need to handle them here. 5 => TimeoutKind::PrevoteTimeLimit, 6 => TimeoutKind::PrecommitTimeLimit, + 7 => TimeoutKind::PrevoteRebroadcast, + 8 => TimeoutKind::PrecommitRebroadcast, _ => { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -167,3 +164,52 @@ fn decode_timeout(mut buf: impl Read) -> io::Result { Ok(Timeout::new(round, step)) } + +// Proposed value helpers +fn encode_proposed_value( + tag: u8, + value: &ProposedValue, + codec: &C, + mut buf: W, +) -> io::Result<()> +where + Ctx: Context, + C: WalCodec, + W: Write, +{ + let bytes = codec.encode(value).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to encode consensus message: {e}"), + ) + })?; + + // Write tag + buf.write_u8(tag)?; + + // Write encoded length + buf.write_u64::(bytes.len() as u64)?; + + // Write encoded bytes + buf.write_all(&bytes)?; + + Ok(()) +} + +fn decode_proposed_value(codec: &C, mut buf: R) -> io::Result> +where + Ctx: Context, + C: WalCodec, + R: Read, +{ + let len = buf.read_u64::()?; + let mut bytes = vec![0; len as usize]; + buf.read_exact(&mut bytes)?; + + codec.decode(bytes.into()).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("failed to decode proposed value: {e}"), + ) + }) +} diff --git a/code/crates/engine/src/wal/iter.rs b/code/crates/engine/src/wal/iter.rs index 082a2a0d7..901c0532c 100644 --- a/code/crates/engine/src/wal/iter.rs +++ b/code/crates/engine/src/wal/iter.rs @@ -6,6 +6,7 @@ use malachitebft_wal as wal; use eyre::Result; +use super::entry::decode_entry; use super::{WalCodec, WalEntry}; pub fn log_entries<'a, Ctx, Codec>( @@ -41,7 +42,7 @@ where match entry { Ok(bytes) => { let buf = io::Cursor::new(bytes); - let entry = WalEntry::decode(self.codec, buf); + let entry = decode_entry(self.codec, buf); Some(entry.map_err(Into::into)) } Err(e) => Some(Err(e.into())), diff --git a/code/crates/engine/src/wal/thread.rs b/code/crates/engine/src/wal/thread.rs index a3a2bb2ca..2cd3d2ecf 100644 --- a/code/crates/engine/src/wal/thread.rs +++ b/code/crates/engine/src/wal/thread.rs @@ -9,7 +9,7 @@ use tracing::{debug, error, info}; use malachitebft_core_types::{Context, Height}; use malachitebft_wal as wal; -use super::entry::{WalCodec, WalEntry}; +use super::entry::{decode_entry, encode_entry, WalCodec, WalEntry}; use super::iter::log_entries; pub type ReplyTo = oneshot::Sender>; @@ -92,10 +92,10 @@ where } WalMsg::Append(entry, reply) => { - let tpe = entry.tpe(); + let tpe = wal_entry_type(&entry); let mut buf = Vec::new(); - entry.encode(codec, &mut buf)?; + encode_entry(&entry, codec, &mut buf)?; if !buf.is_empty() { let result = log.append(&buf).map_err(Into::into); @@ -168,7 +168,7 @@ where } }) .filter_map( - |(idx, bytes)| match WalEntry::decode(codec, io::Cursor::new(bytes.clone())) { + |(idx, bytes)| match decode_entry(codec, io::Cursor::new(bytes.clone())) { Ok(entry) => Some(entry), Err(e) => { error!("Failed to decode WAL entry {idx}: {e} {:?}", bytes); @@ -229,3 +229,16 @@ fn span_sequence(sequence: u64, msg: &WalMsg) -> u64 { sequence } } + +fn wal_entry_type(entry: &WalEntry) -> &'static str { + use malachitebft_core_consensus::SignedConsensusMsg; + + match entry { + WalEntry::ConsensusMsg(msg) => match msg { + SignedConsensusMsg::Vote(_) => "Consensus(Vote)", + SignedConsensusMsg::Proposal(_) => "Consensus(Proposal)", + }, + WalEntry::ProposedValue(_) => "LocallyProposedValue", + WalEntry::Timeout(_) => "Timeout", + } +} diff --git a/code/crates/test/app/src/app.rs b/code/crates/test/app/src/app.rs index 0c018f6ae..344733297 100644 --- a/code/crates/test/app/src/app.rs +++ b/code/crates/test/app/src/app.rs @@ -2,7 +2,7 @@ use std::time::Duration; use eyre::eyre; use tokio::time::sleep; -use tracing::{error, info}; +use tracing::{debug, error, info}; // use malachitebft_app_channel::app::config::ValuePayload; use malachitebft_app_channel::app::streaming::StreamContent; @@ -129,7 +129,8 @@ pub async fn run( // Now what's left to do is to break down the value to propose into parts, // and send those parts over the network to our peers, for them to re-assemble the full value. for stream_message in state.stream_proposal(proposal, pol_round) { - info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + debug!(%height, %round, "Streaming proposal part: {stream_message:?}"); + channels .network .send(NetworkMsg::PublishProposalPart(stream_message)) @@ -148,7 +149,7 @@ pub async fn run( StreamContent::Fin => "end of stream", }; - info!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); + debug!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); let proposed_value = state.received_proposal_part(from, part).await?; @@ -296,7 +297,7 @@ pub async fn run( for stream_message in state.stream_proposal(locally_proposed_value, valid_round) { - info!(%height, %valid_round, "Publishing proposal part: {stream_message:?}"); + debug!(%height, %valid_round, "Publishing proposal part: {stream_message:?}"); channels .network diff --git a/code/crates/test/framework/src/lib.rs b/code/crates/test/framework/src/lib.rs index 417823e08..9c5c1ca7d 100644 --- a/code/crates/test/framework/src/lib.rs +++ b/code/crates/test/framework/src/lib.rs @@ -284,6 +284,26 @@ where } } + Step::WaitUntilRound(target_round) => { + info!("Waiting until node reaches round {target_round}"); + + 'inner: while let Ok(event) = rx_event.recv().await { + if let Some(failure) = failure.lock().await.take() { + return TestResult::Failure(failure); + } + + let Event::StartedRound(_, round) = event else { + continue 'inner; + }; + + info!("Node started round {round}"); + + if round.as_u32() == Some(target_round) { + break 'inner; + } + } + } + Step::Crash(after) => { let height = current_height.load(Ordering::SeqCst); diff --git a/code/crates/test/framework/src/node.rs b/code/crates/test/framework/src/node.rs index e1767a88d..047c316c4 100644 --- a/code/crates/test/framework/src/node.rs +++ b/code/crates/test/framework/src/node.rs @@ -21,6 +21,7 @@ where ResetDb, Restart(Duration), WaitUntil(u64), + WaitUntilRound(u32), OnEvent(EventHandler), Expect(Expected), Success, @@ -127,6 +128,11 @@ where self } + pub fn wait_until_round(&mut self, round: u32) -> &mut Self { + self.steps.push(Step::WaitUntilRound(round)); + self + } + pub fn on_event(&mut self, on_event: F) -> &mut Self where F: Fn(Event, &mut State) -> Result diff --git a/code/crates/test/tests/it/wal.rs b/code/crates/test/tests/it/wal.rs index 69eb76376..aad5c0ef1 100644 --- a/code/crates/test/tests/it/wal.rs +++ b/code/crates/test/tests/it/wal.rs @@ -1,16 +1,16 @@ use std::time::Duration; use eyre::bail; -use informalsystems_malachitebft_test::middleware::Middleware; use tracing::info; use informalsystems_malachitebft_test::{self as malachitebft_test}; use malachitebft_config::{ValuePayload, VoteSyncMode}; use malachitebft_core_consensus::LocallyProposedValue; -use malachitebft_core_types::SignedVote; +use malachitebft_core_types::{NilOrVal, Round, SignedVote}; use malachitebft_engine::util::events::Event; -use malachitebft_test::TestContext; +use malachitebft_test::middleware::Middleware; +use malachitebft_test::{Address, Height, TestContext, ValueId, Vote}; use crate::{HandlerResult, TestBuilder, TestParams}; @@ -445,7 +445,48 @@ async fn byzantine_proposer_crashes_after_proposing_2(params: TestParams) { Duration::from_secs(60), TestParams { timeout_step: Duration::from_secs(5), - value_payload: ValuePayload::PartsOnly, + ..params + }, + ) + .await +} + +#[tokio::test] +async fn multi_rounds() { + wal_multi_rounds(TestParams::default()).await +} + +async fn wal_multi_rounds(params: TestParams) { + const CRASH_HEIGHT: u64 = 1; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_middleware(PrevoteNil) + .start() + .wait_until(CRASH_HEIGHT) + .wait_until_round(3) + .crash() + .restart_after(Duration::from_secs(10)) + .expect_wal_replay(CRASH_HEIGHT) + .wait_until(CRASH_HEIGHT + 2) + .success(); + + test.add_node() + .start() + .wait_until(CRASH_HEIGHT + 2) + .success(); + + test.add_node() + .start() + .wait_until(CRASH_HEIGHT + 2) + .success(); + + test.build() + .run_with_params( + Duration::from_secs(60), + TestParams { + enable_value_sync: false, ..params }, ) @@ -492,3 +533,23 @@ impl Middleware for ByzantineProposer { proposal.value = new_value; } } + +#[derive(Copy, Clone, Debug)] +struct PrevoteNil; + +impl Middleware for PrevoteNil { + fn new_prevote( + &self, + _ctx: &TestContext, + height: Height, + round: Round, + value_id: NilOrVal, + address: Address, + ) -> Vote { + if round.as_i64() <= 3 { + Vote::new_prevote(height, round, NilOrVal::Nil, address) + } else { + Vote::new_prevote(height, round, value_id, address) + } + } +}