Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 77 additions & 19 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,24 @@ impl Core {
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
) {
info!("Received quorum round per authority in ancestor state manager set to: {received_quorum_rounds:?}");
info!("Accepted quorum round per authority in ancestor state manager set to: {accepted_quorum_rounds:?}");
info!(
"Received quorum round per authority in ancestor state manager set to: {}",
self.context
.committee
.authorities()
.zip(received_quorum_rounds.iter())
.map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
.join(", ")
);
info!(
"Accepted quorum round per authority in ancestor state manager set to: {}",
self.context
.committee
.authorities()
.zip(accepted_quorum_rounds.iter())
.map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
.join(", ")
);
self.ancestor_state_manager
.set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
info!("Propagation round delay set to: {delay}");
Expand Down Expand Up @@ -831,10 +847,8 @@ impl Core {
clock_round: Round,
smart_select: bool,
) -> Vec<VerifiedBlock> {
let _s = self
.context
.metrics
.node_metrics
let node_metrics = &self.context.metrics.node_metrics;
let _s = node_metrics
.scope_processing_time
.with_label_values(&["Core::smart_ancestors_to_propose"])
.start_timer();
Expand Down Expand Up @@ -904,7 +918,7 @@ impl Core {
}

if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
self.context.metrics.node_metrics.smart_selection_wait.inc();
node_metrics.smart_selection_wait.inc();
debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
return vec![];
}
Expand All @@ -924,32 +938,76 @@ impl Core {
debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
parent_round_quorum.add(ancestor.author(), &self.context.committee);
ancestors_to_propose.push(ancestor);
self.context
.metrics
.node_metrics
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "strong"])
.with_label_values(&[block_hostname, "timeout"])
.inc();
} else {
excluded_ancestors.push((score, ancestor));
}
}

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

// Include partially propagated blocks from excluded authorities, to help propagate the blocks
// across the network with less latency impact.
// TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic.
for (score, ancestor) in excluded_ancestors.iter() {
let excluded_author = ancestor.author();
let block_hostname = &self.context.committee.authority(excluded_author).hostname;
// A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round.
let mut accepted_low_quorum_round = self
.ancestor_state_manager
.accepted_quorum_round_per_authority[excluded_author]
.0;
// If the accepted quorum round of this ancestor is greater than or equal
// to the clock round then we want to make sure to set it to clock_round - 1
// as that is the max round the new block can include as an ancestor.
accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

let last_included_round = self.last_included_ancestors[excluded_author]
.map(|block_ref| block_ref.round)
.unwrap_or(GENESIS_ROUND);
if last_included_round >= accepted_low_quorum_round {
trace!(
"Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}",
ancestor.reference(), last_included_round, accepted_low_quorum_round,
);
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
}

trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
// Include the ancestor block as it has been seen & accepted by a strong quorum.
let ancestor = if ancestor.round() == accepted_low_quorum_round {
ancestor.clone()
} else {
// Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated.
let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range(
excluded_author,
last_included_round + 1,
accepted_low_quorum_round + 1,
) else {
trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference());
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
};
ancestor
};
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference());
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "quorum"])
.inc();
}

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

info!(
"Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}",
ancestors_to_propose.len(),
Expand Down
98 changes: 86 additions & 12 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,38 @@ impl DagState {
blocks
}

/// Returns the last block proposed per authority with `round < end_round`.
// Retrieves the cached block within the range [start_round, end_round) from a given authority.
// NOTE: end_round must be greater than GENESIS_ROUND.
pub(crate) fn get_last_cached_block_in_range(
&self,
authority: AuthorityIndex,
start_round: Round,
end_round: Round,
) -> Option<VerifiedBlock> {
if end_round == GENESIS_ROUND {
panic!(
"Attempted to retrieve blocks earlier than the genesis round which is impossible"
);
}

let block_ref = self.recent_refs_by_authority[authority]
.range((
Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
Excluded(BlockRef::new(
end_round,
AuthorityIndex::MIN,
BlockDigest::MIN,
)),
))
.last()?;

self.recent_blocks.get(block_ref).cloned()
}

/// Returns the last block proposed per authority with `evicted round < round < end_round`.
/// The method is guaranteed to return results only when the `end_round` is not earlier of the
/// available cached data for each authority, otherwise the method will panic - it's the caller's
/// responsibility to ensure that is not requesting filtering for earlier rounds .
/// available cached data for each authority (evicted round + 1), otherwise the method will panic.
/// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
/// In case of equivocation for an authority's last slot only one block will be returned (the last in order).
pub(crate) fn get_last_cached_block_per_authority(
&self,
Expand Down Expand Up @@ -1957,7 +1985,7 @@ mod test {

#[rstest]
#[tokio::test]
async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) {
async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
// GIVEN
const CACHED_ROUNDS: Round = 2;
let (mut context, _) = Context::new_for_test(4);
Expand Down Expand Up @@ -2010,14 +2038,46 @@ mod test {

// WHEN search for the latest blocks
let end_round = 4;
let expected_rounds = vec![0, 1, 2, 3];

// THEN
let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
assert_eq!(
last_blocks.iter().map(|b| b.round()).collect::<Vec<_>>(),
expected_rounds
);

// THEN
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
0,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}

// WHEN starting from round 2
let start_round = 2;
let expected_rounds = [0, 0, 2, 3];

// THEN
assert_eq!(last_blocks[0].round(), 0);
assert_eq!(last_blocks[1].round(), 1);
assert_eq!(last_blocks[2].round(), 2);
assert_eq!(last_blocks[3].round(), 3);
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
start_round,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}

// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
//
Expand All @@ -2027,13 +2087,27 @@ mod test {

// AND we request before round 3
let end_round = 3;
let expected_rounds = vec![0, 1, 2, 2];

// THEN
let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
assert_eq!(
last_blocks.iter().map(|b| b.round()).collect::<Vec<_>>(),
expected_rounds
);

// THEN
assert_eq!(last_blocks[0].round(), 0);
assert_eq!(last_blocks[1].round(), 1);
assert_eq!(last_blocks[2].round(), 2);
assert_eq!(last_blocks[3].round(), 2);
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
0,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}
}

#[tokio::test]
Expand Down
Loading