diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml
index 5cffb4e2fd8..02ac485c7c9 100644
--- a/.github/workflows/local-testnet.yml
+++ b/.github/workflows/local-testnet.yml
@@ -173,7 +173,8 @@ jobs:
   # Tests checkpoint syncing to a live network (current fork) and a running devnet (usually next scheduled fork)
   checkpoint-sync-test:
     name: checkpoint-sync-test-${{ matrix.network }}
-    runs-on: ubuntu-latest
+    # Use self-hosted runner for Fusaka devnet testing as GitHub hosted ones aren't able to keep up with the chain.
+    runs-on: ${{ github.repository == 'sigp/lighthouse' && matrix.network == 'devnet' && fromJson('["self-hosted", "linux", "Kurtosis", "large"]') || 'ubuntu-latest' }}
     needs: dockerfile-ubuntu
     if: contains(github.event.pull_request.labels.*.name, 'syncing')
     continue-on-error: true
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index 6559b247241..7dd4e6544d8 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -253,15 +253,17 @@ impl<E: EthSpec> PeerDB<E> {
     ///
     /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the
     /// required data.
+    ///
+    /// If `allowed_peers` is `Some`, only peers from that set are considered.
     pub fn synced_peers_for_epoch<'a>(
         &'a self,
         epoch: Epoch,
-        allowed_peers: &'a HashSet<PeerId>,
+        allowed_peers: Option<&'a HashSet<PeerId>>,
     ) -> impl Iterator<Item = &'a PeerId> {
         self.peers
             .iter()
             .filter(move |(peer_id, info)| {
-                allowed_peers.contains(peer_id)
+                allowed_peers.is_none_or(|allowed| allowed.contains(peer_id))
                     && info.is_connected()
                     && match info.sync_status() {
                         SyncStatus::Synced { info } => {
@@ -270,7 +272,9 @@ impl<E: EthSpec> PeerDB<E> {
                         SyncStatus::Advanced { info } => {
                             info.has_slot(epoch.end_slot(E::slots_per_epoch()))
                         }
-                        _ => false,
+                        SyncStatus::IrrelevantPeer
+                        | SyncStatus::Behind { .. }
+                        | SyncStatus::Unknown => false,
                     }
             })
             .map(|(peer_id, _)| peer_id)
@@ -320,22 +324,36 @@ impl<E: EthSpec> PeerDB<E> {
     }
 
     /// Returns an iterator of all peers that are supposed to be custodying
-    /// the given subnet id that also belong to `allowed_peers`.
-    pub fn good_range_sync_custody_subnet_peer<'a>(
-        &'a self,
+    /// the given subnet id.
+    pub fn good_range_sync_custody_subnet_peers(
+        &self,
         subnet: DataColumnSubnetId,
-        allowed_peers: &'a HashSet<PeerId>,
-    ) -> impl Iterator<Item = &'a PeerId> {
+    ) -> impl Iterator<Item = &PeerId> {
         self.peers
             .iter()
-            .filter(move |(peer_id, info)| {
+            .filter(move |(_, info)| {
                 // The custody_subnets hashset can be populated via enr or metadata
-                let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
-                allowed_peers.contains(peer_id) && info.is_connected() && is_custody_subnet_peer
+                info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
             })
             .map(|(peer_id, _)| peer_id)
     }
 
+    /// Returns `true` if the given peer is connected and assigned to the given subnet,
+    /// otherwise returns `false`.
+    ///
+    /// Also returns `false` if the peer doesn't exist in the peerdb.
+    pub fn is_good_range_sync_custody_subnet_peer(
+        &self,
+        subnet: DataColumnSubnetId,
+        peer: &PeerId,
+    ) -> bool {
+        if let Some(info) = self.peers.get(peer) {
+            info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
+        } else {
+            false
+        }
+    }
+
     /// Gives the ids of all known disconnected peers.
     pub fn disconnected_peers(&self) -> impl Iterator<Item = &PeerId> {
         self.peers
diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index ba66e41aca3..44870e1dedc 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -9,6 +9,7 @@
 //! sync as failed, log an error and attempt to retry once a new peer joins the node.
 
 use crate::network_beacon_processor::ChainSegmentProcessId;
+use crate::sync::block_sidecar_coupling::CouplingError;
 use crate::sync::manager::BatchProcessResult;
 use crate::sync::network_context::{
     RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
@@ -28,7 +29,7 @@ use std::collections::{
 };
 use std::sync::Arc;
 use tracing::{debug, error, info, instrument, warn};
-use types::{Epoch, EthSpec};
+use types::{ColumnIndex, Epoch, EthSpec};
 
 /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
 /// blocks per batch are requested _at most_. A batch may request less blocks to account for
@@ -223,9 +224,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                     .network_globals
                     .peers
                     .read()
-                    .synced_peers()
+                    .synced_peers_for_epoch(self.to_be_downloaded, None)
                     .next()
                     .is_some()
+                    // backfill can't progress if we do not have peers in the required subnets post peerdas.
+                    && self.good_peers_on_sampling_subnets(self.to_be_downloaded, network)
                 {
                     // If there are peers to resume with, begin the resume.
                     debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
@@ -334,6 +337,48 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         err: RpcResponseError,
     ) -> Result<(), BackFillError> {
         if let Some(batch) = self.batches.get_mut(&batch_id) {
+            if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
+                match coupling_error {
+                    CouplingError::PeerFailure {
+                        error,
+                        faulty_peers,
+                        action,
+                    } => {
+                        debug!(?batch_id, error, "Block components coupling error");
+                        // Note: we don't fail the batch here because a `CouplingError` is
+                        // recoverable by requesting from other honest peers.
+                        let mut failed_columns = HashSet::new();
+                        let mut failed_peers = HashSet::new();
+                        for (column, peer) in faulty_peers {
+                            failed_columns.insert(*column);
+                            failed_peers.insert(*peer);
+                        }
+                        for peer in failed_peers.iter() {
+                            network.report_peer(*peer, *action, "failed to return columns");
+                        }
+
+                        return self.retry_partial_batch(
+                            network,
+                            batch_id,
+                            request_id,
+                            failed_columns,
+                            failed_peers,
+                        );
+                    }
+                    CouplingError::ExceededMaxRetries(peers, action) => {
+                        for peer in peers.iter() {
+                            network.report_peer(
+                                *peer,
+                                *action,
+                                "failed to return columns, exceeded retry attempts",
+                            );
+                        }
+                    }
+                    CouplingError::InternalError(msg) => {
+                        debug!(?batch_id, msg, "Block components coupling internal error");
+                    }
+                }
+            }
             // A batch could be retried without the peer failing the request (disconnecting/
             // sending an error /timeout) if the peer is removed from the chain for other
             // reasons. Check that this block belongs to the expected peer
@@ -903,12 +948,16 @@
         network: &mut SyncNetworkContext<T>,
         batch_id: BatchId,
     ) -> Result<(), BackFillError> {
+        if matches!(self.state(), BackFillState::Paused) {
+            return Err(BackFillError::Paused);
+        }
         if let Some(batch) = self.batches.get_mut(&batch_id) {
+            debug!(?batch_id, "Sending backfill batch");
             let synced_peers = self
                 .network_globals
                 .peers
                 .read()
-                .synced_peers()
+                .synced_peers_for_epoch(batch_id, None)
                 .cloned()
                 .collect::<HashSet<_>>();
 
@@ -967,6 +1016,54 @@
         Ok(())
     }
 
+    /// Retries partial column requests within the batch by creating new requests for the failed columns.
+    #[instrument(parent = None,
+        fields(service = "backfill_sync"),
+        name = "backfill_sync",
+        skip_all
+    )]
+    pub fn retry_partial_batch(
+        &mut self,
+        network: &mut SyncNetworkContext<T>,
+        batch_id: BatchId,
+        id: Id,
+        failed_columns: HashSet<ColumnIndex>,
+        mut failed_peers: HashSet<PeerId>,
+    ) -> Result<(), BackFillError> {
+        if let Some(batch) = self.batches.get_mut(&batch_id) {
+            failed_peers.extend(&batch.failed_peers());
+            let req = batch.to_blocks_by_range_request().0;
+
+            let synced_peers = network
+                .network_globals()
+                .peers
+                .read()
+                .synced_peers()
+                .cloned()
+                .collect::<HashSet<_>>();
+
+            match network.retry_columns_by_range(
+                id,
+                &synced_peers,
+                &failed_peers,
+                req,
+                &failed_columns,
+            ) {
+                Ok(_) => {
+                    debug!(
+                        ?batch_id,
+                        id, "Retried column requests from different peers"
+                    );
+                    return Ok(());
+                }
+                Err(e) => {
+                    debug!(?batch_id, id, e, "Failed to retry partial batch");
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// When resuming a chain, this function searches for batches that need to be re-downloaded and
     /// transitions their state to redownload the batch.
     #[instrument(parent = None,
@@ -1057,6 +1154,11 @@
             return None;
         }
 
+        if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
+            debug!("Waiting for peers to be available on custody column subnets");
+            return None;
+        }
+
         let batch_id = self.to_be_downloaded;
         // this batch could have been included already being an optimistic batch
         match self.batches.entry(batch_id) {
@@ -1089,6 +1191,36 @@
         }
     }
 
+    /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
+    /// every sampling column subnet.
+    ///
+    /// Returns `true` if peerdas isn't enabled for the epoch.
+    fn good_peers_on_sampling_subnets(
+        &self,
+        epoch: Epoch,
+        network: &SyncNetworkContext<T>,
+    ) -> bool {
+        if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
+            // Require peers on all sampling column subnets before sending batches
+            let peers_on_all_custody_subnets = network
+                .network_globals()
+                .sampling_subnets()
+                .iter()
+                .all(|subnet_id| {
+                    let peer_count = network
+                        .network_globals()
+                        .peers
+                        .read()
+                        .good_range_sync_custody_subnet_peers(*subnet_id)
+                        .count();
+                    peer_count > 0
+                });
+            peers_on_all_custody_subnets
+        } else {
+            true
+        }
+    }
+
     /// Resets the start epoch based on the beacon chain.
     ///
     /// This errors if the beacon chain indicates that backfill sync has already completed or is
diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs
index 4653daa44a4..7ed31d601fc 100644
--- a/beacon_node/network/src/sync/block_sidecar_coupling.rs
+++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs
@@ -17,6 +17,7 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
     blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
     /// Sidecars we have received awaiting for their corresponding block.
     block_data_request: RangeBlockDataRequest<E>,
+    attempt: usize,
 }
 
 enum ByRangeRequest<I, T> {
@@ -39,9 +40,16 @@ enum RangeBlockDataRequest<E: EthSpec> {
 }
 
 #[derive(Debug)]
-pub struct CouplingError {
-    pub(crate) msg: String,
-    pub(crate) column_and_peer: Option<(Vec<(ColumnIndex, PeerId)>, PeerAction)>,
+pub(crate) enum CouplingError {
+    InternalError(String),
+    /// The peer we requested the data from was faulty/malicious
+    PeerFailure {
+        error: String,
+        faulty_peers: Vec<(ColumnIndex, PeerId)>,
+        action: PeerAction,
+    },
+    /// The batch exceeded the max retries
+    ExceededMaxRetries(Vec<PeerId>, PeerAction),
 }
 
 impl<E: EthSpec> RangeBlockComponentsRequest<E> {
@@ -73,9 +81,14 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
         Self {
             blocks_request: ByRangeRequest::Active(blocks_req_id),
             block_data_request,
+            attempt: 0,
         }
     }
 
+    pub fn attempt(&self) -> usize {
+        self.attempt
+    }
+
     /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed
     /// request for some columns.
     pub fn reinsert_failed_column_requests(
@@ -151,7 +164,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
             return None;
         };
 
-        match &mut self.block_data_request {
+        let resp = match &mut self.block_data_request {
             RangeBlockDataRequest::NoData => {
                 Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
             }
@@ -195,19 +208,26 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                     spec,
                 );
 
-                if let Err(err) = &resp {
-                    if let Some((peers, _)) = &err.column_and_peer {
-                        for (_, peer) in peers.iter() {
-                            // find the req id associated with the peer and
-                            // delete it from the entries
-                            requests.retain(|&k, _| k.peer != *peer);
-                        }
+                if let Err(CouplingError::PeerFailure {
+                    error: _,
+                    faulty_peers,
+                    action: _,
+                }) = &resp
+                {
+                    for (_, peer) in faulty_peers.iter() {
+                        // find the req id associated with the peer and
+                        // delete it from the entries
+                        requests.retain(|&k, _| k.peer != *peer);
                     }
                 }
 
                 Some(resp)
             }
-        }
+        };
+
+        // Increment the attempt once this function returns the response or errors
+        self.attempt += 1;
+        resp
     }
 
     fn responses_with_blobs(
@@ -229,9 +249,8 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                     .unwrap_or(false);
                 pair_next_blob
             } {
-                blob_list.push(blob_iter.next().ok_or_else(|| CouplingError {
-                    msg: "Missing next blob".to_string(),
-                    column_and_peer: None,
+                blob_list.push(blob_iter.next().ok_or_else(|| {
+                    CouplingError::InternalError("Missing next blob".to_string())
                 })?);
             }
 
@@ -239,16 +258,14 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
             for blob in blob_list {
                 let blob_index = blob.index as usize;
                 let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
-                    return Err(CouplingError {
-                        msg: "Invalid blob index".to_string(),
-                        column_and_peer: None,
-                    });
+                    return Err(CouplingError::InternalError(
+                        "Invalid blob index".to_string(),
+                    ));
                 };
                 if blob_opt.is_some() {
-                    return Err(CouplingError {
-                        msg: "Repeat blob index".to_string(),
-                        column_and_peer: None,
-                    });
+                    return Err(CouplingError::InternalError(
+                        "Repeat blob index".to_string(),
+                    ));
                 } else {
                     *blob_opt = Some(blob);
                 }
@@ -257,24 +274,20 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                 blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
                 max_blobs_per_block,
             )
-            .map_err(|_| CouplingError {
-                msg: "Blobs returned exceeds max length".to_string(),
-                column_and_peer: None,
+            .map_err(|_| {
+                CouplingError::InternalError("Blobs returned exceeds max length".to_string())
             })?;
             responses.push(
-                RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError {
-                    msg: format!("{e:?}"),
-                    column_and_peer: None,
-                })?,
+                RpcBlock::new(None, block, Some(blobs))
+                    .map_err(|e| CouplingError::InternalError(format!("{e:?}")))?,
             )
         }
 
         // if accumulated sidecars is not empty, throw an error.
         if blob_iter.next().is_some() {
-            return Err(CouplingError {
-                msg: "Received sidecars that don't pair well".to_string(),
-                column_and_peer: None,
-            });
+            return Err(CouplingError::InternalError(
+                "Received sidecars that don't pair well".to_string(),
+            ));
         }
 
         Ok(responses)
@@ -300,10 +313,9 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                 .insert(index, column)
                 .is_some()
             {
-                return Err(CouplingError {
-                    msg: format!("Repeated column block_root {block_root:?} index {index}"),
-                    column_and_peer: None,
-                });
+                return Err(CouplingError::InternalError(format!(
+                    "Repeated column block_root {block_root:?} index {index}"
+                )));
             }
         }
 
@@ -316,20 +328,11 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
             rpc_blocks.push(if block.num_expected_blobs() > 0 {
                 let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
                 else {
-                    // This PR ignores the fix from https://github.com/sigp/lighthouse/pull/5675
-                    // which allows blobs to not match blocks.
-                    // TODO(das): on the initial version of PeerDAS the beacon chain does not check
-                    // rpc custody requirements and dropping this check can allow the block to have
-                    // an inconsistent DB.
-
-                    // For now, we always assume that the block peer is right.
-                    // This is potentially dangerous as we can get isolated on a chain with a
-                    // malicious block peer.
-                    // TODO: fix this by checking the proposer signature before downloading columns.
                     let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
-                    return Err(CouplingError {
-                        msg: format!("No columns for block {block_root:?} with data"),
-                        column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)),
+                    return Err(CouplingError::PeerFailure {
+                        error: format!("No columns for block {block_root:?} with data"),
+                        faulty_peers: responsible_peers,
+                        action: PeerAction::LowToleranceError,
                     });
                 };
 
@@ -337,26 +340,19 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                 let mut naughty_peers = vec![];
                 for index in expects_custody_columns {
                     if let Some(data_column) = data_columns_by_index.remove(index) {
-                        // Safe to convert to `CustodyDataColumn`: we have asserted that the index of
-                        // this column is in the set of `expects_custody_columns` and with the expected
-                        // block root, so for the expected epoch of this batch.
                         custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
                     } else {
-                        // Penalize the peer for claiming to have the columns but not returning
-                        // them
                         let Some(responsible_peer) = column_to_peer.get(index) else {
-                            return Err(CouplingError {
-                                msg: format!("Internal error, no request made for column {}", index),
-                                column_and_peer: None,
-                            });
+                            return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index)));
                         };
                         naughty_peers.push((*index, *responsible_peer));
                     }
                 }
 
                 if !naughty_peers.is_empty() {
-                    return Err(CouplingError {
-                        msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
-                        column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)),
+                    return Err(CouplingError::PeerFailure {
+                        error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
+                        faulty_peers: naughty_peers,
+                        action: PeerAction::LowToleranceError,
                     });
                 }
 
@@ -364,7 +360,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                 if !data_columns_by_index.is_empty() {
                     let remaining_indices = data_columns_by_index.keys().collect::<Vec<_>>();
                     // log the error but don't return an error, we can still progress with extra columns.
-                    tracing::error!(
+                    tracing::debug!(
                         ?block_root,
                         ?remaining_indices,
                         "Not all columns consumed for block"
@@ -372,10 +368,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
                 }
 
                 RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec)
-                    .map_err(|e| CouplingError {
-                        msg: format!("{:?}", e),
-                        column_and_peer: None,
-                    })?
+                    .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
             } else {
                 // Block has no data, expects zero columns
                 RpcBlock::new_without_blobs(Some(block_root), block)
@@ -387,7 +380,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
             let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
             // log the error but don't return an error, we can still progress with responses.
             // this is most likely an internal error with overrequesting or a client bug.
-            tracing::error!(?remaining_roots, "Not all columns consumed for block");
+            tracing::debug!(?remaining_roots, "Not all columns consumed for block");
         }
 
         Ok(rpc_blocks)
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index a62b8f7382d..9801f9e0e32 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -55,6 +55,9 @@ use types::{
 pub mod custody;
 mod requests;
 
+/// Max retries for block components after which we fail the batch.
+pub const MAX_BLOCK_COMPONENT_RETRIES: usize = 3;
+
 #[derive(Debug)]
 pub enum RpcEvent<T> {
     StreamTermination,
@@ -448,14 +451,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
     /// the batch.
     pub fn retry_columns_by_range(
         &mut self,
-        request_id: Id,
+        id: Id,
         peers: &HashSet<PeerId>,
         peers_to_deprioritize: &HashSet<PeerId>,
         request: BlocksByRangeRequest,
         failed_columns: &HashSet<ColumnIndex>,
     ) -> Result<(), String> {
         let Some(requester) = self.components_by_range_requests.keys().find_map(|r| {
-            if r.id == request_id {
+            if r.id == id {
                 Some(r.requester)
             } else {
                 None
             }
@@ -468,6 +471,8 @@
 
         debug!(
             ?failed_columns,
+            ?id,
+            ?requester,
             "Retrying only failed column requests from other peers"
         );
 
@@ -482,10 +487,7 @@
             .map_err(|e| format!("{:?}", e))?;
 
         // Reuse the id for the request that received partially correct responses
-        let id = ComponentsByRangeRequestId {
-            id: request_id,
-            requester,
-        };
+        let id = ComponentsByRangeRequestId { id, requester };
 
         let data_column_requests = columns_by_range_peers_to_request
             .into_iter()
@@ -694,18 +696,16 @@
         match range_block_component {
             RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
                 request.add_blocks(req_id, blocks).map_err(|e| {
-                    RpcResponseError::BlockComponentCouplingError(CouplingError {
-                        msg: e,
-                        column_and_peer: None,
-                    })
+                    RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError(
+                        e,
+                    ))
                 })
             }),
             RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
                 request.add_blobs(req_id, blobs).map_err(|e| {
-                    RpcResponseError::BlockComponentCouplingError(CouplingError {
-                        msg: e,
-                        column_and_peer: None,
-                    })
+                    RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError(
+                        e,
+                    ))
                 })
             }),
             RangeBlockComponent::CustodyColumns(req_id, resp) => {
@@ -713,10 +713,9 @@
                 request
                     .add_custody_columns(req_id, custody_columns)
                     .map_err(|e| {
-                        RpcResponseError::BlockComponentCouplingError(CouplingError {
-                            msg: e,
-                            column_and_peer: None,
-                        })
+                        RpcResponseError::BlockComponentCouplingError(
+                            CouplingError::InternalError(e),
+                        )
                     })
             })
         }
@@ -726,10 +725,32 @@
             return Some(Err(e));
         }
 
-        if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) {
-            if blocks_result.is_ok() {
+        let range_req = entry.get_mut();
+        if let Some(blocks_result) = range_req.responses(&self.chain.spec) {
+            if let Err(CouplingError::PeerFailure {
+                action,
+                error,
+                faulty_peers,
+            }) = &blocks_result
+            {
+                // Remove the entry and fail the batch once the attempt count reaches the max retries.
+                if range_req.attempt() == MAX_BLOCK_COMPONENT_RETRIES {
+                    debug!(
+                        entry=?entry.key(),
+                        msg = error,
+                        "Request exceeded max retries, failing batch"
+                    );
+                    entry.remove();
+                    return Some(Err(RpcResponseError::BlockComponentCouplingError(
+                        CouplingError::ExceededMaxRetries(
+                            faulty_peers.iter().map(|r| r.1).collect(),
+                            *action,
+                        ),
+                    )));
+                };
+            } else {
                 // remove the entry only if it coupled successfully with
-                // no errors
+                // no errors, or if it's an internal error
                 entry.remove();
             }
             // If the request is finished, dequeue everything
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 0e9178f0f81..25b3390e35c 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -827,32 +827,46 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     ) -> ProcessingResult {
         let batch_state = self.visualize_batch_state();
         if let Some(batch) = self.batches.get_mut(&batch_id) {
-            if let RpcResponseError::BlockComponentCouplingError(CouplingError {
-                column_and_peer,
-                msg,
-            }) = &err
-            {
-                debug!(?batch_id, msg, "Block components coupling error");
-                // Note: we don't fail the batch here because a `CouplingError` is
-                // recoverable by requesting from other honest peers.
-                if let Some((column_and_peer, action)) = column_and_peer {
-                    let mut failed_columns = HashSet::new();
-                    let mut failed_peers = HashSet::new();
-                    for (column, peer) in column_and_peer {
-                        failed_columns.insert(*column);
-                        failed_peers.insert(*peer);
+            if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
+                match coupling_error {
+                    CouplingError::PeerFailure {
+                        error,
+                        faulty_peers,
+                        action,
+                    } => {
+                        debug!(?batch_id, error, "Block components coupling error");
+                        // Note: we don't fail the batch here because a `CouplingError` is
+                        // recoverable by requesting from other honest peers.
+                        let mut failed_columns = HashSet::new();
+                        let mut failed_peers = HashSet::new();
+                        for (column, peer) in faulty_peers {
+                            failed_columns.insert(*column);
+                            failed_peers.insert(*peer);
+                        }
+                        for peer in failed_peers.iter() {
+                            network.report_peer(*peer, *action, "failed to return columns");
+                        }
+
+                        return self.retry_partial_batch(
+                            network,
+                            batch_id,
+                            request_id,
+                            failed_columns,
+                            failed_peers,
+                        );
                     }
-                    for peer in failed_peers.iter() {
-                        network.report_peer(*peer, *action, "failed to return columns");
+                    CouplingError::ExceededMaxRetries(peers, action) => {
+                        for peer in peers.iter() {
+                            network.report_peer(
+                                *peer,
+                                *action,
+                                "failed to return columns, exceeded retry attempts",
+                            );
+                        }
+                    }
+                    CouplingError::InternalError(msg) => {
+                        debug!(?batch_id, msg, "Block components coupling internal error");
                     }
-
-                    return self.retry_partial_batch(
-                        network,
-                        batch_id,
-                        request_id,
-                        failed_columns,
-                        failed_peers,
-                    );
                 }
             }
             // A batch could be retried without the peer failing the request (disconnecting/
@@ -911,14 +925,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         let (request, batch_type) = batch.to_blocks_by_range_request();
         let failed_peers = batch.failed_peers();
 
-        // TODO(das): we should request only from peers that are part of this SyncingChain.
-        // However, then we hit the NoPeer error frequently which causes the batch to fail and
-        // the SyncingChain to be dropped. We need to handle this case more gracefully.
         let synced_peers = network
             .network_globals()
             .peers
             .read()
-            .synced_peers_for_epoch(batch_id, &self.peers)
+            .synced_peers_for_epoch(batch_id, Some(&self.peers))
             .cloned()
             .collect::<HashSet<_>>();
 
@@ -1098,11 +1109,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                .sampling_subnets()
                .iter()
                .all(|subnet_id| {
-                    let peer_count = network
-                        .network_globals()
+                    let peer_db = network.network_globals().peers.read();
+                    let peer_count = self
                         .peers
-                        .read()
-                        .good_range_sync_custody_subnet_peer(*subnet_id, &self.peers)
+                        .iter()
+                        .filter(|peer| {
+                            peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer)
+                        })
                        .count();
                    peer_count > 0
                });
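
The standalone Rust sketch below is not part of the patch; it is a simplified illustration of the retry flow the patch introduces (the CouplingError variants, per-column retries, and the MAX_BLOCK_COMPONENT_RETRIES cap). All names and types in it are hypothetical stand-ins, not the real Lighthouse definitions, and the PeerAction reporting has been omitted for brevity.

// Minimal, self-contained sketch of the coupling-error retry flow, under the
// assumption of simplified stand-in types (not Lighthouse code).
use std::collections::HashSet;

type ColumnIndex = u64;
type PeerId = u64;

const MAX_BLOCK_COMPONENT_RETRIES: usize = 3;

#[derive(Debug)]
enum CouplingError {
    /// An internal bug or bookkeeping error; not attributable to a peer.
    InternalError(String),
    /// Peers claimed to custody columns but failed to return them.
    PeerFailure {
        error: String,
        faulty_peers: Vec<(ColumnIndex, PeerId)>,
    },
    /// The batch kept failing after the maximum number of retries.
    ExceededMaxRetries(Vec<PeerId>),
}

/// What the sync logic decides to do with a coupling error.
enum Action {
    Log(String),
    RetryColumns {
        failed_columns: HashSet<ColumnIndex>,
        failed_peers: HashSet<PeerId>,
    },
    Fail(CouplingError),
}

/// Models one batch request that tracks how many coupling attempts it has made.
struct BatchRequest {
    attempt: usize,
}

impl BatchRequest {
    fn on_coupling_error(&mut self, err: CouplingError) -> Action {
        // Every coupling attempt, successful or not, bumps the counter.
        self.attempt += 1;
        match err {
            CouplingError::InternalError(msg) => Action::Log(msg),
            CouplingError::PeerFailure { error, faulty_peers } => {
                if self.attempt >= MAX_BLOCK_COMPONENT_RETRIES {
                    // Give up: fail the batch and surface the faulty peers.
                    let peers = faulty_peers.iter().map(|(_, p)| *p).collect();
                    return Action::Fail(CouplingError::ExceededMaxRetries(peers));
                }
                // Otherwise retry only the failed columns, avoiding the failed peers.
                let failed_columns: HashSet<ColumnIndex> =
                    faulty_peers.iter().map(|(c, _)| *c).collect();
                let failed_peers: HashSet<PeerId> =
                    faulty_peers.iter().map(|(_, p)| *p).collect();
                let _ = error; // the error string would be logged here
                Action::RetryColumns { failed_columns, failed_peers }
            }
            CouplingError::ExceededMaxRetries(peers) => {
                Action::Fail(CouplingError::ExceededMaxRetries(peers))
            }
        }
    }
}

fn main() {
    let mut batch = BatchRequest { attempt: 0 };
    let err = CouplingError::PeerFailure {
        error: "no columns for block".into(),
        faulty_peers: vec![(3, 42), (7, 42)],
    };
    match batch.on_coupling_error(err) {
        Action::RetryColumns { failed_columns, failed_peers } => {
            println!("retry columns {failed_columns:?} avoiding peers {failed_peers:?}")
        }
        Action::Fail(e) => println!("fail batch: {e:?}"),
        Action::Log(msg) => println!("internal error: {msg}"),
    }
}

The design point mirrored here is that a PeerFailure is treated as recoverable: only the columns that failed are re-requested from other peers, and only once the attempt counter reaches the cap is the whole batch failed and the faulty peers reported.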