Skip to content

feat(code/engine): Store proposed values in WAL #896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
4da7504
feat(code/engine): Store locally proposed value in WAL and re-use it …
romac Mar 7, 2025
4065d06
Cleanup
romac Mar 7, 2025
6b70046
Merge all three WalAppend effects into one
romac Mar 7, 2025
b041f19
Cleanup
romac Mar 7, 2025
bf848d0
Merge branch 'main' into romac/wal-proposed-value
romac Mar 27, 2025
9dc0042
Add multi-round test with timeouts
romac Mar 27, 2025
4f201b2
Cleanup
romac Mar 27, 2025
580a450
Add a middleware to the test context to override vote building
romac Mar 27, 2025
d0eb574
Less noisy logs in test app
romac Mar 27, 2025
8e6dd5c
Merge remote-tracking branch 'origin/main' into romac/wal-proposed-value
romac Mar 28, 2025
23ab356
Add concept of middleware to change from the tests how votes and prop…
romac Mar 28, 2025
fb33d28
Merge remote-tracking branch 'origin/main' into romac/wal-proposed-value
romac Mar 28, 2025
1fe7b38
Deduplicate timeout elapsed code
romac Mar 28, 2025
a9f73f0
Replay proposed values before other entries
romac Mar 28, 2025
091c277
Split `StartHeight` into `PrepareHeight` and `StartHeight`
romac Mar 25, 2025
cf49f0e
Add check that we are in the right state before starting a height
romac Mar 25, 2025
fd34681
Actually replay the proposed values
romac Mar 28, 2025
2649113
Cleanup
romac Mar 28, 2025
f2462b0
Store proposed values for other rounds
romac Mar 28, 2025
254b9c5
Better naming
romac Mar 28, 2025
e0ccb97
Replay locally proposed value as an `ProposedValue` input instead of …
romac Mar 28, 2025
434cc37
Fix unused variable warning
romac Mar 31, 2025
0267c25
Merge branch 'main' into romac/wal-proposed-value
romac Apr 1, 2025
b142a49
Replay proposals
romac Apr 2, 2025
8416236
On GetValue output from the driver, re-use any full proposal proposed…
romac Apr 2, 2025
62019c9
Remove outdated check in Starknet tests
romac Apr 2, 2025
81d7709
Merge branch 'main' into romac/wal-proposed-value
romac Apr 2, 2025
9965158
Cleanup
romac Apr 2, 2025
2fce398
Merge branch 'main' into romac/wal-proposed-value
romac Apr 3, 2025
1debb22
Simplify WAL replay
romac Apr 3, 2025
7e1fda6
Re-enable checks in byzantine WAL tests
romac Apr 3, 2025
ed8554b
Cleanup
romac Apr 3, 2025
eb9c5dc
Cleanup protos
romac Apr 3, 2025
d426a5f
Replay ProposedValue exactly as stored in the WAL
ancazamfir Apr 3, 2025
cb696f6
Log proposed values to WAL only if not already seen and therefore
ancazamfir Apr 4, 2025
1868db0
Fix issue with last commit, store proposed values before getting the
ancazamfir Apr 4, 2025
66e7ca5
Fix comments
romac Apr 7, 2025
25c37a6
Ensure all timeouts can be decoded
romac Apr 7, 2025
130008a
Re-add check that was previously removed when we were replaying WAL b…
romac Apr 7, 2025
8f9679f
Merge `WalReplay` events into one
romac Apr 7, 2025
f3e06a9
Review comments
ancazamfir Apr 7, 2025
f92b07f
Apply suggestions from code review
ancazamfir Apr 7, 2025
adecaf7
Apply suggestions from code review
ancazamfir Apr 7, 2025
9d7ce77
Add comments to handler functions
ancazamfir Apr 7, 2025
6035541
Add comments to on_propose handler
ancazamfir Apr 7, 2025
a960ae4
Add encode and decode functions for all wal entries
ancazamfir Apr 7, 2025
31a926f
Merge branch 'main' into romac/wal-proposed-value
romac Apr 8, 2025
f631c01
Apply suggestion from code review
romac Apr 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions code/crates/core-consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use malachitebft_core_types::*;

use crate::input::RequestId;
use crate::types::SignedConsensusMsg;
use crate::{ConsensusMsg, VoteExtensionError};
use crate::{ConsensusMsg, VoteExtensionError, WalEntry};

/// Provides a way to construct the appropriate [`Resume`] value to
/// resume execution after handling an [`Effect`].
Expand Down Expand Up @@ -182,15 +182,10 @@ where
resume::Continue,
),

/// Append a consensus message to the Write-Ahead Log for crash recovery
/// Append an entry to the Write-Ahead Log for crash recovery
///
/// Resume with: [`resume::Continue`]`
WalAppendMessage(SignedConsensusMsg<Ctx>, resume::Continue),

/// Append a timeout to the Write-Ahead Log for crash recovery
///
/// Resume with: [`resume::Continue`]`
WalAppendTimeout(Timeout, resume::Continue),
WalAppend(WalEntry<Ctx>, resume::Continue),

/// Allows the application to extend the pre-commit vote with arbitrary data.
///
Expand Down
25 changes: 23 additions & 2 deletions code/crates/core-consensus/src/full_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<Ctx: Context> FullProposalKeeper<Ctx> {
Self::default()
}

pub fn full_proposals_for_value(
pub fn proposals_for_value(
&self,
proposed_value: &ProposedValue<Ctx>,
) -> Vec<SignedProposal<Ctx>> {
Expand Down Expand Up @@ -157,7 +157,28 @@ impl<Ctx: Context> FullProposalKeeper<Ctx> {
None
}

#[allow(clippy::type_complexity)]
pub fn full_proposal_at_round_and_proposer(
&self,
height: &Ctx::Height,
round: Round,
proposer: &Ctx::Address,
) -> Option<&FullProposal<Ctx>> {
let entries = self
.keeper
.get(&(*height, round))
.filter(|entries| !entries.is_empty())?;

for entry in entries {
if let Entry::Full(p) = entry {
if p.proposal.validator_address() == proposer {
return Some(p);
}
}
}

None
}

pub fn get_value<'a>(
&self,
height: &Ctx::Height,
Expand Down
27 changes: 22 additions & 5 deletions code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use crate::handle::vote::on_vote;
use crate::prelude::*;
use crate::types::SignedConsensusMsg;
use crate::util::pretty::PrettyVal;
use crate::LocallyProposedValue;
use crate::VoteSyncMode;

use super::propose::on_propose;

#[async_recursion]
pub async fn apply_driver_input<Ctx>(
co: &Co<Ctx>,
Expand Down Expand Up @@ -322,12 +325,26 @@ where
}

DriverOutput::GetValue(height, round, timeout) => {
info!(%height, %round, "Requesting value");
if let Some(full_proposal) =
state.full_proposal_at_round_and_proposer(&height, round, state.address())
{
info!(%height, %round, "Using already existing value");

let local_value = LocallyProposedValue {
height: full_proposal.proposal.height(),
round: full_proposal.proposal.round(),
value: full_proposal.builder_value.clone(),
};

perform!(
co,
Effect::GetValue(height, round, timeout, Default::default())
);
on_propose(co, state, metrics, local_value).await?;
} else {
info!(%height, %round, "Requesting value from application");

perform!(
co,
Effect::GetValue(height, round, timeout, Default::default())
);
}

Ok(())
}
Expand Down
24 changes: 7 additions & 17 deletions code/crates/core-consensus/src/handle/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use crate::handle::signature::verify_signature;
use crate::handle::validator_set::get_validator_set;
use crate::input::Input;
use crate::prelude::*;
use crate::types::ConsensusMsg;
use crate::types::{ConsensusMsg, ProposedValue, SignedConsensusMsg, WalEntry};
use crate::util::pretty::PrettyProposal;
use crate::{ProposedValue, SignedConsensusMsg};

pub async fn on_proposal<Ctx>(
co: &Co<Ctx>,
Expand Down Expand Up @@ -38,23 +37,14 @@ where
}

info!(
height = %consensus_height,
%proposal_height,
address = %proposer_address,
consensus.height = %consensus_height,
proposal.height = %proposal_height,
proposal.round = %proposal_round,
proposer = %proposer_address,
message = %PrettyProposal::<Ctx>(&signed_proposal.message),
"Received proposal"
);

// Queue messages if driver is not initialized, or if they are for higher height.
// Process messages received for the current height.
// Drop all others.
if state.driver.round() == Round::Nil {
debug!("Received proposal at round -1, queuing for later");
state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal));

return Ok(());
}

if proposal_height > consensus_height {
debug!("Received proposal for higher height, queuing for later");
state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal));
Expand All @@ -72,8 +62,8 @@ where
if state.params.value_payload.include_proposal() {
perform!(
co,
Effect::WalAppendMessage(
SignedConsensusMsg::Proposal(signed_proposal.clone()),
Effect::WalAppend(
WalEntry::ConsensusMsg(SignedConsensusMsg::Proposal(signed_proposal.clone())),
Default::default()
)
);
Expand Down
51 changes: 30 additions & 21 deletions code/crates/core-consensus/src/handle/propose.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,61 @@
use crate::prelude::*;

use crate::handle::driver::apply_driver_input;
use crate::types::{LocallyProposedValue, ProposedValue};
use crate::types::{LocallyProposedValue, ProposedValue, WalEntry};

pub async fn on_propose<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
value: LocallyProposedValue<Ctx>,
local_value: LocallyProposedValue<Ctx>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
let LocallyProposedValue {
height,
round,
value,
} = value;

if state.driver.height() != height {
if state.driver.height() != local_value.height {
warn!(
"Ignoring proposal for height {height}, current height: {}",
"Ignoring proposal for height {}, current height: {}",
local_value.height,
state.driver.height()
);

return Ok(());
}

if state.driver.round() != round {
if state.driver.round() != local_value.round {
warn!(
"Ignoring propose value for round {round}, current round: {}",
"Ignoring proposal for round {}, current round: {}",
local_value.round,
state.driver.round()
);

return Ok(());
}

#[cfg(feature = "metrics")]
metrics.consensus_start();

state.store_value(&ProposedValue {
height,
round,
let proposed_value = ProposedValue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove Propose input and replace it with ProposedValue input?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this comment answer your question?

height: local_value.height,
round: local_value.round,
valid_round: Round::Nil,
proposer: state.address().clone(),
value: value.clone(),
value: local_value.value.clone(),
validity: Validity::Valid,
});
};

state.store_value(&proposed_value);

#[cfg(feature = "metrics")]
metrics.consensus_start();

apply_driver_input(co, state, metrics, DriverInput::ProposeValue(round, value)).await
perform!(
co,
Effect::WalAppend(WalEntry::ProposedValue(proposed_value), Default::default())
);

apply_driver_input(
co,
state,
metrics,
DriverInput::ProposeValue(local_value.round, local_value.value),
)
.await
}
14 changes: 11 additions & 3 deletions code/crates/core-consensus/src/handle/proposed_value.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::prelude::*;

use crate::handle::driver::apply_driver_input;
use crate::types::ProposedValue;
use crate::types::{ProposedValue, WalEntry};

use super::decide::try_decide;
use super::signature::sign_proposal;
Expand Down Expand Up @@ -54,7 +54,15 @@ where
state.store_proposal(signed_proposal);
}

let proposals = state.full_proposals_for_value(&proposed_value);
let validity = proposed_value.validity;
let proposals = state.proposals_for_value(&proposed_value);

// Append the proposed value to the WAL, so it can be used for recovery.
perform!(
co,
Effect::WalAppend(WalEntry::ProposedValue(proposed_value), Default::default())
);

for signed_proposal in proposals {
debug!(
proposal.height = %signed_proposal.height(),
Expand All @@ -66,7 +74,7 @@ where
co,
state,
metrics,
DriverInput::Proposal(signed_proposal, proposed_value.validity),
DriverInput::Proposal(signed_proposal, validity),
)
.await?;
}
Expand Down
7 changes: 5 additions & 2 deletions code/crates/core-consensus/src/handle/start_height.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ where
debug_assert_eq!(state.height(), height);
debug_assert_eq!(state.round(), Round::Nil);

start_height(co, state, metrics, height).await
on_start_height(co, state, metrics, height).await
}

pub async fn start_height<Ctx>(
async fn on_start_height<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
Expand All @@ -36,6 +36,9 @@ pub async fn start_height<Ctx>(
where
Ctx: Context,
{
debug_assert_eq!(state.height(), height);
debug_assert_eq!(state.round(), Round::Nil);

let round = Round::new(0);
info!(%height, "Starting new height");

Expand Down
6 changes: 5 additions & 1 deletion code/crates/core-consensus/src/handle/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::handle::driver::apply_driver_input;
use crate::handle::rebroadcast_timeout::on_rebroadcast_timeout;
use crate::handle::step_timeout::on_step_limit_timeout;
use crate::prelude::*;
use crate::types::WalEntry;

pub async fn on_timeout_elapsed<Ctx>(
co: &Co<Ctx>,
Expand Down Expand Up @@ -41,7 +42,10 @@ where
) {
// Persist the timeout in the Write-ahead Log.
// Time-limit and rebroadcast timeouts are not persisted because they only occur when consensus is stuck.
perform!(co, Effect::WalAppendTimeout(timeout, Default::default()));
perform!(
co,
Effect::WalAppend(WalEntry::Timeout(timeout), Default::default())
);
}

apply_driver_input(co, state, metrics, DriverInput::TimeoutElapsed(timeout)).await?;
Expand Down
7 changes: 3 additions & 4 deletions code/crates/core-consensus/src/handle/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::handle::signature::verify_signature;
use crate::handle::validator_set::get_validator_set;
use crate::input::Input;
use crate::prelude::*;
use crate::types::ConsensusMsg;
use crate::types::{ConsensusMsg, SignedConsensusMsg, WalEntry};
use crate::util::pretty::PrettyVote;
use crate::SignedConsensusMsg;

pub async fn on_vote<Ctx>(
co: &Co<Ctx>,
Expand Down Expand Up @@ -82,8 +81,8 @@ where
// Append the vote to the Write-ahead Log
perform!(
co,
Effect::WalAppendMessage(
SignedConsensusMsg::Vote(signed_vote.clone()),
Effect::WalAppend(
WalEntry::ConsensusMsg(SignedConsensusMsg::Vote(signed_vote.clone())),
Default::default()
)
);
Expand Down
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum Input<Ctx>
where
Ctx: Context,
{
/// Start a new height with the given validator set.
/// Start consensus for the given height with the given validator set
StartHeight(Ctx::Height, Ctx::ValidatorSet),

/// Process a vote received over the network.
Expand Down
14 changes: 12 additions & 2 deletions code/crates/core-consensus/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,22 @@ where
.full_proposal_at_round_and_value(height, round, &value.id())
}

pub fn full_proposals_for_value(
pub fn full_proposal_at_round_and_proposer(
&self,
height: &Ctx::Height,
round: Round,
address: &Ctx::Address,
) -> Option<&FullProposal<Ctx>> {
self.full_proposal_keeper
.full_proposal_at_round_and_proposer(height, round, address)
}

pub fn proposals_for_value(
&self,
proposed_value: &ProposedValue<Ctx>,
) -> Vec<SignedProposal<Ctx>> {
self.full_proposal_keeper
.full_proposals_for_value(proposed_value)
.proposals_for_value(proposed_value)
}

pub fn store_proposal(&mut self, new_proposal: SignedProposal<Ctx>) {
Expand Down
Loading
Loading