Skip to content

code: Tests for multi-rounds unstarted WAL replay #946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions code/crates/core-consensus/src/handle/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ where
let height = state.driver.height();
let round = state.driver.round();

if timeout.round != round && timeout.kind != TimeoutKind::Commit {
debug!(
if timeout.round != round
&& !matches!(timeout.kind, TimeoutKind::Precommit | TimeoutKind::Commit)
{
warn!(
%height,
%round,
timeout.round = %timeout.round,
Expand Down
4 changes: 2 additions & 2 deletions code/crates/core-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ where

let previous_step = round_state.step;

let proposer = self.get_proposer()?;
let info = Info::new(input_round, &self.address, proposer.address());
let proposer = self.get_proposer().ok();
let info = Info::new(input_round, &self.address, proposer.map(|p| p.address()));

// Apply the input to the round state machine
let transition = round_state.apply(&info, input);
Expand Down
16 changes: 11 additions & 5 deletions code/crates/core-state-machine/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ where
/// Address of our node
pub address: &'a Ctx::Address,
/// Proposer for the round we are at
pub proposer: &'a Ctx::Address,
pub proposer: Option<&'a Ctx::Address>,
}

impl<'a, Ctx> Info<'a, Ctx>
where
Ctx: Context,
{
/// Create a new `Info` instance.
pub fn new(input_round: Round, address: &'a Ctx::Address, proposer: &'a Ctx::Address) -> Self {
pub fn new(
input_round: Round,
address: &'a Ctx::Address,
proposer: Option<&'a Ctx::Address>,
) -> Self {
Self {
input_round,
address,
Expand All @@ -42,13 +46,15 @@ where
Self {
input_round,
address,
proposer: address,
proposer: Some(address),
}
}

/// Check if we are the proposer for the round we are at.
pub fn is_proposer(&self) -> bool {
self.address == self.proposer
self.proposer
.map(|proposer| self.address == proposer)
.expect("no proposer")
}
}

Expand All @@ -74,7 +80,7 @@ where
{
let this_round = state.round == info.input_round;

match (state.step, input) {
match dbg!((state.step, input)) {
//
// From NewRound.
//
Expand Down
17 changes: 9 additions & 8 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ where
)
.await;

self.tx_event.send(|| Event::StartedHeight(height));

if let Err(e) = result {
error!(%height, "Error when preparing for height: {e}");
}
Expand All @@ -321,8 +323,6 @@ where
}
}

self.tx_event.send(|| Event::StartedHeight(height));

self.process_buffered_msgs(&myself, state).await;

state.phase = Phase::Running;
Expand Down Expand Up @@ -1246,10 +1246,11 @@ fn span_height<Ctx: Context>(height: Ctx::Height, msg: &Msg<Ctx>) -> Ctx::Height

/// Use round 0 instead of the consensus state round for the tracing span of
/// the Consensus actor when starting a new height.
fn span_round<Ctx: Context>(round: Round, msg: &Msg<Ctx>) -> Round {
if let Msg::StartHeight(_, _) = msg {
Round::new(0)
} else {
round
}
fn span_round<Ctx: Context>(round: Round, _msg: &Msg<Ctx>) -> Round {
round
// if let Msg::StartHeight(_, _) = msg {
// Round::new(0)
// } else {
// round
// }
}
7 changes: 4 additions & 3 deletions code/crates/test/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use eyre::eyre;
use tokio::time::sleep;
use tracing::{error, info};
use tracing::{debug, error, info};

// use malachitebft_app_channel::app::config::ValuePayload;
use malachitebft_app_channel::app::streaming::StreamContent;
Expand Down Expand Up @@ -131,7 +131,8 @@ pub async fn run(
// Now what's left to do is to break down the value to propose into parts,
// and send those parts over the network to our peers, for them to re-assemble the full value.
for stream_message in state.stream_proposal(proposal, pol_round) {
info!(%height, %round, "Streaming proposal part: {stream_message:?}");
debug!(%height, %round, "Streaming proposal part: {stream_message:?}");

channels
.network
.send(NetworkMsg::PublishProposalPart(stream_message))
Expand All @@ -150,7 +151,7 @@ pub async fn run(
StreamContent::Fin => "end of stream",
};

info!(%from, %part.sequence, part.type = %part_type, "Received proposal part");
debug!(%from, %part.sequence, part.type = %part_type, "Received proposal part");

let proposed_value = state.received_proposal_part(from, part).await?;

Expand Down
20 changes: 20 additions & 0 deletions code/crates/test/framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,26 @@ where
}
}

Step::WaitUntilRound(target_round) => {
info!("Waiting until node reaches round {target_round}");

'inner: while let Ok(event) = rx_event.recv().await {
if let Some(failure) = failure.lock().await.take() {
return TestResult::Failure(failure);
}

let Event::StartedRound(_, round) = event else {
continue 'inner;
};

info!("Node started round {round}");

if round.as_u32() == Some(target_round) {
break 'inner;
}
}
}

Step::Crash(after) => {
let height = current_height.load(Ordering::SeqCst);

Expand Down
6 changes: 6 additions & 0 deletions code/crates/test/framework/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ where
ResetDb,
Restart(Duration),
WaitUntil(u64),
WaitUntilRound(u32),
OnEvent(EventHandler<Ctx, S>),
Expect(Expected),
Success,
Expand Down Expand Up @@ -125,6 +126,11 @@ where
self
}

pub fn wait_until_round(&mut self, round: u32) -> &mut Self {
self.steps.push(Step::WaitUntilRound(round));
self
}

pub fn on_event<F>(&mut self, on_event: F) -> &mut Self
where
F: Fn(Event<Ctx>, &mut State) -> Result<HandlerResult, eyre::Report>
Expand Down
4 changes: 4 additions & 0 deletions code/crates/test/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ impl Context for TestContext {
value_id: NilOrVal<ValueId>,
address: Address,
) -> Vote {
if round < Round::new(4) {
return Vote::new_prevote(height, round, NilOrVal::Nil, address);
}

Vote::new_prevote(height, round, value_id, address)
}

Expand Down
47 changes: 47 additions & 0 deletions code/crates/test/tests/it/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,50 @@ async fn byzantine_proposer_crashes_after_proposing_2(params: TestParams) {
)
.await
}

#[tokio::test]
async fn multi_rounds() {
wal_multi_rounds(TestParams::default()).await
}

async fn wal_multi_rounds(params: TestParams) {
const CRASH_HEIGHT: u64 = 1;

let mut test = TestBuilder::<()>::new();

test.add_node()
.start()
.wait_until(CRASH_HEIGHT)
.wait_until_round(3)
// Crash right after
.crash()
// Restart after 10 seconds
.restart_after(Duration::from_secs(10))
// Check that we replay messages from the WAL
.expect_wal_replay(CRASH_HEIGHT)
.wait_until(CRASH_HEIGHT + 2)
.success();

test.add_node()
.start()
// .wait_until(CRASH_HEIGHT)
// .crash()
// .restart_after(Duration::from_secs(5))
.wait_until(CRASH_HEIGHT + 2)
.success();

test.add_node()
.start()
.wait_until(CRASH_HEIGHT + 2)
.success();

test.build()
.run_with_params(
Duration::from_secs(60),
TestParams {
enable_value_sync: false,
..params
},
)
.await
}
8 changes: 4 additions & 4 deletions code/examples/channel/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use eyre::eyre;
use tokio::time::sleep;
use tracing::{error, info};
use tracing::{debug, error, info};

use malachitebft_app_channel::app::streaming::StreamContent;
use malachitebft_app_channel::app::types::codec::Codec;
Expand Down Expand Up @@ -113,7 +113,7 @@ pub async fn run(state: &mut State, channels: &mut Channels<TestContext>) -> eyr
// Now what's left to do is to break down the value to propose into parts,
// and send those parts over the network to our peers, for them to re-assemble the full value.
for stream_message in state.stream_proposal(proposal, pol_round) {
info!(%height, %round, "Streaming proposal part: {stream_message:?}");
debug!(%height, %round, "Streaming proposal part: {stream_message:?}");

channels
.network
Expand Down Expand Up @@ -158,7 +158,7 @@ pub async fn run(state: &mut State, channels: &mut Channels<TestContext>) -> eyr
StreamContent::Fin => "end of stream",
};

info!(%from, %part.sequence, part.type = %part_type, "Received proposal part");
debug!(%from, %part.sequence, part.type = %part_type, "Received proposal part");

let proposed_value = state.received_proposal_part(from, part).await?;

Expand Down Expand Up @@ -301,7 +301,7 @@ pub async fn run(state: &mut State, channels: &mut Channels<TestContext>) -> eyr

for stream_message in state.stream_proposal(locally_proposed_value, valid_round)
{
info!(%height, %valid_round, "Publishing proposal part: {stream_message:?}");
debug!(%height, %valid_round, "Publishing proposal part: {stream_message:?}");

channels
.network
Expand Down
Loading