Backfill peer attribution #7762

Open

wants to merge 6 commits into fusaka-devnet-3

40 changes: 29 additions & 11 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -253,15 +253,17 @@ impl<E: EthSpec> PeerDB<E> {
///
/// If `earliest_available_slot` info is not available, then return the peer anyway, assuming it has
/// the required data.
///
/// If `allowed_peers` is `Some`, only peers in that set are considered for the epoch.
pub fn synced_peers_for_epoch<'a>(
&'a self,
epoch: Epoch,
allowed_peers: &'a HashSet<PeerId>,
allowed_peers: Option<&'a HashSet<PeerId>>,
) -> impl Iterator<Item = &'a PeerId> {
self.peers
.iter()
.filter(move |(peer_id, info)| {
allowed_peers.contains(peer_id)
allowed_peers.is_none_or(|allowed| allowed.contains(peer_id))
&& info.is_connected()
&& match info.sync_status() {
SyncStatus::Synced { info } => {
@@ -270,7 +272,9 @@ impl<E: EthSpec> PeerDB<E> {
SyncStatus::Advanced { info } => {
info.has_slot(epoch.end_slot(E::slots_per_epoch()))
}
_ => false,
SyncStatus::IrrelevantPeer
| SyncStatus::Behind { .. }
| SyncStatus::Unknown => false,
}
})
.map(|(peer_id, _)| peer_id)
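
The key change above is that `allowed_peers` became optional: `None` now means "no allow-list", while `Some(set)` restricts results to that set. A minimal, self-contained sketch of that filtering pattern (using `String` in place of the real `PeerId`; `Option::is_none_or` needs Rust 1.82+):

```rust
use std::collections::HashSet;

// Stand-in for libp2p's PeerId; the real code filters `&PeerId` keys in the peerdb.
type PeerId = String;

/// Allow-list filtering as now used by `synced_peers_for_epoch`:
/// `None` means "no restriction", `Some(set)` keeps only members of `set`.
fn filter_allowed<'a>(
    peers: &'a [PeerId],
    allowed: Option<&'a HashSet<PeerId>>,
) -> impl Iterator<Item = &'a PeerId> {
    peers
        .iter()
        .filter(move |peer| allowed.is_none_or(|set| set.contains(*peer)))
}

fn main() {
    let peers = vec!["peer_a".to_string(), "peer_b".to_string()];
    let allow: HashSet<PeerId> = HashSet::from(["peer_a".to_string()]);

    // Restricted: only peers in the allow-list pass.
    assert_eq!(filter_allowed(&peers, Some(&allow)).count(), 1);
    // Unrestricted: every peer passes.
    assert_eq!(filter_allowed(&peers, None).count(), 2);
}
```
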
@@ -320,22 +324,36 @@ impl<E: EthSpec> PeerDB<E> {
}

/// Returns an iterator of all peers that are supposed to be custodying
/// the given subnet id that also belong to `allowed_peers`.
pub fn good_range_sync_custody_subnet_peer<'a>(
&'a self,
/// the given subnet id.
pub fn good_range_sync_custody_subnet_peers(
&self,
subnet: DataColumnSubnetId,
allowed_peers: &'a HashSet<PeerId>,
) -> impl Iterator<Item = &'a PeerId> {
) -> impl Iterator<Item = &PeerId> {
self.peers
.iter()
.filter(move |(peer_id, info)| {
.filter(move |(_, info)| {
// The custody_subnets hashset can be populated via enr or metadata
let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
allowed_peers.contains(peer_id) && info.is_connected() && is_custody_subnet_peer
info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
})
.map(|(peer_id, _)| peer_id)
}

/// Returns `true` if the given peer is connected and assigned to the given subnet,
/// otherwise returns `false`.
///
/// Also returns `false` if the peer doesn't exist in the peerdb.
pub fn is_good_range_sync_custody_subnet_peer(
&self,
subnet: DataColumnSubnetId,
peer: &PeerId,
) -> bool {
if let Some(info) = self.peers.get(peer) {
info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
} else {
false
}
}
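
As a side note, the unknown-peer-returns-`false` lookup in `is_good_range_sync_custody_subnet_peer` can be expressed more compactly with `Option::is_some_and`; a toy sketch under assumed stand-in types (not the real lighthouse `PeerDB`):

```rust
use std::collections::{HashMap, HashSet};

// Toy stand-ins; the real types are libp2p's `PeerId`, lighthouse's
// `DataColumnSubnetId`, and the peerdb's `PeerInfo`.
type PeerId = String;
type SubnetId = u64;

struct PeerInfo {
    connected: bool,
    custody_subnets: HashSet<SubnetId>,
}

struct PeerDb {
    peers: HashMap<PeerId, PeerInfo>,
}

impl PeerDb {
    /// Same semantics as `is_good_range_sync_custody_subnet_peer`: an unknown,
    /// disconnected, or non-custodial peer yields `false`.
    fn is_good_custody_peer(&self, subnet: SubnetId, peer: &PeerId) -> bool {
        self.peers
            .get(peer)
            .is_some_and(|info| info.connected && info.custody_subnets.contains(&subnet))
    }
}
```
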

/// Gives the ids of all known disconnected peers.
pub fn disconnected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers
138 changes: 135 additions & 3 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -9,6 +9,7 @@
//! sync as failed, log an error and attempt to retry once a new peer joins the node.

use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::manager::BatchProcessResult;
use crate::sync::network_context::{
RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
@@ -28,7 +29,7 @@ use std::collections::{
};
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};
use types::{Epoch, EthSpec};
use types::{ColumnIndex, Epoch, EthSpec};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request fewer blocks to account for
@@ -223,9 +224,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.network_globals
.peers
.read()
.synced_peers()
.synced_peers_for_epoch(self.to_be_downloaded, None)
.next()
.is_some()
// Backfill can't progress if we do not have peers in the required subnets post-PeerDAS.
&& self.good_peers_on_sampling_subnets(self.to_be_downloaded, network)
{
// If there are peers to resume with, begin the resume.
debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
@@ -334,6 +337,48 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
err: RpcResponseError,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
match coupling_error {
CouplingError::PeerFailure {
error,
faulty_peers,
action,
} => {
debug!(?batch_id, error, "Block components coupling error");
// Note: we don't fail the batch here because a `CouplingError` is
// recoverable by requesting from other honest peers.
let mut failed_columns = HashSet::new();
let mut failed_peers = HashSet::new();
for (column, peer) in faulty_peers {
failed_columns.insert(*column);
failed_peers.insert(*peer);
}
for peer in failed_peers.iter() {
network.report_peer(*peer, *action, "failed to return columns");
}

return self.retry_partial_batch(
network,
batch_id,
request_id,
failed_columns,
failed_peers,
);
}
CouplingError::ExceededMaxRetries(peers, action) => {
for peer in peers.iter() {
network.report_peer(
*peer,
*action,
"failed to return columns, exceeded retry attempts",
);
}
}
CouplingError::InternalError(msg) => {
debug!(?batch_id, msg, "Block components coupling internal error");
}
}
}
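
The match above treats `PeerFailure` as recoverable (collect the failed columns and peers, report the peers, then retry just those columns elsewhere), while `ExceededMaxRetries` and `InternalError` only report or log. A reduced, self-contained sketch of that decision, with toy types standing in for the real `CouplingError`, `ColumnIndex`, and `PeerId`:

```rust
use std::collections::HashSet;

// Toy stand-ins for the real `ColumnIndex`, `PeerId`, and `CouplingError`.
type ColumnIndex = u64;
type PeerId = u64;

enum CouplingError {
    /// Some peers returned bad or missing columns; recoverable by
    /// re-requesting the failed columns from other peers.
    PeerFailure { faulty_peers: Vec<(ColumnIndex, PeerId)> },
    /// Too many retries already; the batch is not retried again.
    ExceededMaxRetries(Vec<PeerId>),
    /// A local bug; nothing to retry.
    InternalError(String),
}

/// Returns the distinct columns and peers to feed into a partial retry, or
/// `None` when the error is not recoverable that way.
fn partial_retry_plan(err: &CouplingError) -> Option<(HashSet<ColumnIndex>, HashSet<PeerId>)> {
    match err {
        CouplingError::PeerFailure { faulty_peers } => {
            let mut columns = HashSet::new();
            let mut peers = HashSet::new();
            for (column, peer) in faulty_peers {
                columns.insert(*column);
                peers.insert(*peer);
            }
            Some((columns, peers))
        }
        CouplingError::ExceededMaxRetries(_) | CouplingError::InternalError(_) => None,
    }
}
```
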
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
@@ -903,12 +948,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
) -> Result<(), BackFillError> {
if matches!(self.state(), BackFillState::Paused) {
return Err(BackFillError::Paused);
}
if let Some(batch) = self.batches.get_mut(&batch_id) {
debug!(?batch_id, "Sending backfill batch");
let synced_peers = self
.network_globals
.peers
.read()
.synced_peers()
.synced_peers_for_epoch(batch_id, None)
.cloned()
.collect::<HashSet<_>>();

@@ -967,6 +1016,54 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
Ok(())
}

/// Retries partial column requests within the batch by creating new requests for the failed columns.
#[instrument(parent = None,
fields(service = "backfill_sync"),
name = "backfill_sync",
skip_all
)]
pub fn retry_partial_batch(
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
id: Id,
failed_columns: HashSet<ColumnIndex>,
mut failed_peers: HashSet<PeerId>,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
failed_peers.extend(&batch.failed_peers());
let req = batch.to_blocks_by_range_request().0;

let synced_peers = network
.network_globals()
.peers
.read()
.synced_peers()
.cloned()
.collect::<HashSet<_>>();

match network.retry_columns_by_range(
id,
&synced_peers,
&failed_peers,
req,
&failed_columns,
) {
Ok(_) => {
debug!(
?batch_id,
id, "Retried column requests from different peers"
);
return Ok(());
}
Err(e) => {
debug!(?batch_id, id, e, "Failed to retry partial batch");
}
}
}
Ok(())
}
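
`retry_partial_batch` folds the batch's previously failed peers into `failed_peers` and hands both the synced-peer set and the failed-peer set to `retry_columns_by_range`. A plausible reading of how those two sets are meant to combine, written as a hypothetical helper (an assumption for illustration, not code from this PR):

```rust
use std::collections::HashSet;

type PeerId = u64; // stand-in for the real PeerId

/// A plausible peer selection for a partial retry: synced peers that have not
/// already failed this batch. (This is an assumption about what
/// `retry_columns_by_range` does with the two sets it is given, not a copy of
/// its implementation.)
fn retry_candidates<'a>(
    synced_peers: &'a HashSet<PeerId>,
    failed_peers: &'a HashSet<PeerId>,
) -> impl Iterator<Item = &'a PeerId> {
    synced_peers.difference(failed_peers)
}
```
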

/// When resuming a chain, this function searches for batches that need to be re-downloaded and
/// transitions their state to redownload the batch.
#[instrument(parent = None,
@@ -1057,6 +1154,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return None;
}

if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
debug!("Waiting for peers to be available on custody column subnets");
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
@@ -1089,6 +1191,36 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}

/// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
/// every sampling column subnet.
///
/// Returns `true` if PeerDAS isn't enabled for the epoch.
fn good_peers_on_sampling_subnets(
&self,
epoch: Epoch,
network: &SyncNetworkContext<T>,
) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all sampling column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.sampling_subnets()
.iter()
.all(|subnet_id| {
let peer_count = network
.network_globals()
.peers
.read()
.good_range_sync_custody_subnet_peers(*subnet_id)
.count();
peer_count > 0
});
peers_on_all_custody_subnets
} else {
true
}
}
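
A self-contained sketch of the coverage rule this function enforces, with toy types in place of `DataColumnSubnetId` and the peerdb query (the real code iterates `network_globals().sampling_subnets()` and counts `good_range_sync_custody_subnet_peers` per subnet):

```rust
use std::collections::{HashMap, HashSet};

type SubnetId = u64; // stand-in for DataColumnSubnetId
type PeerId = u64;   // stand-in for the real PeerId

/// `true` iff every sampling subnet has at least one good custody peer.
/// `subnet_peers` is a toy stand-in for querying the peerdb per subnet.
fn all_sampling_subnets_covered(
    sampling_subnets: &[SubnetId],
    subnet_peers: &HashMap<SubnetId, HashSet<PeerId>>,
) -> bool {
    sampling_subnets.iter().all(|subnet| {
        subnet_peers
            .get(subnet)
            .is_some_and(|peers| !peers.is_empty())
    })
}

fn main() {
    let subnets = [0, 1];
    let mut peers_by_subnet = HashMap::new();
    peers_by_subnet.insert(0, HashSet::from([42]));

    // Subnet 1 has no peers, so backfill would keep waiting.
    assert!(!all_sampling_subnets_covered(&subnets, &peers_by_subnet));
}
```
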

/// Resets the start epoch based on the beacon chain.
///
/// This errors if the beacon chain indicates that backfill sync has already completed or is