From 2519c827cc8785e05a561b08e02179c032a4afb7 Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Tue, 7 Jan 2025 17:57:53 -0800 Subject: [PATCH 1/2] [consensus] logging and metrics improvements (#20807) ## Description - Include peer hostname in round prober errors. - Fix block proposal interval metric. - Log missing blocks and peers fetched from in info. - Add `synchronizer_current_missing_blocks_by_authority` metric. ## Test plan CI PT --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- consensus/core/src/core.rs | 31 ++++++++++++---------- consensus/core/src/metrics.rs | 21 ++++++++++----- consensus/core/src/round_prober.rs | 9 ++++--- consensus/core/src/synchronizer.rs | 41 +++++++++++++++++++++++++----- 4 files changed, 72 insertions(+), 30 deletions(-) diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 70c2909483a5a..2950e85b5aa63 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -497,9 +497,26 @@ impl Core { .node_metrics .proposed_block_size .observe(serialized.len() as f64); - // Unnecessary to verify own blocks. + // Own blocks are assumed to be valid. let verified_block = VerifiedBlock::new_verified(signed_block, serialized); + // Record the interval from last proposal, before accepting the proposed block. + let last_proposed_block = self.last_proposed_block(); + if last_proposed_block.round() > 0 { + self.context + .metrics + .node_metrics + .block_proposal_interval + .observe( + Duration::from_millis( + verified_block + .timestamp_ms() + .saturating_sub(last_proposed_block.timestamp_ms()), + ) + .as_secs_f64(), + ); + } + // Accept the block into BlockManager and DagState. let (accepted_blocks, missing) = self .block_manager @@ -513,18 +530,6 @@ impl Core { // Ensure the new block and its ancestors are persisted, before broadcasting it. 
self.dag_state.write().flush(); - let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms()); - let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms()); - self.context - .metrics - .node_metrics - .block_proposal_interval - .observe( - current_proposal_duration - .saturating_sub(previous_proposal_duration) - .as_secs_f64(), - ); - // Now acknowledge the transactions for their inclusion to block ack_transactions(verified_block.reference()); diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 27920071e1763..3e15953d96102 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -16,7 +16,7 @@ use crate::network::metrics::NetworkMetrics; const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.000_001, 0.000_050, 0.000_100, 0.000_500, 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.5, 3.0, 3.5, - 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10., + 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10., 20., 30., 60., 120., ]; const NUM_BUCKETS: &[f64] = &[ @@ -126,6 +126,7 @@ pub(crate) struct NodeMetrics { pub(crate) fetch_blocks_scheduler_skipped: IntCounterVec, pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec, pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec, + pub(crate) synchronizer_current_missing_blocks_by_authority: IntGaugeVec, pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec, pub(crate) invalid_blocks: IntCounterVec, pub(crate) rejected_blocks: IntCounterVec, @@ -356,18 +357,24 @@ impl NodeMetrics { &["peer", "type"], registry, ).unwrap(), - synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!( - "synchronizer_fetched_blocks_by_authority", - "Number of fetched blocks per block author via the synchronizer", - &["authority", "type"], - registry, - ).unwrap(), synchronizer_missing_blocks_by_authority: register_int_counter_vec_with_registry!( "synchronizer_missing_blocks_by_authority", "Number of missing blocks per block author, as observed by the synchronizer during periodic sync.", &["authority"], registry, ).unwrap(), + synchronizer_current_missing_blocks_by_authority: register_int_gauge_vec_with_registry!( + "synchronizer_current_missing_blocks_by_authority", + "Current number of missing blocks per block author, as observed by the synchronizer during periodic sync.", + &["authority"], + registry, + ).unwrap(), + synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!( + "synchronizer_fetched_blocks_by_authority", + "Number of fetched blocks per block author via the synchronizer", + &["authority", "type"], + registry, + ).unwrap(), last_known_own_block_round: register_int_gauge_with_registry!( "last_known_own_block_round", "The highest round of our own block as this has been synced from peers during an amnesia recovery", diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index 9099b6b368c94..6a05d42bb0282 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -170,6 +170,7 @@ impl RoundProber { tokio::select! 
{ result = requests.next() => { let Some((peer, result)) = result else { break }; + let peer_name = &self.context.committee.authority(peer).hostname; match result { Ok(Ok((received, accepted))) => { if received.len() == self.context.committee.size() @@ -177,7 +178,7 @@ impl RoundProber { highest_received_rounds[peer] = received; } else { node_metrics.round_prober_request_errors.with_label_values(&["invalid_received_rounds"]).inc(); - tracing::warn!("Received invalid number of received rounds from peer {}", peer); + tracing::warn!("Received invalid number of received rounds from peer {}", peer_name); } if self @@ -188,7 +189,7 @@ impl RoundProber { highest_accepted_rounds[peer] = accepted; } else { node_metrics.round_prober_request_errors.with_label_values(&["invalid_accepted_rounds"]).inc(); - tracing::warn!("Received invalid number of accepted rounds from peer {}", peer); + tracing::warn!("Received invalid number of accepted rounds from peer {}", peer_name); } } @@ -205,11 +206,11 @@ impl RoundProber { // own probing failures and actual propagation issues. Ok(Err(err)) => { node_metrics.round_prober_request_errors.with_label_values(&["failed_fetch"]).inc(); - tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer, err); + tracing::warn!("Failed to get latest rounds from peer {}: {:?}", peer_name, err); }, Err(_) => { node_metrics.round_prober_request_errors.with_label_values(&["timeout"]).inc(); - tracing::warn!("Timeout while getting latest rounds from peer {}", peer); + tracing::warn!("Timeout while getting latest rounds from peer {}", peer_name); }, } } diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 3b6d1c4f4fa9f..8f3c071762233 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -893,10 +893,9 @@ impl Synchronizer Synchronizer, network_client: Arc, missing_blocks: BTreeSet, - _core_dispatcher: Arc, dag_state: Arc>, ) -> Vec<(BlocksGuard, Vec, AuthorityIndex)> { const MAX_PEERS: usize = 3; @@ -945,6 +943,7 @@ impl Synchronizer>(); + let mut missing_blocks_per_authority = vec![0; context.committee.size()]; for block in &missing_blocks { missing_blocks_per_authority[block.author] += 1; @@ -959,6 +958,12 @@ impl Synchronizer Synchronizer>(); // lock the blocks to be fetched. 
If no lock can be acquired for any of the blocks then don't bother if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) { + info!( + "Fetching {} missing blocks from peer {}: {}", + block_refs.len(), + peer_hostname, + block_refs + .iter() + .map(|b| b.to_string()) + .collect::>() + .join(", ") + ); request_futures.push(Self::fetch_blocks_request( network_client.clone(), peer, @@ -1005,9 +1021,11 @@ impl Synchronizer + Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => { + let peer_hostname = &context.committee.authority(peer_index).hostname; match response { Ok(fetched_blocks) => { + info!("Fetched {} blocks from peer {}", fetched_blocks.len(), peer_hostname); results.push((blocks_guard, fetched_blocks, peer_index)); // no more pending requests are left, just break the loop @@ -1020,6 +1038,16 @@ impl Synchronizer>() + .join(", ") + ); request_futures.push(Self::fetch_blocks_request( network_client.clone(), next_peer, @@ -1035,9 +1063,10 @@ impl Synchronizer { - debug!("Timed out while fetching all the blocks"); + debug!("Timed out while fetching missing blocks"); break; } } From e54e6c95b15554b19e5421f871d7fa5b9162bbdb Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Wed, 8 Jan 2025 19:55:41 -0800 Subject: [PATCH 2/2] [consensus improve logging when receiving blocks (#20825) ## Description Improve logging of blocks received through streaming and synchronizations. ## Test plan CI PT --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- consensus/config/src/committee.rs | 8 +----- consensus/core/src/authority_node.rs | 21 +++++++++++++--- consensus/core/src/authority_service.rs | 16 ++++++------ consensus/core/src/block_manager.rs | 4 +-- consensus/core/src/core.rs | 5 +++- consensus/core/src/dag_state.rs | 10 +++++--- consensus/core/src/subscriber.rs | 25 +++++++++++++------ consensus/core/src/synchronizer.rs | 21 ++++++++-------- .../consensus_manager/mysticeti_manager.rs | 2 +- 9 files changed, 69 insertions(+), 43 deletions(-) diff --git a/consensus/config/src/committee.rs b/consensus/config/src/committee.rs index 0d941a3aa39fd..b82fcbdcaab59 100644 --- a/consensus/config/src/committee.rs +++ b/consensus/config/src/committee.rs @@ -175,15 +175,9 @@ impl AuthorityIndex { } } -// TODO: re-evaluate formats for production debugging. 
impl Display for AuthorityIndex { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if self.value() < 26 { - let c = (b'A' + self.value() as u8) as char; - f.write_str(c.to_string().as_str()) - } else { - write!(f, "[{:02}]", self.value()) - } + write!(f, "[{}]", self.value()) } } diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 706f8154f49e8..e08a62a7646da 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -4,6 +4,7 @@ use std::{sync::Arc, time::Instant}; use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair}; +use itertools::Itertools; use parking_lot::RwLock; use prometheus::Registry; use sui_protocol_config::{ConsensusNetwork, ProtocolConfig}; @@ -176,11 +177,25 @@ where registry: Registry, boot_counter: u64, ) -> Self { + assert!( + committee.is_valid_index(own_index), + "Invalid own index {}", + own_index + ); + let own_hostname = &committee.authority(own_index).hostname; + info!( + "Starting consensus authority {} {}, {:?}, boot counter {}", + own_index, own_hostname, protocol_config.version, boot_counter + ); info!( - "Starting consensus authority {}\n{:#?}\n{:#?}\n{:?}\nBoot counter: {}", - own_index, committee, parameters, protocol_config.version, boot_counter + "Consensus authorities: {}", + committee + .authorities() + .map(|(i, a)| format!("{}: {}", i, a.hostname)) + .join(", ") ); - assert!(committee.is_valid_index(own_index)); + info!("Consensus parameters: {:?}", parameters); + info!("Consensus committee: {:?}", committee); let context = Arc::new(Context::new( own_index, committee, diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs index 7ad8d37a04c2d..b22331d6d07cc 100644 --- a/consensus/core/src/authority_service.rs +++ b/consensus/core/src/authority_service.rs @@ -11,7 +11,7 @@ use parking_lot::RwLock; use sui_macros::fail_point_async; use tokio::{sync::broadcast, time::sleep}; use tokio_util::sync::ReusableBoxFuture; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, warn}; use crate::{ block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND}, @@ -114,8 +114,8 @@ impl NetworkService for AuthorityService { return Err(e); } let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block); - - trace!("Received block {verified_block} via send block."); + let block_ref = verified_block.reference(); + debug!("Received block {} via send block.", block_ref); // Reject block with timestamp too far in the future. 
let now = self.context.clock.timestamp_utc_ms(); @@ -130,12 +130,12 @@ impl NetworkService for AuthorityService { .inc(); debug!( "Block {:?} timestamp ({} > {}) is too far in the future, rejected.", - verified_block.reference(), + block_ref, verified_block.timestamp_ms(), now, ); return Err(ConsensusError::BlockRejected { - block_ref: verified_block.reference(), + block_ref, reason: format!( "Block timestamp is too far in the future: {} > {}", verified_block.timestamp_ms(), @@ -154,7 +154,7 @@ impl NetworkService for AuthorityService { .inc_by(forward_time_drift.as_millis() as u64); debug!( "Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms", - verified_block.reference(), + block_ref, verified_block.timestamp_ms(), now, forward_time_drift.as_millis(), @@ -189,12 +189,12 @@ impl NetworkService for AuthorityService { .inc(); debug!( "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})", - verified_block.reference(), + block_ref, last_commit_index, quorum_commit_index, ); return Err(ConsensusError::BlockRejected { - block_ref: verified_block.reference(), + block_ref, reason: format!( "Last commit index is lagging quorum commit index too much ({} < {})", last_commit_index, quorum_commit_index, diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs index e5d9112b319e4..4e54065c2b49c 100644 --- a/consensus/core/src/block_manager.rs +++ b/consensus/core/src/block_manager.rs @@ -107,8 +107,8 @@ impl BlockManager { let block = match self.try_accept_one_block(block) { TryAcceptResult::Accepted(block) => block, TryAcceptResult::Suspended(ancestors_to_fetch) => { - trace!( - "Missing ancestors for block {block_ref}: {}", + debug!( + "Missing ancestors to fetch for block {block_ref}: {}", ancestors_to_fetch.iter().map(|b| b.to_string()).join(",") ); missing_blocks.extend(ancestors_to_fetch); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 2950e85b5aa63..87e2eacdf50d7 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -279,7 +279,10 @@ impl Core { }; if !missing_block_refs.is_empty() { - debug!("Missing block refs: {:?}", missing_block_refs); + trace!( + "Missing block refs: {}", + missing_block_refs.iter().map(|b| b.to_string()).join(", ") + ); } Ok(missing_block_refs) diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 2c09b29655710..b4981b2ae2e66 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -1378,7 +1378,9 @@ mod test { } #[tokio::test] - #[should_panic(expected = "Attempted to check for slot A8 that is <= the last evicted round 8")] + #[should_panic( + expected = "Attempted to check for slot [0]8 that is <= the last evicted round 8" + )] async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() { /// Only keep elements up to 2 rounds before the last committed round const CACHED_ROUNDS: Round = 2; @@ -1420,7 +1422,7 @@ mod test { #[tokio::test] #[should_panic( - expected = "Attempted to check for slot B3 that is <= the last gc evicted round 3" + expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3" )] async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() { /// Keep 2 rounds from the highest committed round. 
This is considered universal and minimum necessary blocks to hold @@ -2036,7 +2038,7 @@ mod test { #[tokio::test] #[should_panic( - expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C" + expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]" )] async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() { // GIVEN @@ -2083,7 +2085,7 @@ mod test { #[tokio::test] #[should_panic( - expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C" + expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]" )] async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() { // GIVEN diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs index 3782ba74cba6d..e60c3fad77a3b 100644 --- a/consensus/core/src/subscriber.rs +++ b/consensus/core/src/subscriber.rs @@ -160,7 +160,10 @@ impl Subscriber { .await { Ok(blocks) => { - debug!("Subscribed to peer {} after {} attempts", peer, retries); + debug!( + "Subscribed to peer {} {} after {} attempts", + peer, peer_hostname, retries + ); context .metrics .node_metrics @@ -170,7 +173,10 @@ impl Subscriber { blocks } Err(e) => { - debug!("Failed to subscribe to blocks from peer {}: {}", peer, e); + debug!( + "Failed to subscribe to blocks from peer {} {}: {}", + peer, peer_hostname, e + ); context .metrics .node_metrics @@ -182,7 +188,6 @@ impl Subscriber { }; // Now can consider the subscription successful - let peer_hostname = &context.committee.authority(peer).hostname; context .metrics .node_metrics @@ -206,12 +211,15 @@ impl Subscriber { match e { ConsensusError::BlockRejected { block_ref, reason } => { debug!( - "Failed to process block from peer {} for block {:?}: {}", - peer, block_ref, reason + "Failed to process block from peer {} {} for block {:?}: {}", + peer, peer_hostname, block_ref, reason ); } _ => { - info!("Invalid block received from peer {}: {}", peer, e,); + info!( + "Invalid block received from peer {} {}: {}", + peer, peer_hostname, e + ); } } } @@ -219,7 +227,10 @@ impl Subscriber { retries = 0; } None => { - debug!("Subscription to blocks from peer {} ended", peer); + debug!( + "Subscription to blocks from peer {} {} ended", + peer, peer_hostname + ); retries += 1; break 'stream; } diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 8f3c071762233..d112abce7ff08 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -440,7 +440,7 @@ impl Synchronizer, ) { const MAX_RETRIES: u32 = 5; - + let peer_hostname = &context.committee.authority(peer_index).hostname; let mut requests = FuturesUnordered::new(); loop { @@ -464,14 +464,14 @@ impl Synchronizer { if retries <= MAX_RETRIES { requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, FETCH_REQUEST_TIMEOUT, retries)) } else { - warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index}."); + warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}."); // we don't necessarily need to do, but dropping the guard here to unlock the blocks drop(blocks_guard); } @@ -558,8 +558,9 @@ impl Synchronizer Synchronizer Synchronizer Synchronizer { - info!("Fetched {} blocks from peer {}", fetched_blocks.len(), 
peer_hostname); results.push((blocks_guard, fetched_blocks, peer_index)); // no more pending requests are left, just break the loop @@ -1039,7 +1040,7 @@ impl Synchronizer
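
For context on the block proposal interval fix in the first patch: the old code observed the metric only after the newly proposed block had already been accepted, so the "previous" proposal timestamp could already refer to the new block itself and the recorded interval could collapse toward zero. Below is a minimal sketch of the corrected computation, using simplified stand-in types rather than the real `VerifiedBlock` and `Core` APIs: measure against the last proposed block before accepting the new one, skip the comparison when the previous proposal is the genesis block, and use `saturating_sub` so a timestamp that moves backwards cannot underflow.

```rust
use std::time::Duration;

// Simplified stand-ins for the block fields the metric needs; the real code
// works on `VerifiedBlock` and reads the timestamp carried inside the block.
struct Block {
    round: u32,
    timestamp_ms: u64,
}

// Interval in seconds between the previous proposal and the new one, or None
// when the previous proposal is the genesis block (round 0), which the patch
// skips. `saturating_sub` keeps a backwards-moving timestamp from underflowing.
fn proposal_interval_secs(last_proposed: &Block, new_block: &Block) -> Option<f64> {
    if last_proposed.round == 0 {
        return None;
    }
    let interval = Duration::from_millis(
        new_block.timestamp_ms.saturating_sub(last_proposed.timestamp_ms),
    );
    Some(interval.as_secs_f64())
}

fn main() {
    let last = Block { round: 5, timestamp_ms: 1_000 };
    let next = Block { round: 6, timestamp_ms: 1_250 };
    // This value would be observed into the `block_proposal_interval` histogram.
    println!("{:?}", proposal_interval_secs(&last, &next)); // Some(0.25)
}
```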
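
The new `synchronizer_current_missing_blocks_by_authority` metric is a gauge rather than a counter: it is overwritten on each periodic sync pass, so it reflects the backlog of missing blocks per authority seen right now rather than a running total. A minimal sketch of that counter-plus-gauge pattern with the `prometheus` crate follows; the hostnames and counts are hypothetical.

```rust
use prometheus::{
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, Registry,
};

fn main() {
    let registry = Registry::new();

    // Cumulative counter, mirroring `synchronizer_missing_blocks_by_authority`.
    let missing_total = register_int_counter_vec_with_registry!(
        "synchronizer_missing_blocks_by_authority",
        "Number of missing blocks per block author, as observed during periodic sync.",
        &["authority"],
        registry,
    )
    .unwrap();

    // Gauge, mirroring the new `synchronizer_current_missing_blocks_by_authority`:
    // set on every scheduler run, so it tracks the current backlog.
    let missing_now = register_int_gauge_vec_with_registry!(
        "synchronizer_current_missing_blocks_by_authority",
        "Current number of missing blocks per block author.",
        &["authority"],
        registry,
    )
    .unwrap();

    // Hypothetical per-authority counts from one periodic sync pass.
    let hostnames = ["validator-0", "validator-1", "validator-2"];
    let missing_per_authority: [u64; 3] = [4, 0, 7];

    for (hostname, count) in hostnames.into_iter().zip(missing_per_authority) {
        missing_total.with_label_values(&[hostname]).inc_by(count);
        missing_now.with_label_values(&[hostname]).set(count as i64);
    }
}
```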
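
The second patch also simplifies `Display` for `AuthorityIndex` to a plain bracketed index, which is why the `#[should_panic]` expectations in the `dag_state` tests change from letter-based slots such as `A8` to `[0]8`. A stand-alone sketch of the new format, using a local stand-in type rather than the real `consensus_config` struct:

```rust
use std::fmt::{Display, Formatter};

// Stand-in for `consensus_config::AuthorityIndex`, showing the new Display
// format from the second patch: always a bracketed index. The old format used
// letters A-Z for the first 26 authorities and a zero-padded bracketed index
// beyond that.
#[derive(Clone, Copy)]
struct AuthorityIndex(u32);

impl Display for AuthorityIndex {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}]", self.0)
    }
}

fn main() {
    // Slot strings in the dag_state test expectations change accordingly,
    // e.g. "A8" becomes "[0]8" and "B3" becomes "[1]3".
    println!("{}8", AuthorityIndex(0)); // [0]8
    println!("{}3", AuthorityIndex(1)); // [1]3
}
```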