Skip to content

chore(test): Replay WAL before start round #898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions code/crates/core-consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod vote_set;
use proposal::on_proposal;
use propose::on_propose;
use proposed_value::on_proposed_value;
use start_height::reset_and_start_height;
use start_height::{on_prepare_height, on_start_height};
use sync::on_commit_certificate;
use timeout::on_timeout_elapsed;
use vote::on_vote;
Expand Down Expand Up @@ -48,9 +48,10 @@ where
Ctx: Context,
{
match input {
Input::StartHeight(height, validator_set) => {
reset_and_start_height(co, state, metrics, height, validator_set).await
Input::PrepareHeight(height, validator_set) => {
on_prepare_height(co, state, metrics, height, validator_set).await
}
Input::StartHeight(height) => on_start_height(co, state, metrics, height).await,
Input::Vote(vote) => on_vote(co, state, metrics, vote).await,
Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await,
Input::Propose(value) => on_propose(co, state, metrics, value).await,
Expand Down
13 changes: 8 additions & 5 deletions code/crates/core-consensus/src/handle/start_height.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use crate::prelude::*;
use crate::handle::driver::apply_driver_input;
use crate::handle::handle_input;

pub async fn reset_and_start_height<Ctx>(
pub async fn on_prepare_height<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
#[allow(unused_variables)] metrics: &Metrics,
height: Ctx::Height,
validator_set: Ctx::ValidatorSet,
) -> Result<(), Error<Ctx>>
Expand All @@ -24,18 +24,21 @@ where
debug_assert_eq!(state.driver.height(), height);
debug_assert_eq!(state.driver.round(), Round::Nil);

start_height(co, state, metrics, height).await
Ok(())
}

pub async fn start_height<Ctx>(
pub async fn on_start_height<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
#[allow(unused_variables)] metrics: &Metrics,
height: Ctx::Height,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
debug_assert_eq!(state.driver.height(), height);
debug_assert_eq!(state.driver.round(), Round::Nil);

let round = Round::new(0);
info!(%height, "Starting new height");

Expand Down
7 changes: 5 additions & 2 deletions code/crates/core-consensus/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ pub enum Input<Ctx>
where
Ctx: Context,
{
/// Start a new height with the given validator set
StartHeight(Ctx::Height, Ctx::ValidatorSet),
/// Prepare state for starting a new height with the given validator set
PrepareHeight(Ctx::Height, Ctx::ValidatorSet),

/// Start consensus for the given height
StartHeight(Ctx::Height),

/// Process a vote
Vote(SignedVote<Ctx>),
Expand Down
22 changes: 17 additions & 5 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,30 @@ where
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::StartHeight(height, validator_set) => {
// Prepare the driver state for the new height,
// but do not start consensus yet.
let result = self
.process_input(
&myself,
state,
ConsensusInput::StartHeight(height, validator_set),
ConsensusInput::PrepareHeight(height, validator_set),
)
.await;

if let Err(e) = result {
error!(%height, "Error when preparing for height: {e}");
}

// Replay the WAL if necessary
if let Err(e) = self.check_and_replay_wal(&myself, state, height).await {
error!(%height, "Error when checking and replaying WAL: {e}");
}

// Start consensus
let result = self
.process_input(&myself, state, ConsensusInput::StartHeight(height))
.await;

if let Err(e) = result {
error!(%height, "Error when starting height: {e}");
}
Expand All @@ -307,10 +323,6 @@ where

self.tx_event.send(|| Event::StartedHeight(height));

if let Err(e) = self.check_and_replay_wal(&myself, state, height).await {
error!(%height, "Error when checking and replaying WAL: {e}");
}

self.process_buffered_msgs(&myself, state).await;

state.phase = Phase::Running;
Expand Down
1 change: 1 addition & 0 deletions code/crates/test/tests/it/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async fn proposer_crashes_after_proposing(params: TestParams) {
)
}
})
.wait_until(CRASH_HEIGHT + 2)
.success();

test.build()
Expand Down