diff --git a/consensus/config/src/committee.rs b/consensus/config/src/committee.rs
index 0d941a3aa39fd..b82fcbdcaab59 100644
--- a/consensus/config/src/committee.rs
+++ b/consensus/config/src/committee.rs
@@ -175,15 +175,9 @@ impl AuthorityIndex {
     }
 }

-// TODO: re-evaluate formats for production debugging.
 impl Display for AuthorityIndex {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        if self.value() < 26 {
-            let c = (b'A' + self.value() as u8) as char;
-            f.write_str(c.to_string().as_str())
-        } else {
-            write!(f, "[{:02}]", self.value())
-        }
+        write!(f, "[{}]", self.value())
     }
 }

diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs
index 706f8154f49e8..e08a62a7646da 100644
--- a/consensus/core/src/authority_node.rs
+++ b/consensus/core/src/authority_node.rs
@@ -4,6 +4,7 @@
 use std::{sync::Arc, time::Instant};

 use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
+use itertools::Itertools;
 use parking_lot::RwLock;
 use prometheus::Registry;
 use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
@@ -176,11 +177,25 @@ where
         registry: Registry,
         boot_counter: u64,
     ) -> Self {
+        assert!(
+            committee.is_valid_index(own_index),
+            "Invalid own index {}",
+            own_index
+        );
+        let own_hostname = &committee.authority(own_index).hostname;
+        info!(
+            "Starting consensus authority {} {}, {:?}, boot counter {}",
+            own_index, own_hostname, protocol_config.version, boot_counter
+        );
         info!(
-            "Starting consensus authority {}\n{:#?}\n{:#?}\n{:?}\nBoot counter: {}",
-            own_index, committee, parameters, protocol_config.version, boot_counter
+            "Consensus authorities: {}",
+            committee
+                .authorities()
+                .map(|(i, a)| format!("{}: {}", i, a.hostname))
+                .join(", ")
         );
-        assert!(committee.is_valid_index(own_index));
+        info!("Consensus parameters: {:?}", parameters);
+        info!("Consensus committee: {:?}", committee);
         let context = Arc::new(Context::new(
             own_index,
             committee,
diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs
index 7ad8d37a04c2d..b22331d6d07cc 100644
--- a/consensus/core/src/authority_service.rs
+++ b/consensus/core/src/authority_service.rs
@@ -11,7 +11,7 @@ use parking_lot::RwLock;
 use sui_macros::fail_point_async;
 use tokio::{sync::broadcast, time::sleep};
 use tokio_util::sync::ReusableBoxFuture;
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, warn};

 use crate::{
     block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
@@ -114,8 +114,8 @@ impl NetworkService for AuthorityService {
             return Err(e);
         }
         let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
-
-        trace!("Received block {verified_block} via send block.");
+        let block_ref = verified_block.reference();
+        debug!("Received block {} via send block.", block_ref);

         // Reject block with timestamp too far in the future.
         let now = self.context.clock.timestamp_utc_ms();
@@ -130,12 +130,12 @@ impl NetworkService for AuthorityService {
                 .inc();
             debug!(
                 "Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
-                verified_block.reference(),
+                block_ref,
                 verified_block.timestamp_ms(),
                 now,
             );
             return Err(ConsensusError::BlockRejected {
-                block_ref: verified_block.reference(),
+                block_ref,
                 reason: format!(
                     "Block timestamp is too far in the future: {} > {}",
                     verified_block.timestamp_ms(),
@@ -154,7 +154,7 @@ impl NetworkService for AuthorityService {
                 .inc_by(forward_time_drift.as_millis() as u64);
             debug!(
                 "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
-                verified_block.reference(),
+                block_ref,
                 verified_block.timestamp_ms(),
                 now,
                 forward_time_drift.as_millis(),
@@ -189,12 +189,12 @@ impl NetworkService for AuthorityService {
                 .inc();
             debug!(
                 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
-                verified_block.reference(),
+                block_ref,
                 last_commit_index,
                 quorum_commit_index,
             );
             return Err(ConsensusError::BlockRejected {
-                block_ref: verified_block.reference(),
+                block_ref,
                 reason: format!(
                     "Last commit index is lagging quorum commit index too much ({} < {})",
                     last_commit_index, quorum_commit_index,
diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs
index e5d9112b319e4..4e54065c2b49c 100644
--- a/consensus/core/src/block_manager.rs
+++ b/consensus/core/src/block_manager.rs
@@ -107,8 +107,8 @@ impl BlockManager {
             let block = match self.try_accept_one_block(block) {
                 TryAcceptResult::Accepted(block) => block,
                 TryAcceptResult::Suspended(ancestors_to_fetch) => {
-                    trace!(
-                        "Missing ancestors for block {block_ref}: {}",
+                    debug!(
+                        "Missing ancestors to fetch for block {block_ref}: {}",
                         ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
                     );
                     missing_blocks.extend(ancestors_to_fetch);
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index 70c2909483a5a..87e2eacdf50d7 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -279,7 +279,10 @@ impl Core {
         };

         if !missing_block_refs.is_empty() {
-            debug!("Missing block refs: {:?}", missing_block_refs);
+            trace!(
+                "Missing block refs: {}",
+                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
+            );
         }

         Ok(missing_block_refs)
@@ -497,9 +500,26 @@ impl Core {
             .node_metrics
             .proposed_block_size
             .observe(serialized.len() as f64);
-        // Unnecessary to verify own blocks.
+        // Own blocks are assumed to be valid.
         let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

+        // Record the interval from last proposal, before accepting the proposed block.
+        let last_proposed_block = self.last_proposed_block();
+        if last_proposed_block.round() > 0 {
+            self.context
+                .metrics
+                .node_metrics
+                .block_proposal_interval
+                .observe(
+                    Duration::from_millis(
+                        verified_block
+                            .timestamp_ms()
+                            .saturating_sub(last_proposed_block.timestamp_ms()),
+                    )
+                    .as_secs_f64(),
+                );
+        }
+
         // Accept the block into BlockManager and DagState.
         let (accepted_blocks, missing) = self
             .block_manager
@@ -513,18 +533,6 @@ impl Core {
         // Ensure the new block and its ancestors are persisted, before broadcasting it.
         self.dag_state.write().flush();

-        let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms());
-        let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms());
-        self.context
-            .metrics
-            .node_metrics
-            .block_proposal_interval
-            .observe(
-                current_proposal_duration
-                    .saturating_sub(previous_proposal_duration)
-                    .as_secs_f64(),
-            );
-
         // Now acknowledge the transactions for their inclusion to block
         ack_transactions(verified_block.reference());

diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs
index 2c09b29655710..b4981b2ae2e66 100644
--- a/consensus/core/src/dag_state.rs
+++ b/consensus/core/src/dag_state.rs
@@ -1378,7 +1378,9 @@ mod test {
     }

     #[tokio::test]
-    #[should_panic(expected = "Attempted to check for slot A8 that is <= the last evicted round 8")]
+    #[should_panic(
+        expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8"
+    )]
     async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
         /// Only keep elements up to 2 rounds before the last committed round
         const CACHED_ROUNDS: Round = 2;
@@ -1420,7 +1422,7 @@ mod test {

     #[tokio::test]
     #[should_panic(
-        expected = "Attempted to check for slot B3 that is <= the last gc evicted round 3"
+        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
     )]
     async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() {
         /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
@@ -2036,7 +2038,7 @@ mod test {

     #[tokio::test]
     #[should_panic(
-        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
+        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
     )]
     async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
         // GIVEN
@@ -2083,7 +2085,7 @@ mod test {

     #[tokio::test]
     #[should_panic(
-        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
+        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
     )]
     async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
         // GIVEN
diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs
index 27920071e1763..3e15953d96102 100644
--- a/consensus/core/src/metrics.rs
+++ b/consensus/core/src/metrics.rs
@@ -16,7 +16,7 @@ use crate::network::metrics::NetworkMetrics;
 const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
     0.000_001, 0.000_050, 0.000_100, 0.000_500, 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25,
     0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.5, 3.0, 3.5,
-    4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.,
+    4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10., 20., 30., 60., 120.,
 ];

 const NUM_BUCKETS: &[f64] = &[
@@ -126,6 +126,7 @@ pub(crate) struct NodeMetrics {
     pub(crate) fetch_blocks_scheduler_skipped: IntCounterVec,
     pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
     pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec,
+    pub(crate) synchronizer_current_missing_blocks_by_authority: IntGaugeVec,
     pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
     pub(crate) invalid_blocks: IntCounterVec,
     pub(crate) rejected_blocks: IntCounterVec,
@@ -356,18 +357,24 @@ impl NodeMetrics {
                 &["peer", "type"],
                 registry,
             ).unwrap(),
-            synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
-                "synchronizer_fetched_blocks_by_authority",
-                "Number of fetched blocks per block author via the synchronizer",
-                &["authority", "type"],
-                registry,
-            ).unwrap(),
             synchronizer_missing_blocks_by_authority: register_int_counter_vec_with_registry!(
                 "synchronizer_missing_blocks_by_authority",
                 "Number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
                 &["authority"],
                 registry,
             ).unwrap(),
+            synchronizer_current_missing_blocks_by_authority: register_int_gauge_vec_with_registry!(
+                "synchronizer_current_missing_blocks_by_authority",
+                "Current number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
+                &["authority"],
+                registry,
+            ).unwrap(),
+            synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
+                "synchronizer_fetched_blocks_by_authority",
+                "Number of fetched blocks per block author via the synchronizer",
+                &["authority", "type"],
+                registry,
+            ).unwrap(),
             last_known_own_block_round: register_int_gauge_with_registry!(
                 "last_known_own_block_round",
                 "The highest round of our own block as this has been synced from peers during an amnesia recovery",
diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs
index 9099b6b368c94..6a05d42bb0282 100644
--- a/consensus/core/src/round_prober.rs
+++ b/consensus/core/src/round_prober.rs
@@ -170,6 +170,7 @@ impl RoundProber {
         tokio::select! {
             result = requests.next() => {
                 let Some((peer, result)) = result else { break };
+                let peer_name = &self.context.committee.authority(peer).hostname;
                 match result {
                     Ok(Ok((received, accepted))) => {
                         if received.len() == self.context.committee.size()
@@ -177,7 +178,7 @@ impl RoundProber {
                             highest_received_rounds[peer] = received;
                         } else {
                             node_metrics.round_prober_request_errors.with_label_values(&["invalid_received_rounds"]).inc();
-                            tracing::warn!("Received invalid number of received rounds from peer {}", peer);
+                            tracing::warn!("Received invalid number of received rounds from peer {}", peer_name);
                         }

                         if self
@@ -188,7 +189,7 @@ impl RoundProber {
                             highest_accepted_rounds[peer] = accepted;
                         } else {
                             node_metrics.round_prober_request_errors.with_label_values(&["invalid_accepted_rounds"]).inc();
-                            tracing::warn!("Received invalid number of accepted rounds from peer {}", peer);
+                            tracing::warn!("Received invalid number of accepted rounds from peer {}", peer_name);
                         }
                     }

@@ -205,11 +206,11 @@ impl RoundProber {
                     // own probing failures and actual propagation issues.
                     Ok(Err(err)) => {
                         node_metrics.round_prober_request_errors.with_label_values(&["failed_fetch"]).inc();
-                        tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer, err);
+                        tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer_name, err);
                     },
                     Err(_) => {
                         node_metrics.round_prober_request_errors.with_label_values(&["timeout"]).inc();
-                        tracing::warn!("Timeout while getting latest rounds from peer {}", peer);
+                        tracing::warn!("Timeout while getting latest rounds from peer {}", peer_name);
                     },
                 }
             }
diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs
index 3782ba74cba6d..e60c3fad77a3b 100644
--- a/consensus/core/src/subscriber.rs
+++ b/consensus/core/src/subscriber.rs
@@ -160,7 +160,10 @@ impl Subscriber {
             .await
             {
                 Ok(blocks) => {
-                    debug!("Subscribed to peer {} after {} attempts", peer, retries);
+                    debug!(
+                        "Subscribed to peer {} {} after {} attempts",
+                        peer, peer_hostname, retries
+                    );
                     context
                         .metrics
                         .node_metrics
@@ -170,7 +173,10 @@ impl Subscriber {
                     blocks
                 }
                 Err(e) => {
-                    debug!("Failed to subscribe to blocks from peer {}: {}", peer, e);
+                    debug!(
+                        "Failed to subscribe to blocks from peer {} {}: {}",
+                        peer, peer_hostname, e
+                    );
                     context
                         .metrics
                         .node_metrics
@@ -182,7 +188,6 @@ impl Subscriber {
             };

             // Now can consider the subscription successful
-            let peer_hostname = &context.committee.authority(peer).hostname;
             context
                 .metrics
                 .node_metrics
@@ -206,12 +211,15 @@ impl Subscriber {
                         match e {
                             ConsensusError::BlockRejected { block_ref, reason } => {
                                 debug!(
-                                    "Failed to process block from peer {} for block {:?}: {}",
-                                    peer, block_ref, reason
+                                    "Failed to process block from peer {} {} for block {:?}: {}",
+                                    peer, peer_hostname, block_ref, reason
                                 );
                             }
                             _ => {
-                                info!("Invalid block received from peer {}: {}", peer, e,);
+                                info!(
+                                    "Invalid block received from peer {} {}: {}",
+                                    peer, peer_hostname, e
+                                );
                             }
                         }
                     }
@@ -219,7 +227,10 @@ impl Subscriber {
                     retries = 0;
                 }
                 None => {
-                    debug!("Subscription to blocks from peer {} ended", peer);
+                    debug!(
+                        "Subscription to blocks from peer {} {} ended",
+                        peer, peer_hostname
+                    );
                     retries += 1;
                     break 'stream;
                 }
diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs
index 3b6d1c4f4fa9f..d112abce7ff08 100644
--- a/consensus/core/src/synchronizer.rs
+++ b/consensus/core/src/synchronizer.rs
@@ -440,7 +440,7 @@ impl Synchronizer,
     ) {
         const MAX_RETRIES: u32 = 5;
-
+        let peer_hostname = &context.committee.authority(peer_index).hostname;
         let mut requests = FuturesUnordered::new();

         loop {
@@ -464,14 +464,14 @@ impl Synchronizer {
                         if retries <= MAX_RETRIES {
                             requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries))
                         } else {
-                            warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index}.");
+                            warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                             // we don't necessarily need to do, but dropping the guard here to unlock the blocks
                             drop(blocks_guard);
                         }
@@ -558,8 +558,9 @@ impl Synchronizer Synchronizer Synchronizer Synchronizer,
         network_client: Arc,
         missing_blocks: BTreeSet,
-        _core_dispatcher: Arc,
         dag_state: Arc>,
     ) -> Vec<(BlocksGuard, Vec, AuthorityIndex)> {
         const MAX_PEERS: usize = 3;
@@ -945,6 +944,7 @@ impl Synchronizer>();
+        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
         for block in &missing_blocks {
             missing_blocks_per_authority[block.author] += 1;
@@ -959,6 +959,12 @@ impl Synchronizer Synchronizer>();
             // lock the blocks to be fetched. If no lock can be acquired for any of the blocks then don't bother
             if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
+                info!(
+                    "Periodic sync of {} missing blocks from peer {} {}: {}",
+                    block_refs.len(),
+                    peer,
+                    peer_hostname,
+                    block_refs
+                        .iter()
+                        .map(|b| b.to_string())
+                        .collect::>()
+                        .join(", ")
+                );
                 request_futures.push(Self::fetch_blocks_request(
                     network_client.clone(),
                     peer,
@@ -1005,7 +1023,8 @@ impl Synchronizer
+                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
+                    let peer_hostname = &context.committee.authority(peer_index).hostname;
                     match response {
                         Ok(fetched_blocks) => {
                             results.push((blocks_guard, fetched_blocks, peer_index));
@@ -1020,6 +1039,16 @@ impl Synchronizer>()
+                                    .join(", ")
+                            );
                             request_futures.push(Self::fetch_blocks_request(
                                 network_client.clone(),
                                 next_peer,
@@ -1035,9 +1064,10 @@ impl Synchronizer {
-                    debug!("Timed out while fetching all the blocks");
+                    debug!("Timed out while fetching missing blocks");
                     break;
                 }
             }
diff --git a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs
index 01341f8a7376c..381ad4c115079 100644
--- a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs
+++ b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs
@@ -167,7 +167,7 @@ impl ConsensusManagerTrait for MysticetiManager {
             *boot_counter += 1;
         } else {
             info!(
-                "Node has not participated in previous run. Boot counter will not increment {}",
+                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                 *boot_counter
             );
         }