diff --git a/code/crates/core-consensus/src/handle/timeout.rs b/code/crates/core-consensus/src/handle/timeout.rs index 827eeca09..84fbd5601 100644 --- a/code/crates/core-consensus/src/handle/timeout.rs +++ b/code/crates/core-consensus/src/handle/timeout.rs @@ -16,8 +16,10 @@ where let height = state.driver.height(); let round = state.driver.round(); - if timeout.round != round && timeout.kind != TimeoutKind::Commit { - debug!( + if timeout.round != round + && !matches!(timeout.kind, TimeoutKind::Precommit | TimeoutKind::Commit) + { + warn!( %height, %round, timeout.round = %timeout.round, diff --git a/code/crates/core-driver/src/driver.rs b/code/crates/core-driver/src/driver.rs index 40e9bdbc0..a0252c04a 100644 --- a/code/crates/core-driver/src/driver.rs +++ b/code/crates/core-driver/src/driver.rs @@ -497,8 +497,8 @@ where let previous_step = round_state.step; - let proposer = self.get_proposer()?; - let info = Info::new(input_round, &self.address, proposer.address()); + let proposer = self.get_proposer().ok(); + let info = Info::new(input_round, &self.address, proposer.map(|p| p.address())); // Apply the input to the round state machine let transition = round_state.apply(&info, input); diff --git a/code/crates/core-state-machine/src/state_machine.rs b/code/crates/core-state-machine/src/state_machine.rs index 86f2cda51..6f4110c76 100644 --- a/code/crates/core-state-machine/src/state_machine.rs +++ b/code/crates/core-state-machine/src/state_machine.rs @@ -21,7 +21,7 @@ where /// Address of our node pub address: &'a Ctx::Address, /// Proposer for the round we are at - pub proposer: &'a Ctx::Address, + pub proposer: Option<&'a Ctx::Address>, } impl<'a, Ctx> Info<'a, Ctx> @@ -29,7 +29,11 @@ where Ctx: Context, { /// Create a new `Info` instance. - pub fn new(input_round: Round, address: &'a Ctx::Address, proposer: &'a Ctx::Address) -> Self { + pub fn new( + input_round: Round, + address: &'a Ctx::Address, + proposer: Option<&'a Ctx::Address>, + ) -> Self { Self { input_round, address, @@ -42,13 +46,15 @@ where Self { input_round, address, - proposer: address, + proposer: Some(address), } } /// Check if we are the proposer for the round we are at. pub fn is_proposer(&self) -> bool { - self.address == self.proposer + self.proposer + .map(|proposer| self.address == proposer) + .expect("no proposer") } } @@ -74,7 +80,7 @@ where { let this_round = state.round == info.input_round; - match (state.step, input) { + match dbg!((state.step, input)) { // // From NewRound. // diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 177320a91..db11a0a48 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -296,6 +296,8 @@ where ) .await; + self.tx_event.send(|| Event::StartedHeight(height)); + if let Err(e) = result { error!(%height, "Error when preparing for height: {e}"); } @@ -321,8 +323,6 @@ where } } - self.tx_event.send(|| Event::StartedHeight(height)); - self.process_buffered_msgs(&myself, state).await; state.phase = Phase::Running; @@ -1246,10 +1246,11 @@ fn span_height(height: Ctx::Height, msg: &Msg) -> Ctx::Height /// Use round 0 instead of the consensus state round for the tracing span of /// the Consensus actor when starting a new height. -fn span_round(round: Round, msg: &Msg) -> Round { - if let Msg::StartHeight(_, _) = msg { - Round::new(0) - } else { - round - } +fn span_round(round: Round, _msg: &Msg) -> Round { + round + // if let Msg::StartHeight(_, _) = msg { + // Round::new(0) + // } else { + // round + // } } diff --git a/code/crates/test/app/src/app.rs b/code/crates/test/app/src/app.rs index 85ee804de..cf3cf1332 100644 --- a/code/crates/test/app/src/app.rs +++ b/code/crates/test/app/src/app.rs @@ -2,7 +2,7 @@ use std::time::Duration; use eyre::eyre; use tokio::time::sleep; -use tracing::{error, info}; +use tracing::{debug, error, info}; // use malachitebft_app_channel::app::config::ValuePayload; use malachitebft_app_channel::app::streaming::StreamContent; @@ -131,7 +131,8 @@ pub async fn run( // Now what's left to do is to break down the value to propose into parts, // and send those parts over the network to our peers, for them to re-assemble the full value. for stream_message in state.stream_proposal(proposal, pol_round) { - info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + debug!(%height, %round, "Streaming proposal part: {stream_message:?}"); + channels .network .send(NetworkMsg::PublishProposalPart(stream_message)) @@ -150,7 +151,7 @@ pub async fn run( StreamContent::Fin => "end of stream", }; - info!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); + debug!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); let proposed_value = state.received_proposal_part(from, part).await?; diff --git a/code/crates/test/framework/src/lib.rs b/code/crates/test/framework/src/lib.rs index 417823e08..9c5c1ca7d 100644 --- a/code/crates/test/framework/src/lib.rs +++ b/code/crates/test/framework/src/lib.rs @@ -284,6 +284,26 @@ where } } + Step::WaitUntilRound(target_round) => { + info!("Waiting until node reaches round {target_round}"); + + 'inner: while let Ok(event) = rx_event.recv().await { + if let Some(failure) = failure.lock().await.take() { + return TestResult::Failure(failure); + } + + let Event::StartedRound(_, round) = event else { + continue 'inner; + }; + + info!("Node started round {round}"); + + if round.as_u32() == Some(target_round) { + break 'inner; + } + } + } + Step::Crash(after) => { let height = current_height.load(Ordering::SeqCst); diff --git a/code/crates/test/framework/src/node.rs b/code/crates/test/framework/src/node.rs index 228304e56..c0b8ff073 100644 --- a/code/crates/test/framework/src/node.rs +++ b/code/crates/test/framework/src/node.rs @@ -19,6 +19,7 @@ where ResetDb, Restart(Duration), WaitUntil(u64), + WaitUntilRound(u32), OnEvent(EventHandler), Expect(Expected), Success, @@ -125,6 +126,11 @@ where self } + pub fn wait_until_round(&mut self, round: u32) -> &mut Self { + self.steps.push(Step::WaitUntilRound(round)); + self + } + pub fn on_event(&mut self, on_event: F) -> &mut Self where F: Fn(Event, &mut State) -> Result diff --git a/code/crates/test/src/context.rs b/code/crates/test/src/context.rs index 0a04388d5..74e4d472e 100644 --- a/code/crates/test/src/context.rs +++ b/code/crates/test/src/context.rs @@ -78,6 +78,10 @@ impl Context for TestContext { value_id: NilOrVal, address: Address, ) -> Vote { + if round < Round::new(4) { + return Vote::new_prevote(height, round, NilOrVal::Nil, address); + } + Vote::new_prevote(height, round, value_id, address) } diff --git a/code/crates/test/tests/it/wal.rs b/code/crates/test/tests/it/wal.rs index 58eca21e7..de5ba227e 100644 --- a/code/crates/test/tests/it/wal.rs +++ b/code/crates/test/tests/it/wal.rs @@ -451,3 +451,50 @@ async fn byzantine_proposer_crashes_after_proposing_2(params: TestParams) { ) .await } + +#[tokio::test] +async fn multi_rounds() { + wal_multi_rounds(TestParams::default()).await +} + +async fn wal_multi_rounds(params: TestParams) { + const CRASH_HEIGHT: u64 = 1; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .start() + .wait_until(CRASH_HEIGHT) + .wait_until_round(3) + // Crash right after + .crash() + // Restart after 10 seconds + .restart_after(Duration::from_secs(10)) + // Check that we replay messages from the WAL + .expect_wal_replay(CRASH_HEIGHT) + .wait_until(CRASH_HEIGHT + 2) + .success(); + + test.add_node() + .start() + // .wait_until(CRASH_HEIGHT) + // .crash() + // .restart_after(Duration::from_secs(5)) + .wait_until(CRASH_HEIGHT + 2) + .success(); + + test.add_node() + .start() + .wait_until(CRASH_HEIGHT + 2) + .success(); + + test.build() + .run_with_params( + Duration::from_secs(60), + TestParams { + enable_value_sync: false, + ..params + }, + ) + .await +} diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index 870628289..a7389dbcc 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -2,7 +2,7 @@ use std::time::Duration; use eyre::eyre; use tokio::time::sleep; -use tracing::{error, info}; +use tracing::{debug, error, info}; use malachitebft_app_channel::app::streaming::StreamContent; use malachitebft_app_channel::app::types::codec::Codec; @@ -113,7 +113,7 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr // Now what's left to do is to break down the value to propose into parts, // and send those parts over the network to our peers, for them to re-assemble the full value. for stream_message in state.stream_proposal(proposal, pol_round) { - info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + debug!(%height, %round, "Streaming proposal part: {stream_message:?}"); channels .network @@ -158,7 +158,7 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr StreamContent::Fin => "end of stream", }; - info!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); + debug!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); let proposed_value = state.received_proposal_part(from, part).await?; @@ -301,7 +301,7 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr for stream_message in state.stream_proposal(locally_proposed_value, valid_round) { - info!(%height, %valid_round, "Publishing proposal part: {stream_message:?}"); + debug!(%height, %valid_round, "Publishing proposal part: {stream_message:?}"); channels .network