Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions consensus/config/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,9 @@ impl AuthorityIndex {
}
}

// TODO: re-evaluate formats for production debugging.
impl Display for AuthorityIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.value() < 26 {
let c = (b'A' + self.value() as u8) as char;
f.write_str(c.to_string().as_str())
} else {
write!(f, "[{:02}]", self.value())
}
write!(f, "[{}]", self.value())
}
}

Expand Down
21 changes: 18 additions & 3 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{sync::Arc, time::Instant};

use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::Registry;
use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
Expand Down Expand Up @@ -176,11 +177,25 @@ where
registry: Registry,
boot_counter: u64,
) -> Self {
assert!(
committee.is_valid_index(own_index),
"Invalid own index {}",
own_index
);
let own_hostname = &committee.authority(own_index).hostname;
info!(
"Starting consensus authority {} {}, {:?}, boot counter {}",
own_index, own_hostname, protocol_config.version, boot_counter
);
info!(
"Starting consensus authority {}\n{:#?}\n{:#?}\n{:?}\nBoot counter: {}",
own_index, committee, parameters, protocol_config.version, boot_counter
"Consensus authorities: {}",
committee
.authorities()
.map(|(i, a)| format!("{}: {}", i, a.hostname))
.join(", ")
);
assert!(committee.is_valid_index(own_index));
info!("Consensus parameters: {:?}", parameters);
info!("Consensus committee: {:?}", committee);
let context = Arc::new(Context::new(
own_index,
committee,
Expand Down
16 changes: 8 additions & 8 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use parking_lot::RwLock;
use sui_macros::fail_point_async;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, warn};

use crate::{
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
Expand Down Expand Up @@ -114,8 +114,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
return Err(e);
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

trace!("Received block {verified_block} via send block.");
let block_ref = verified_block.reference();
debug!("Received block {} via send block.", block_ref);

// Reject block with timestamp too far in the future.
let now = self.context.clock.timestamp_utc_ms();
Expand All @@ -130,12 +130,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc();
debug!(
"Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
verified_block.reference(),
block_ref,
verified_block.timestamp_ms(),
now,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
block_ref,
reason: format!(
"Block timestamp is too far in the future: {} > {}",
verified_block.timestamp_ms(),
Expand All @@ -154,7 +154,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc_by(forward_time_drift.as_millis() as u64);
debug!(
"Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
verified_block.reference(),
block_ref,
verified_block.timestamp_ms(),
now,
forward_time_drift.as_millis(),
Expand Down Expand Up @@ -189,12 +189,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.inc();
debug!(
"Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
verified_block.reference(),
block_ref,
last_commit_index,
quorum_commit_index,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
block_ref,
reason: format!(
"Last commit index is lagging quorum commit index too much ({} < {})",
last_commit_index, quorum_commit_index,
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl BlockManager {
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
trace!(
"Missing ancestors for block {block_ref}: {}",
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
Expand Down
36 changes: 22 additions & 14 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ impl Core {
};

if !missing_block_refs.is_empty() {
debug!("Missing block refs: {:?}", missing_block_refs);
trace!(
"Missing block refs: {}",
missing_block_refs.iter().map(|b| b.to_string()).join(", ")
);
}

Ok(missing_block_refs)
Expand Down Expand Up @@ -497,9 +500,26 @@ impl Core {
.node_metrics
.proposed_block_size
.observe(serialized.len() as f64);
// Unnecessary to verify own blocks.
// Own blocks are assumed to be valid.
let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

// Record the interval from last proposal, before accepting the proposed block.
let last_proposed_block = self.last_proposed_block();
if last_proposed_block.round() > 0 {
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
Duration::from_millis(
verified_block
.timestamp_ms()
.saturating_sub(last_proposed_block.timestamp_ms()),
)
.as_secs_f64(),
);
}

// Accept the block into BlockManager and DagState.
let (accepted_blocks, missing) = self
.block_manager
Expand All @@ -513,18 +533,6 @@ impl Core {
// Ensure the new block and its ancestors are persisted, before broadcasting it.
self.dag_state.write().flush();

let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms());
let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms());
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
current_proposal_duration
.saturating_sub(previous_proposal_duration)
.as_secs_f64(),
);

// Now acknowledge the transactions for their inclusion to block
ack_transactions(verified_block.reference());

Expand Down
10 changes: 6 additions & 4 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,9 @@ mod test {
}

#[tokio::test]
#[should_panic(expected = "Attempted to check for slot A8 that is <= the last evicted round 8")]
#[should_panic(
expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
)]
async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
/// Only keep elements up to 2 rounds before the last committed round
const CACHED_ROUNDS: Round = 2;
Expand Down Expand Up @@ -1420,7 +1422,7 @@ mod test {

#[tokio::test]
#[should_panic(
expected = "Attempted to check for slot B3 that is <= the last gc evicted round 3"
expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
)]
async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
/// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
Expand Down Expand Up @@ -2036,7 +2038,7 @@ mod test {

#[tokio::test]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
)]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
// GIVEN
Expand Down Expand Up @@ -2083,7 +2085,7 @@ mod test {

#[tokio::test]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
)]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
// GIVEN
Expand Down
21 changes: 14 additions & 7 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::network::metrics::NetworkMetrics;
const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
0.000_001, 0.000_050, 0.000_100, 0.000_500, 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25,
0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.5, 3.0, 3.5,
4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.,
4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10., 20., 30., 60., 120.,
];

const NUM_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -126,6 +126,7 @@ pub(crate) struct NodeMetrics {
pub(crate) fetch_blocks_scheduler_skipped: IntCounterVec,
pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec,
pub(crate) synchronizer_current_missing_blocks_by_authority: IntGaugeVec,
pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
pub(crate) invalid_blocks: IntCounterVec,
pub(crate) rejected_blocks: IntCounterVec,
Expand Down Expand Up @@ -356,18 +357,24 @@ impl NodeMetrics {
&["peer", "type"],
registry,
).unwrap(),
synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_fetched_blocks_by_authority",
"Number of fetched blocks per block author via the synchronizer",
&["authority", "type"],
registry,
).unwrap(),
synchronizer_missing_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_missing_blocks_by_authority",
"Number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
&["authority"],
registry,
).unwrap(),
synchronizer_current_missing_blocks_by_authority: register_int_gauge_vec_with_registry!(
"synchronizer_current_missing_blocks_by_authority",
"Current number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
&["authority"],
registry,
).unwrap(),
synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_fetched_blocks_by_authority",
"Number of fetched blocks per block author via the synchronizer",
&["authority", "type"],
registry,
).unwrap(),
last_known_own_block_round: register_int_gauge_with_registry!(
"last_known_own_block_round",
"The highest round of our own block as this has been synced from peers during an amnesia recovery",
Expand Down
9 changes: 5 additions & 4 deletions consensus/core/src/round_prober.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,15 @@ impl<C: NetworkClient> RoundProber<C> {
tokio::select! {
result = requests.next() => {
let Some((peer, result)) = result else { break };
let peer_name = &self.context.committee.authority(peer).hostname;
match result {
Ok(Ok((received, accepted))) => {
if received.len() == self.context.committee.size()
{
highest_received_rounds[peer] = received;
} else {
node_metrics.round_prober_request_errors.with_label_values(&["invalid_received_rounds"]).inc();
tracing::warn!("Received invalid number of received rounds from peer {}", peer);
tracing::warn!("Received invalid number of received rounds from peer {}", peer_name);
}

if self
Expand All @@ -188,7 +189,7 @@ impl<C: NetworkClient> RoundProber<C> {
highest_accepted_rounds[peer] = accepted;
} else {
node_metrics.round_prober_request_errors.with_label_values(&["invalid_accepted_rounds"]).inc();
tracing::warn!("Received invalid number of accepted rounds from peer {}", peer);
tracing::warn!("Received invalid number of accepted rounds from peer {}", peer_name);
}
}

Expand All @@ -205,11 +206,11 @@ impl<C: NetworkClient> RoundProber<C> {
// own probing failures and actual propagation issues.
Ok(Err(err)) => {
node_metrics.round_prober_request_errors.with_label_values(&["failed_fetch"]).inc();
tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer, err);
tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer_name, err);
},
Err(_) => {
node_metrics.round_prober_request_errors.with_label_values(&["timeout"]).inc();
tracing::warn!("Timeout while getting latest rounds from peer {}", peer);
tracing::warn!("Timeout while getting latest rounds from peer {}", peer_name);
},
}
}
Expand Down
25 changes: 18 additions & 7 deletions consensus/core/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
.await
{
Ok(blocks) => {
debug!("Subscribed to peer {} after {} attempts", peer, retries);
debug!(
"Subscribed to peer {} {} after {} attempts",
peer, peer_hostname, retries
);
context
.metrics
.node_metrics
Expand All @@ -170,7 +173,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
blocks
}
Err(e) => {
debug!("Failed to subscribe to blocks from peer {}: {}", peer, e);
debug!(
"Failed to subscribe to blocks from peer {} {}: {}",
peer, peer_hostname, e
);
context
.metrics
.node_metrics
Expand All @@ -182,7 +188,6 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
};

// Now can consider the subscription successful
let peer_hostname = &context.committee.authority(peer).hostname;
context
.metrics
.node_metrics
Expand All @@ -206,20 +211,26 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
match e {
ConsensusError::BlockRejected { block_ref, reason } => {
debug!(
"Failed to process block from peer {} for block {:?}: {}",
peer, block_ref, reason
"Failed to process block from peer {} {} for block {:?}: {}",
peer, peer_hostname, block_ref, reason
);
}
_ => {
info!("Invalid block received from peer {}: {}", peer, e,);
info!(
"Invalid block received from peer {} {}: {}",
peer, peer_hostname, e
);
}
}
}
// Reset retries when a block is received.
retries = 0;
}
None => {
debug!("Subscription to blocks from peer {} ended", peer);
debug!(
"Subscription to blocks from peer {} {} ended",
peer, peer_hostname
);
retries += 1;
break 'stream;
}
Expand Down
Loading
Loading