Skip to content

Remove KZG verification on blobs fetched from the EL #7771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3664,7 +3664,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
EngineGetBlobsOutput::Blobs(blobs) => {
self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?;
self.data_availability_checker
.put_gossip_verified_blobs(block_root, blobs)?
.put_kzg_verified_blobs(block_root, blobs)?
}
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
self.check_columns_for_slashability(
Expand Down
26 changes: 10 additions & 16 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::block_verification::{
BlockSlashInfo,
};
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe};
use crate::observed_data_sidecars::{ObservationStrategy, Observe};
use crate::{metrics, BeaconChainError};
use kzg::{Error as KzgError, Kzg, KzgCommitment};
use ssz_derive::{Decode, Encode};
Expand Down Expand Up @@ -304,6 +304,14 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
seen_timestamp: Duration::from_secs(0),
}
}
/// Mark a blob as KZG verified. Caller must ONLY use this on blob sidecars constructed
/// from EL blobs.
pub fn from_execution_verified(blob: Arc<BlobSidecar<E>>, seen_timestamp: Duration) -> Self {
Self {
blob,
seen_timestamp,
}
}
}

/// Complete kzg verification for a `BlobSidecar`.
Expand Down Expand Up @@ -594,21 +602,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrat
})
}

impl<T: BeaconChainTypes> GossipVerifiedBlob<T, DoNotObserve> {
pub fn observe(
self,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlob<T, Observe>, GossipBlobError> {
observe_gossip_blob(&self.blob.blob, chain)?;
Ok(GossipVerifiedBlob {
block_root: self.block_root,
blob: self.blob,
_phantom: PhantomData,
})
}
}

fn observe_gossip_blob<T: BeaconChainTypes>(
pub fn observe_gossip_blob<T: BeaconChainTypes>(
blob_sidecar: &BlobSidecar<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), GossipBlobError> {
Expand Down
13 changes: 12 additions & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList};
use crate::blob_verification::{
verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList,
};
use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
};
Expand Down Expand Up @@ -264,6 +266,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner()))
}

pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
blobs: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache
.put_kzg_verified_blobs(block_root, blobs)
}

/// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the data column sidecar.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
use kzg::Kzg;
Expand All @@ -10,7 +8,7 @@ use mockall::automock;
use std::collections::HashSet;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{BlobSidecar, ChainSpec, ColumnIndex, Hash256, Slot};
use types::{ChainSpec, ColumnIndex, Hash256, Slot};

/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
Expand Down Expand Up @@ -69,11 +67,17 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
.map_err(FetchEngineBlobError::RequestFailed)
}

pub(crate) fn verify_blob_for_gossip(
pub(crate) fn blobs_known_for_proposal(
&self,
blob: &Arc<BlobSidecar<T::EthSpec>>,
) -> Result<GossipVerifiedBlob<T, DoNotObserve>, GossipBlobError> {
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
proposer: u64,
slot: Slot,
) -> Option<HashSet<u64>> {
let proposer_key = ProposalKey::new(proposer, slot);
self.chain
.observed_blob_sidecars
.read()
.known_for_proposal(&proposer_key)
.cloned()
}

pub(crate) fn data_column_known_for_proposal(
Expand All @@ -87,6 +91,12 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
.cloned()
}

pub(crate) fn cached_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.chain
.data_availability_checker
.cached_blob_indexes(block_root)
}

pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.chain
.data_availability_checker
Expand Down
100 changes: 46 additions & 54 deletions beacon_node/beacon_chain/src/fetch_blobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ mod fetch_blobs_beacon_adapter;
#[cfg(test)]
mod tests;

use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::blob_verification::{GossipBlobError, KzgVerifiedBlob};
use crate::block_verification_types::AsBlock;
use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn};
#[cfg_attr(test, double)]
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::validator_monitor::timestamp_now;
use crate::{
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
BlockError,
Expand All @@ -34,19 +34,19 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h
use std::collections::HashSet;
use std::sync::Arc;
use tracing::{debug, warn};
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
use types::blob_sidecar::BlobSidecarError;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{
BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256,
KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs,
SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
};

/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
/// be published immediately.
#[derive(Debug)]
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
Blobs(Vec<KzgVerifiedBlob<T::EthSpec>>),
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>),
}
Expand Down Expand Up @@ -186,46 +186,47 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
.signed_block_header_and_kzg_commitments_proof()
.map_err(FetchEngineBlobError::BeaconStateError)?;

let fixed_blob_sidecar_list = build_blob_sidecars(
let mut blob_sidecar_list = build_blob_sidecars(
&block,
response,
signed_block_header,
&kzg_commitments_proof,
chain_adapter.spec(),
)?;

// Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
// the EL making it into the data availability checker. We do not immediately add these
// blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip
// and be accepted (and propagated) while we are waiting to publish. Just before publishing
// we will observe the blobs/columns and only proceed with publishing if they are not yet seen.
let blobs_to_import_and_publish = fixed_blob_sidecar_list
.into_iter()
.filter_map(|opt_blob| {
let blob = opt_blob.as_ref()?;
match chain_adapter.verify_blob_for_gossip(blob) {
Ok(verified) => Some(Ok(verified)),
// Ignore already seen blobs.
Err(GossipBlobError::RepeatBlob { .. }) => None,
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipBlob)?;
if let Some(observed_blobs) =
chain_adapter.blobs_known_for_proposal(block.message().proposer_index(), block.slot())
{
blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index()));
if blob_sidecar_list.is_empty() {
debug!(
info = "blobs have already been seen on gossip",
"Ignoring EL blobs response"
);
return Ok(None);
}
}

if blobs_to_import_and_publish.is_empty() {
return Ok(None);
if let Some(known_blobs) = chain_adapter.cached_blob_indexes(&block_root) {
blob_sidecar_list.retain(|blob| !known_blobs.contains(&blob.blob_index()));
if blob_sidecar_list.is_empty() {
debug!(
info = "blobs have already been imported into data availability checker",
"Ignoring EL blobs response"
);
return Ok(None);
}
}

publish_fn(EngineGetBlobsOutput::Blobs(
blobs_to_import_and_publish.clone(),
));
// Up until this point we have not observed the blobs in the gossip cache, which allows them to
// arrive independently while this function is running. In `publish_fn` we will observe them
// and then publish any blobs that had not already been observed.
publish_fn(EngineGetBlobsOutput::Blobs(blob_sidecar_list.clone()));

let availability_processing_status = chain_adapter
.process_engine_blobs(
block.slot(),
block_root,
EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish),
EngineGetBlobsOutput::Blobs(blob_sidecar_list),
)
.await?;

Expand Down Expand Up @@ -408,37 +409,28 @@ fn build_blob_sidecars<E: EthSpec>(
response: Vec<Option<BlobAndProofV1<E>>>,
signed_block_header: SignedBeaconBlockHeader,
kzg_commitments_inclusion_proof: &FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>,
spec: &ChainSpec,
) -> Result<FixedBlobSidecarList<E>, FetchEngineBlobError> {
let epoch = block.epoch();
let mut fixed_blob_sidecar_list =
FixedBlobSidecarList::default(spec.max_blobs_per_block(epoch) as usize);
) -> Result<Vec<KzgVerifiedBlob<E>>, FetchEngineBlobError> {
let mut sidecars = vec![];
for (index, blob_and_proof) in response
.into_iter()
.enumerate()
.filter_map(|(i, opt_blob)| Some((i, opt_blob?)))
.filter_map(|(index, opt_blob)| Some((index, opt_blob?)))
{
match BlobSidecar::new_with_existing_proof(
let blob_sidecar = BlobSidecar::new_with_existing_proof(
index,
blob_and_proof.blob,
block,
signed_block_header.clone(),
kzg_commitments_inclusion_proof,
blob_and_proof.proof,
) {
Ok(blob) => {
if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) {
*blob_mut = Some(Arc::new(blob));
} else {
return Err(FetchEngineBlobError::InternalError(format!(
"Blobs from EL contains blob with invalid index {index}"
)));
}
}
Err(e) => {
return Err(FetchEngineBlobError::BlobSidecarError(e));
}
}
)
.map_err(FetchEngineBlobError::BlobSidecarError)?;

sidecars.push(KzgVerifiedBlob::from_execution_verified(
Arc::new(blob_sidecar),
timestamp_now(),
));
}
Ok(fixed_blob_sidecar_list)

Ok(sidecars)
}
37 changes: 24 additions & 13 deletions beacon_node/beacon_chain/src/fetch_blobs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ mod get_blobs_v2 {

mod get_blobs_v1 {
use super::*;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_verification_types::AsBlock;
use std::collections::HashSet;

const ELECTRA_FORK: ForkName = ForkName::Electra;

Expand Down Expand Up @@ -325,10 +325,13 @@ mod get_blobs_v1 {
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
// AND block is not imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// AND all blobs returned are valid
// AND all blobs have not yet been seen
mock_adapter
.expect_verify_blob_for_gossip()
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
.expect_cached_blob_indexes()
.returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(|_, _| None);
// Returned blobs should be processed
mock_process_engine_blobs_result(
&mut mock_adapter,
Expand Down Expand Up @@ -408,17 +411,22 @@ mod get_blobs_v1 {
// **GIVEN**:
// All blobs returned
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
let all_blob_indices = blob_and_proof_opts
.iter()
.enumerate()
.map(|(i, _)| i as u64)
.collect::<HashSet<_>>();

mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
// block not yet imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All blobs already seen on gossip
mock_adapter.expect_verify_blob_for_gossip().returning(|b| {
Err(GossipBlobError::RepeatBlob {
proposer: b.block_proposer_index(),
slot: b.slot(),
index: b.index,
})
});
mock_adapter
.expect_cached_blob_indexes()
.returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(move |_, _| Some(all_blob_indices.clone()));

// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2];
Expand Down Expand Up @@ -454,8 +462,11 @@ mod get_blobs_v1 {
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter
.expect_verify_blob_for_gossip()
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
.expect_cached_blob_indexes()
.returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(|_, _| None);
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),
Expand Down
Loading