Skip to content

feat(code): Implement Solution 2 for the RoundSync #1032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,11 @@ pub struct TimeoutConfig {
/// the vote synchronization protocol.
#[serde(with = "humantime_serde")]
pub timeout_step: Duration,

/// How long we wait after entering a round before starting
/// the rebroadcast liveness protocol
#[serde(with = "humantime_serde")]
pub timeout_rebroadcast: Duration,
}

impl TimeoutConfig {
Expand All @@ -550,7 +555,9 @@ impl TimeoutConfig {
TimeoutKind::PrevoteTimeLimit => self.timeout_step,
TimeoutKind::PrecommitTimeLimit => self.timeout_step,
// TODO - clarify the rebroadcast timeout duration
TimeoutKind::Rebroadcast => self.timeout_prevote,
TimeoutKind::Rebroadcast => {
self.timeout_propose + self.timeout_prevote + self.timeout_precommit
}
}
}

Expand All @@ -568,14 +575,21 @@ impl TimeoutConfig {

impl Default for TimeoutConfig {
    /// Builds the default timeout configuration.
    ///
    /// Base step timeouts are 3s (propose), 1s (prevote) and 1s (precommit),
    /// each paired with a 500ms delta, and `timeout_step` is 2s.
    /// `timeout_rebroadcast` is not hard-coded: it is derived as the sum of
    /// the three base step timeouts (5s with these defaults), so it stays in
    /// step with them if any of the base values is changed here.
    fn default() -> Self {
        let timeout_propose = Duration::from_secs(3);
        let timeout_prevote = Duration::from_secs(1);
        let timeout_precommit = Duration::from_secs(1);
        let timeout_step = Duration::from_secs(2);
        // Derived from the three step timeouts above rather than duplicated as a literal.
        let timeout_rebroadcast = timeout_propose + timeout_prevote + timeout_precommit;

        // Each field is initialised exactly once; specifying a field twice in a
        // struct expression is rejected by the compiler.
        Self {
            timeout_propose,
            timeout_propose_delta: Duration::from_millis(500),
            timeout_prevote,
            timeout_prevote_delta: Duration::from_millis(500),
            timeout_precommit,
            timeout_precommit_delta: Duration::from_millis(500),
            timeout_step,
            timeout_rebroadcast,
        }
    }
}
Expand Down
16 changes: 9 additions & 7 deletions code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ where
co,
Effect::StartRound(*height, *round, proposer.clone(), Default::default())
);

#[cfg(feature = "metrics")]
metrics.rebroadcast_timeouts.inc();

// Schedule rebroadcast timer if necessary
if state.params.vote_sync_mode == VoteSyncMode::Rebroadcast {
let timeout = Timeout::rebroadcast(*round);
perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));
}
}

DriverInput::ProposeValue(round, _) => {
Expand Down Expand Up @@ -396,13 +405,6 @@ where
);

state.set_last_vote(signed_vote);

// Schedule rebroadcast timer if necessary
if state.params.vote_sync_mode == VoteSyncMode::Rebroadcast {
let timeout = Timeout::rebroadcast(state.driver.round());

perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));
}
}

Ok(())
Expand Down
14 changes: 13 additions & 1 deletion code/crates/core-consensus/src/handle/liveness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ pub async fn on_round_certificate<Ctx>(
where
Ctx: Context,
{
info!(%certificate.height, %certificate.round, "Received round certificate");
info!(
%certificate.height,
%certificate.round,
"Received round certificate"
);

if certificate.height != state.height() {
warn!(
Expand Down Expand Up @@ -139,5 +143,13 @@ where
}
apply_driver_input(co, state, metrics, DriverInput::Vote(vote)).await?;
}

// Cancel rebroadcast timer
// TODO: Only cancel the timer if the round certificate is well-formed, i.e. either PrecommitAny or SkipRound
perform!(
co,
Effect::CancelTimeout(Timeout::rebroadcast(certificate.round), Default::default())
);

Ok(())
}
10 changes: 5 additions & 5 deletions code/crates/core-consensus/src/handle/rebroadcast_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub async fn on_rebroadcast_timeout<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
timeout: Timeout,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
Expand All @@ -19,8 +18,8 @@ where
if let Some(vote) = state.last_signed_prevote.as_ref() {
warn!(
%height, %round, vote_height = %vote.height(), vote_round = %vote.round(),
"Rebroadcasting vote at {:?} step after {:?} timeout",
state.driver.step(), timeout.kind,
"Rebroadcasting vote at {:?} step",
state.driver.step()
);

perform!(
Expand All @@ -32,8 +31,8 @@ where
if let Some(vote) = state.last_signed_precommit.as_ref() {
warn!(
%height, %round, vote_height = %vote.height(), vote_round = %vote.round(),
"Rebroadcasting vote at {:?} step after {:?} timeout",
state.driver.step(), timeout.kind,
"Rebroadcasting vote at {:?} step",
state.driver.step()
);
perform!(
co,
Expand All @@ -60,6 +59,7 @@ where
#[cfg(feature = "metrics")]
metrics.rebroadcast_timeouts.inc();

let timeout = Timeout::rebroadcast(round);
perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/handle/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
apply_driver_input(co, state, metrics, DriverInput::TimeoutElapsed(timeout)).await?;

match timeout.kind {
TimeoutKind::Rebroadcast => on_rebroadcast_timeout(co, state, metrics, timeout).await,
TimeoutKind::Rebroadcast => on_rebroadcast_timeout(co, state, metrics).await,
TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit => {
on_step_limit_timeout(co, state, metrics, timeout.round).await
}
Expand Down
11 changes: 9 additions & 2 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,11 @@ impl Timeouts {
TimeoutKind::Precommit => self.config.timeout_precommit,
TimeoutKind::PrevoteTimeLimit => self.config.timeout_step,
TimeoutKind::PrecommitTimeLimit => self.config.timeout_step,
TimeoutKind::Rebroadcast => self.config.timeout_prevote,
TimeoutKind::Rebroadcast => {
self.config.timeout_propose
+ self.config.timeout_prevote
+ self.config.timeout_precommit
}
}
}

Expand All @@ -202,7 +206,10 @@ impl Timeouts {
TimeoutKind::Precommit => c.timeout_precommit += c.timeout_precommit_delta,
TimeoutKind::PrevoteTimeLimit => (),
TimeoutKind::PrecommitTimeLimit => (),
TimeoutKind::Rebroadcast => (),
TimeoutKind::Rebroadcast => {
c.timeout_rebroadcast +=
c.timeout_propose_delta + c.timeout_prevote_delta + c.timeout_precommit_delta
}
};
}
}
Expand Down
4 changes: 4 additions & 0 deletions code/crates/test/app/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ timeout_commit = "0s"
# Override with MALACHITE__CONSENSUS__TIMEOUT_STEP env variable
timeout_step = "2s"

# How long we wait after entering a round before starting the rebroadcast liveness protocol
# Override with MALACHITE__CONSENSUS__TIMEOUT_REBROADCAST env variable
timeout_rebroadcast = "5s"

# The message(s) required to carry the value payload.
# Available options are:
# - "parts-only": Full value is included in the proposal parts and there is no explicit Proposal message (default)
Expand Down
4 changes: 4 additions & 0 deletions code/examples/channel/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ timeout_commit = "0s"
# Override with MALACHITE__CONSENSUS__TIMEOUT_STEP env variable
timeout_step = "2s"

# How long we wait after entering a round before starting the rebroadcast liveness protocol
# Override with MALACHITE__CONSENSUS__TIMEOUT_REBROADCAST env variable
timeout_rebroadcast = "5s"

# The message(s) required to carry the value payload.
# Available options are:
# - "parts-only": Full value is included in the proposal parts and there is no explicit Proposal message (default)
Expand Down
7 changes: 4 additions & 3 deletions code/scripts/spawn.bash
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ fi

# Environment variables
export MALACHITE__CONSENSUS__P2P__PROTOCOL__TYPE="gossipsub"
export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE="2s"
export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE_DELTA="1s"
export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE="3s"
export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE_DELTA="500ms"
export MALACHITE__CONSENSUS__TIMEOUT_PREVOTE="1s"
export MALACHITE__CONSENSUS__TIMEOUT_PREVOTE_DELTA="500ms"
export MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT="1s"
export MALACHITE__CONSENSUS__TIMEOUT_COMMIT="0s"
export MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT_DELTA="500ms"
# Set the timeout step to 2 seconds to trigger the vote sync and polka certificate faster
export MALACHITE__CONSENSUS__TIMEOUT_STEP="2s"
# Set to request-response to be able to sync polka certificates; "broadcast" does not yet send the certificates
Expand Down
Loading