Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 57 additions & 26 deletions consensus/config/src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ pub struct Parameters {
#[serde(default = "Parameters::default_max_blocks_per_fetch")]
pub max_blocks_per_fetch: usize,

/// Time to wait during node start up until the node has synced the last proposed block via the
/// network peers. When set to `0` the sync mechanism is disabled. This property is meant to be
/// used for amnesia recovery.
#[serde(default = "Parameters::default_sync_last_known_own_block_timeout")]
pub sync_last_known_own_block_timeout: Duration,

/// Interval in milliseconds to probe highest received rounds of peers.
#[serde(default = "Parameters::default_round_prober_interval_ms")]
pub round_prober_interval_ms: u64,

/// Timeout in milliseconds for a round prober request.
#[serde(default = "Parameters::default_round_prober_request_timeout_ms")]
pub round_prober_request_timeout_ms: u64,

/// Proposing new block is stopped when the propagation delay is greater than this threshold.
/// Propagation delay is the difference between the round of the last proposed block and
/// the highest round from this authority that is received by all validators in a quorum.
#[serde(default = "Parameters::default_propagation_delay_stop_proposal_threshold")]
pub propagation_delay_stop_proposal_threshold: u32,

/// The number of rounds of blocks to be kept in the Dag state cache per authority. The larger
/// the number the more the blocks that will be kept in memory allowing minimising any potential
/// disk access.
Expand Down Expand Up @@ -69,12 +89,6 @@ pub struct Parameters {
/// Tonic network settings.
#[serde(default = "TonicParameters::default")]
pub tonic: TonicParameters,

/// Time to wait during node start up until the node has synced the last proposed block via the
/// network peers. When set to `0` the sync mechanism is disabled. This property is meant to be
/// used for amnesia recovery.
#[serde(default = "Parameters::default_sync_last_known_own_block_timeout")]
pub sync_last_known_own_block_timeout: Duration,
}

impl Parameters {
Expand All @@ -97,6 +111,38 @@ impl Parameters {
Duration::from_millis(500)
}

/// Default upper bound on the number of blocks requested in a single fetch.
pub(crate) fn default_max_blocks_per_fetch() -> usize {
    match cfg!(msim) {
        // Keep the limit tiny under the simulator so the per-fetch cap is actually hit.
        true => 10,
        false => 1000,
    }
}

/// Default time to wait at start up for the last own proposed block to be synced from peers.
pub(crate) fn default_sync_last_known_own_block_timeout() -> Duration {
    // Outside the simulator, liveness is prioritised over fully de-risking block
    // equivocation: 5 seconds should be good enough in the majority of cases on a healthy
    // network. Simulator builds use a much shorter wait.
    let timeout_ms: u64 = if cfg!(msim) { 500 } else { 5_000 };
    Duration::from_millis(timeout_ms)
}

/// Default interval, in milliseconds, between probes of peers' highest received rounds.
pub(crate) fn default_round_prober_interval_ms() -> u64 {
    const INTERVAL_MS: u64 = 5_000;
    INTERVAL_MS
}

/// Default timeout, in milliseconds, for a single round prober request.
pub(crate) fn default_round_prober_request_timeout_ms() -> u64 {
    const REQUEST_TIMEOUT_MS: u64 = 2_000;
    REQUEST_TIMEOUT_MS
}

/// Default propagation delay, in rounds, above which proposing new blocks is stopped.
pub(crate) fn default_propagation_delay_stop_proposal_threshold() -> u32 {
    // In production the propagation delay is usually 0 rounds.
    const THRESHOLD_ROUNDS: u32 = 20;
    THRESHOLD_ROUNDS
}

pub(crate) fn default_dag_state_cached_rounds() -> u32 {
if cfg!(msim) {
// Exercise reading blocks from store.
Expand All @@ -119,30 +165,11 @@ impl Parameters {
}
}

/// Default upper bound on the number of blocks requested in a single fetch.
pub(crate) fn default_max_blocks_per_fetch() -> usize {
    if cfg!(msim) {
        // A small limit makes simulator runs exercise the blocks-per-fetch cap.
        return 10;
    }
    1_000
}

/// Default number of commit sync batches that may be fetched ahead.
pub(crate) fn default_commit_sync_batches_ahead() -> usize {
    // Chosen as a multiple of the default commit_sync_parallel_fetches: this allows fetching
    // ahead while bounding the total number of inflight fetches and of fetched commits that
    // have not been processed yet.
    const BATCHES_AHEAD: usize = 80;
    BATCHES_AHEAD
}

/// Default time to wait at start up for the last own proposed block to be synced from peers.
pub(crate) fn default_sync_last_known_own_block_timeout() -> Duration {
    if cfg!(msim) {
        return Duration::from_millis(500);
    }
    // Liveness is prioritised here over the complete de-risking of block equivocation. Given
    // a healthy network, 5 seconds should be good enough in the majority of cases.
    Duration::from_secs(5)
}
}

impl Default for Parameters {
Expand All @@ -152,10 +179,14 @@ impl Default for Parameters {
leader_timeout: Parameters::default_leader_timeout(),
min_round_delay: Parameters::default_min_round_delay(),
max_forward_time_drift: Parameters::default_max_forward_time_drift(),
dag_state_cached_rounds: Parameters::default_dag_state_cached_rounds(),
max_blocks_per_fetch: Parameters::default_max_blocks_per_fetch(),
sync_last_known_own_block_timeout:
Parameters::default_sync_last_known_own_block_timeout(),
round_prober_interval_ms: Parameters::default_round_prober_interval_ms(),
round_prober_request_timeout_ms: Parameters::default_round_prober_request_timeout_ms(),
propagation_delay_stop_proposal_threshold:
Parameters::default_propagation_delay_stop_proposal_threshold(),
dag_state_cached_rounds: Parameters::default_dag_state_cached_rounds(),
commit_sync_parallel_fetches: Parameters::default_commit_sync_parallel_fetches(),
commit_sync_batch_size: Parameters::default_commit_sync_batch_size(),
commit_sync_batches_ahead: Parameters::default_commit_sync_batches_ahead(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ max_forward_time_drift:
secs: 0
nanos: 500000000
max_blocks_per_fetch: 1000
sync_last_known_own_block_timeout:
secs: 5
nanos: 0
round_prober_interval_ms: 5000
round_prober_request_timeout_ms: 2000
propagation_delay_stop_proposal_threshold: 20
dag_state_cached_rounds: 500
commit_sync_parallel_fetches: 20
commit_sync_batch_size: 100
Expand All @@ -25,6 +31,3 @@ tonic:
connection_buffer_size: 33554432
excessive_message_size: 16777216
message_size_limit: 67108864
sync_last_known_own_block_timeout:
secs: 5
nanos: 0
18 changes: 18 additions & 0 deletions consensus/core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ fn build_tonic_services(out_dir: &Path) {
.server_streaming()
.build(),
)
.method(
tonic_build::manual::Method::builder()
.name("get_latest_rounds")
.route_name("GetLatestRounds")
.input_type("crate::network::tonic_network::GetLatestRoundsRequest")
.output_type("crate::network::tonic_network::GetLatestRoundsResponse")
.codec_path(codec_path)
.build(),
)
.build();

tonic_build::manual::Builder::new()
Expand Down Expand Up @@ -128,6 +137,15 @@ fn build_anemo_services(out_dir: &Path) {
.codec_path(codec_path)
.build(),
)
.method(
anemo_build::manual::Method::builder()
.name("get_latest_rounds")
.route_name("GetLatestRounds")
.request_type("crate::network::anemo_network::GetLatestRoundsRequest")
.response_type("crate::network::anemo_network::GetLatestRoundsResponse")
.codec_path(codec_path)
.build(),
)
.build();

anemo_build::manual::Builder::new()
Expand Down
22 changes: 21 additions & 1 deletion consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
anemo_network::AnemoManager, tonic_network::TonicManager, NetworkClient as _,
NetworkManager,
},
round_prober::{RoundProber, RoundProberHandle},
storage::rocksdb_store::RocksDBStore,
subscriber::Subscriber,
synchronizer::{Synchronizer, SynchronizerHandle},
Expand Down Expand Up @@ -137,6 +138,7 @@ where
transaction_client: Arc<TransactionClient>,
synchronizer: Arc<SynchronizerHandle>,
commit_syncer_handle: CommitSyncerHandle,
round_prober_handle: Option<RoundProberHandle>,
leader_timeout_handle: LeaderTimeoutTaskHandle,
core_thread_handle: CoreThreadHandle,
// Only one of broadcaster and subscriber gets created, depending on
Expand Down Expand Up @@ -259,7 +261,7 @@ where
);

let (core_dispatcher, core_thread_handle) =
ChannelCoreThreadDispatcher::start(core, context.clone());
ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
let core_dispatcher = Arc::new(core_dispatcher);
let leader_timeout_handle =
LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
Expand Down Expand Up @@ -287,6 +289,20 @@ where
)
.start();

let round_prober_handle = if context.protocol_config.consensus_round_prober() {
Some(
RoundProber::new(
context.clone(),
core_dispatcher.clone(),
dag_state.clone(),
network_client.clone(),
)
.start(),
)
} else {
None
};

let network_service = Arc::new(AuthorityService::new(
context.clone(),
block_verifier,
Expand Down Expand Up @@ -328,6 +344,7 @@ where
transaction_client: Arc::new(tx_client),
synchronizer,
commit_syncer_handle,
round_prober_handle,
leader_timeout_handle,
core_thread_handle,
broadcaster,
Expand All @@ -354,6 +371,9 @@ where
);
};
self.commit_syncer_handle.stop().await;
if let Some(round_prober_handle) = self.round_prober_handle.take() {
round_prober_handle.stop().await;
}
self.leader_timeout_handle.stop().await;
// Shutdown Core to stop block productions and broadcast.
// When using streaming, all subscribers to broadcasted blocks stop after this.
Expand Down
37 changes: 34 additions & 3 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,20 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {

Ok(result)
}

/// Returns the highest received round per authority as tracked by the core dispatcher,
/// with this authority's own entry replaced by the round of its last block in the DAG state.
async fn handle_get_latest_rounds(&self, _peer: AuthorityIndex) -> ConsensusResult<Vec<Round>> {
    fail_point_async!("consensus-rpc-response");

    let own_index = self.context.own_index;
    let mut rounds = self.core_dispatcher.highest_received_rounds();

    // The core dispatcher never sees own blocks, so the own entry is filled in from the
    // DAG state instead.
    let own_last_round = self
        .dag_state
        .read()
        .get_last_block_for_authority(own_index)
        .round();
    rounds[own_index] = own_last_round;

    Ok(rounds)
}
}

/// Atomically counts the number of active subscriptions to the block broadcast stream,
Expand All @@ -438,7 +452,7 @@ impl SubscriptionCounter {
*counter += 1;
if *counter == 1 {
self.dispatcher
.set_consumer_availability(true)
.set_subscriber_exists(true)
.map_err(|_| ConsensusError::Shutdown)?;
}
Ok(())
Expand All @@ -449,7 +463,7 @@ impl SubscriptionCounter {
*counter -= 1;
if *counter == 0 {
self.dispatcher
.set_consumer_availability(false)
.set_subscriber_exists(false)
.map_err(|_| ConsensusError::Shutdown)?;
}
Ok(())
Expand Down Expand Up @@ -622,12 +636,21 @@ mod tests {
Ok(Default::default())
}

fn set_consumer_availability(&self, _available: bool) -> Result<(), CoreError> {
fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
// Stub: not implemented here; panics via `todo!()` if invoked.
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
// Stub: not implemented here; panics via `todo!()` if invoked.
todo!()
}

fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
// Stub: not implemented here; panics via `todo!()` if invoked.
todo!()
}

fn highest_received_rounds(&self) -> Vec<Round> {
// Stub: not implemented here; panics via `todo!()` if invoked.
todo!()
}
}

#[derive(Default)]
Expand Down Expand Up @@ -682,6 +705,14 @@ mod tests {
) -> ConsensusResult<Vec<Bytes>> {
unimplemented!("Unimplemented")
}

// Stub: ignores both arguments and panics via `unimplemented!` if called.
async fn get_latest_rounds(
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<Vec<Round>> {
unimplemented!("Unimplemented")
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
Expand Down
8 changes: 8 additions & 0 deletions consensus/core/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ mod test {
) -> ConsensusResult<Vec<Bytes>> {
unimplemented!("Unimplemented")
}

// Stub: ignores both arguments and panics via `unimplemented!` if called.
async fn get_latest_rounds(
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<Vec<Round>> {
unimplemented!("Unimplemented")
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
Expand Down
8 changes: 8 additions & 0 deletions consensus/core/src/commit_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,14 @@ mod tests {
) -> ConsensusResult<Vec<Bytes>> {
unimplemented!("Unimplemented")
}

// Stub: ignores both arguments and panics via `unimplemented!` if called.
async fn get_latest_rounds(
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<Vec<Round>> {
unimplemented!("Unimplemented")
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
Expand Down
Loading
Loading