From e0aa917ab988535f77615ba67922340410d13e34 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Jul 2025 12:52:19 +0800 Subject: [PATCH 1/6] add fulu_start_slot empty response --- .../src/network_beacon_processor/rpc_methods.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 4004305f83c..68da3dbfb59 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -885,6 +885,22 @@ impl NetworkBeaconProcessor { let request_start_slot = Slot::from(req.start_slot); + // Check if the request slot is after a Fulu slot; if so, return empty response + if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { + let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + + if request_start_slot >= fulu_start_slot { + debug!( + %peer_id, + %request_start_slot, + %fulu_start_slot, + returned = 0, + "BlobsByRange request is at or after a Fulu slot, returning empty response" + ); + return Ok(()); + } + } + let data_availability_boundary_slot = match self.chain.data_availability_boundary() { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), None => { From 4f8be4fc5a51774d6c653519c8f52f63e1962b7f Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Jul 2025 21:21:13 +0800 Subject: [PATCH 2/6] BlobsByRange that spans Fulu fork --- .../network_beacon_processor/rpc_methods.rs | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 68da3dbfb59..2bdfbe8e9c7 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -884,11 +884,14 @@ impl NetworkBeaconProcessor { ); let request_start_slot = Slot::from(req.start_slot); + // This variable may only change when the request_start_slot + req.count spans across the Fulu fork slot + let mut effective_count = req.count; - // Check if the request slot is after a Fulu slot; if so, return empty response if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let request_end_slot = request_start_slot + req.count - 1; + // If the request_start_slot is at or after a Fulu slot, return empty response if request_start_slot >= fulu_start_slot { debug!( %peer_id, @@ -898,6 +901,17 @@ impl NetworkBeaconProcessor { "BlobsByRange request is at or after a Fulu slot, returning empty response" ); return Ok(()); + // For the case that the request slots spans across the Fulu fork slot + } else if request_start_slot < fulu_start_slot && request_end_slot >= fulu_start_slot { + effective_count = (fulu_start_slot - request_start_slot).as_u64(); + debug!( + %peer_id, + %request_start_slot, + %fulu_start_slot, + requested_count = req.count, + served_count = effective_count, + "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" + ) } } @@ -937,7 +951,7 @@ impl NetworkBeaconProcessor { } let block_roots = - self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; + self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?; let current_slot = self .chain @@ -964,7 +978,7 @@ impl NetworkBeaconProcessor { // Due to skip slots, blobs could be out of the range, we ensure they // are in the range before sending if blob_sidecar.slot() >= request_start_slot - && blob_sidecar.slot() < request_start_slot + req.count + && blob_sidecar.slot() < request_start_slot + effective_count { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { From cbc4306c12632da942a8087d5e84d54c2f890e20 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Jul 2025 21:44:24 +0800 Subject: [PATCH 3/6] rename --- .../network/src/network_beacon_processor/rpc_methods.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 2bdfbe8e9c7..53594c6a352 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -908,8 +908,8 @@ impl NetworkBeaconProcessor { %peer_id, %request_start_slot, %fulu_start_slot, - requested_count = req.count, - served_count = effective_count, + requested = req.count, + returned = effective_count, "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" ) } From b9149f6126e19e0517d06b3d353a8ff980f73e83 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 18 Jul 2025 21:52:12 +0800 Subject: [PATCH 4/6] Add BlobsByRange test --- .../src/network_beacon_processor/tests.rs | 104 +++++++++++++++++- 1 file changed, 98 insertions(+), 6 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 109c361ebe8..a540e402315 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -417,15 +417,12 @@ impl TestRig { } } - pub fn enqueue_blobs_by_range_request(&self, count: u64) { + pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), - BlobsByRangeRequest { - start_slot: 0, - count, - }, + BlobsByRangeRequest { start_slot, count }, ) .unwrap(); } @@ -1325,8 +1322,9 @@ async fn test_blobs_by_range() { return; }; let mut rig = TestRig::new(64).await; + let start_slot = 0; let slot_count = 32; - rig.enqueue_blobs_by_range_request(slot_count); + rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; for slot in 0..slot_count { @@ -1362,3 +1360,97 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +#[tokio::test] +async fn test_blobs_by_range_post_fulu_should_returns_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + let mut rig = TestRig::new(64).await; + let start_slot = 0; + let slot_count = 32; + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_range_spans_fulu_fork() { + // Only test for Electra & Fulu fork transition + if test_spec::().electra_fork_epoch.is_none() { + return; + }; + let mut spec = test_spec::(); + spec.fulu_fork_epoch = Some(Epoch::new(1)); + + let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), spec).await; + + let start_slot = 16; + // This will span from epoch 0 (Electra) to epoch 1 (Fulu) + let slot_count = 32; + + println!("Deneb fork epoch is: {:?}", rig.chain.spec.deneb_fork_epoch); + println!( + "Electra fork epoch is: {:?}", + rig.chain.spec.electra_fork_epoch + ); + println!("Fulu fork epoch is: {:?}", rig.chain.spec.fulu_fork_epoch); + + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut blob_count = 0; + for slot in start_slot..slot_count { + let root = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + } + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} From befef72f92f742fab2846203deae4e326207274b Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 22 Jul 2025 07:50:26 +0800 Subject: [PATCH 5/6] add Fuly BlobsByRoot --- .../network_beacon_processor/rpc_methods.rs | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 53594c6a352..39a663d3660 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -279,10 +279,35 @@ impl NetworkBeaconProcessor { .collect::>(); let mut send_blob_count = 0; + let fulu_start_slot = self + .chain + .spec + .fulu_fork_epoch + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())); + let mut blob_list_results = HashMap::new(); for id in request.blob_ids.as_slice() { + let BlobIdentifier { + block_root: root, + index, + } = id; + // First attempt to get the blobs from the RPC cache. if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { + // Check if the blob requested is from a Fulu slot, if so, skip the current blob id and proceed to the next + if let Some(fulu_slot) = fulu_start_slot { + if blob.slot() >= fulu_slot { + debug!( + %peer_id, + request_root = %root, + blob_slot = %blob.slot(), + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + continue; + } + } + self.send_response( peer_id, inbound_request_id, @@ -290,11 +315,6 @@ impl NetworkBeaconProcessor { ); send_blob_count += 1; } else { - let BlobIdentifier { - block_root: root, - index, - } = id; - let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { entry.insert(self.chain.get_blobs_checking_early_attester_cache(root)) @@ -306,6 +326,20 @@ impl NetworkBeaconProcessor { Ok(blobs_sidecar_list) => { 'inner: for blob_sidecar in blobs_sidecar_list.iter() { if blob_sidecar.index == *index { + // Same logic as above to check for Fulu slot + if let Some(fulu_slot) = fulu_start_slot { + if blob_sidecar.slot() >= fulu_slot { + debug!( + %peer_id, + request_root = %root, + blob_slot = %blob_sidecar.slot(), + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + break 'inner; + } + } + self.send_response( peer_id, inbound_request_id, From 18322732cee92f5a218c98216e8ef69cef77d859 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 22 Jul 2025 08:19:29 +0800 Subject: [PATCH 6/6] test --- .../src/network_beacon_processor/tests.rs | 75 ++++++++++++++++--- 1 file changed, 63 insertions(+), 12 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index a540e402315..f55958a8a95 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -20,7 +20,7 @@ use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3}; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::{ discv5::enr::{self, CombinedKey}, @@ -34,11 +34,12 @@ use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; +use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, - DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, RuntimeVariableList, + SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, + SubnetId, }; type E = MainnetEthSpec; @@ -427,6 +428,16 @@ impl TestRig { .unwrap(); } + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { + self.network_beacon_processor + .send_blobs_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlobsByRootRequest { blob_ids }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self) { self.network_beacon_processor .send_chain_segment( @@ -1362,7 +1373,7 @@ async fn test_blobs_by_range() { } #[tokio::test] -async fn test_blobs_by_range_post_fulu_should_returns_empty() { +async fn test_blobs_by_range_post_fulu_should_return_empty() { // Only test for Fulu fork if test_spec::().fulu_fork_epoch.is_none() { return; @@ -1409,13 +1420,6 @@ async fn test_blobs_by_range_spans_fulu_fork() { // This will span from epoch 0 (Electra) to epoch 1 (Fulu) let slot_count = 32; - println!("Deneb fork epoch is: {:?}", rig.chain.spec.deneb_fork_epoch); - println!( - "Electra fork epoch is: {:?}", - rig.chain.spec.electra_fork_epoch - ); - println!("Fulu fork epoch is: {:?}", rig.chain.spec.fulu_fork_epoch); - rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; @@ -1454,3 +1458,50 @@ async fn test_blobs_by_range_spans_fulu_fork() { } assert_eq!(blob_count, actual_count); } + +#[tokio::test] +async fn test_blobs_by_root_post_fulu_should_return_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Get the block root of a sample slot, e.g., slot 1 + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blob_ids = vec![BlobIdentifier { + block_root, + index: 0, + }]; + + let blob_ids_list = RuntimeVariableList::new(blob_ids, 1).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +}