Skip to content

Avoid attempting to serve blobs after Fulu fork #7756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 71 additions & 7 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.collect::<Vec<_>>();
let mut send_blob_count = 0;

let fulu_start_slot = self
.chain
.spec
.fulu_fork_epoch
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()));

let mut blob_list_results = HashMap::new();
for id in request.blob_ids.as_slice() {
let BlobIdentifier {
block_root: root,
index,
} = id;

// First attempt to get the blobs from the RPC cache.
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
// Check if the blob requested is from a Fulu slot, if so, skip the current blob id and proceed to the next
if let Some(fulu_slot) = fulu_start_slot {
if blob.slot() >= fulu_slot {
debug!(
%peer_id,
request_root = %root,
blob_slot = %blob.slot(),
%fulu_slot,
"BlobsByRoot request is at or after Fulu slot, returning empty response"
);
continue;
}
}

self.send_response(
peer_id,
inbound_request_id,
Response::BlobsByRoot(Some(blob)),
);
send_blob_count += 1;
} else {
let BlobIdentifier {
block_root: root,
index,
} = id;

let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self.chain.get_blobs_checking_early_attester_cache(root))
Expand All @@ -306,6 +326,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(blobs_sidecar_list) => {
'inner: for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == *index {
// Same logic as above to check for Fulu slot
if let Some(fulu_slot) = fulu_start_slot {
if blob_sidecar.slot() >= fulu_slot {
debug!(
%peer_id,
request_root = %root,
blob_slot = %blob_sidecar.slot(),
%fulu_slot,
"BlobsByRoot request is at or after Fulu slot, returning empty response"
);
break 'inner;
}
}

self.send_response(
peer_id,
inbound_request_id,
Expand Down Expand Up @@ -884,6 +918,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);

let request_start_slot = Slot::from(req.start_slot);
// This count is reduced only when the requested range [request_start_slot, request_start_slot + req.count) spans the Fulu fork slot
let mut effective_count = req.count;

if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch {
let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch());
let request_end_slot = request_start_slot + req.count - 1;

// If the request_start_slot is at or after a Fulu slot, return empty response
if request_start_slot >= fulu_start_slot {
debug!(
%peer_id,
%request_start_slot,
%fulu_start_slot,
returned = 0,
"BlobsByRange request is at or after a Fulu slot, returning empty response"
);
return Ok(());
// Handle the case where the requested slot range spans the Fulu fork slot
} else if request_start_slot < fulu_start_slot && request_end_slot >= fulu_start_slot {
effective_count = (fulu_start_slot - request_start_slot).as_u64();
debug!(
%peer_id,
%request_start_slot,
%fulu_start_slot,
requested = req.count,
returned = effective_count,
"BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots"
)
}
}

let data_availability_boundary_slot = match self.chain.data_availability_boundary() {
Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
Expand Down Expand Up @@ -921,7 +985,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

let block_roots =
self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?;

let current_slot = self
.chain
Expand All @@ -948,7 +1012,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Due to skip slots, blobs could be out of the range, we ensure they
// are in the range before sending
if blob_sidecar.slot() >= request_start_slot
&& blob_sidecar.slot() < request_start_slot + req.count
&& blob_sidecar.slot() < request_start_slot + effective_count
{
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
Expand Down
163 changes: 153 additions & 10 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use gossipsub::MessageAcceptance;
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::{
discv5::enr::{self, CombinedKey},
Expand All @@ -34,11 +34,12 @@ use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{
AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, RuntimeVariableList,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot,
SubnetId,
};

type E = MainnetEthSpec;
Expand Down Expand Up @@ -417,15 +418,22 @@ impl TestRig {
}
}

pub fn enqueue_blobs_by_range_request(&self, count: u64) {
pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) {
self.network_beacon_processor
.send_blobs_by_range_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
BlobsByRangeRequest {
start_slot: 0,
count,
},
BlobsByRangeRequest { start_slot, count },
)
.unwrap();
}

/// Queue a `BlobsByRoot` RPC request from a random peer so the beacon
/// processor handles it as if it arrived over the network.
pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList<BlobIdentifier>) {
    let requesting_peer = PeerId::random();
    let request = BlobsByRootRequest { blob_ids };
    self.network_beacon_processor
        .send_blobs_by_roots_request(
            requesting_peer,
            InboundRequestId::new_unchecked(42, 24),
            request,
        )
        .unwrap();
}
Expand Down Expand Up @@ -1325,8 +1333,9 @@ async fn test_blobs_by_range() {
return;
};
let mut rig = TestRig::new(64).await;
let start_slot = 0;
let slot_count = 32;
rig.enqueue_blobs_by_range_request(slot_count);
rig.enqueue_blobs_by_range_request(start_slot, slot_count);

let mut blob_count = 0;
for slot in 0..slot_count {
Expand Down Expand Up @@ -1362,3 +1371,137 @@ async fn test_blobs_by_range() {
}
assert_eq!(blob_count, actual_count);
}

#[tokio::test]
async fn test_blobs_by_range_post_fulu_should_return_empty() {
    // This scenario is only meaningful when the Fulu fork is scheduled.
    if test_spec::<E>().fulu_fork_epoch.is_none() {
        return;
    };
    let mut rig = TestRig::new(64).await;
    let start_slot = 0;
    let slot_count = 32;
    rig.enqueue_blobs_by_range_request(start_slot, slot_count);

    // Count non-terminal blob responses until the stream terminator
    // (a `None` blob) arrives.
    let mut actual_count = 0;
    while let Some(next) = rig.network_rx.recv().await {
        match next {
            NetworkMessage::SendResponse {
                response: Response::BlobsByRange(maybe_blob),
                ..
            } => match maybe_blob {
                Some(_) => actual_count += 1,
                None => break,
            },
            other => panic!("unexpected message {:?}", other),
        }
    }
    // Post-Fulu should return 0 blobs
    assert_eq!(0, actual_count);
}

#[tokio::test]
async fn test_blobs_by_range_spans_fulu_fork() {
// Only test for Electra & Fulu fork transition
if test_spec::<E>().electra_fork_epoch.is_none() {
return;
};
let mut spec = test_spec::<E>();
spec.fulu_fork_epoch = Some(Epoch::new(1));

let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), spec).await;

let start_slot = 16;
// This will span from epoch 0 (Electra) to epoch 1 (Fulu)
let slot_count = 32;

rig.enqueue_blobs_by_range_request(start_slot, slot_count);

let mut blob_count = 0;
for slot in start_slot..slot_count {
let root = rig
.chain
.block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
.unwrap();
blob_count += root
.map(|root| {
rig.chain
.get_blobs(&root)
.map(|list| list.len())
.unwrap_or(0)
})
.unwrap_or(0);
}

let mut actual_count = 0;

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRange(blob),
inbound_request_id: _,
} = next
{
if blob.is_some() {
actual_count += 1;
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
assert_eq!(blob_count, actual_count);
}

#[tokio::test]
async fn test_blobs_by_root_post_fulu_should_return_empty() {
// Only test for Fulu fork
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};

let mut rig = TestRig::new(64).await;

// Get the block root of a sample slot, e.g., slot 1
let block_root = rig
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();

let blob_ids = vec![BlobIdentifier {
block_root,
index: 0,
}];

let blob_ids_list = RuntimeVariableList::new(blob_ids, 1).unwrap();

rig.enqueue_blobs_by_root_request(blob_ids_list);

let mut actual_count = 0;

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRoot(blob),
inbound_request_id: _,
} = next
{
if blob.is_some() {
actual_count += 1;
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
// Post-Fulu should return 0 blobs
assert_eq!(0, actual_count);
}