Skip to content

Commit 4cc3b9e

Browse files
authored
Merge of #7771
2 parents 28bfa8a + 8acbed3 commit 4cc3b9e

File tree

7 files changed

+118
-98
lines changed

7 files changed

+118
-98
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3654,7 +3654,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36543654
EngineGetBlobsOutput::Blobs(blobs) => {
36553655
self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?;
36563656
self.data_availability_checker
3657-
.put_gossip_verified_blobs(block_root, blobs)?
3657+
.put_kzg_verified_blobs(block_root, blobs)?
36583658
}
36593659
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
36603660
self.check_columns_for_slashability(

beacon_node/beacon_chain/src/blob_verification.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::block_verification::{
99
BlockSlashInfo,
1010
};
1111
use crate::kzg_utils::{validate_blob, validate_blobs};
12-
use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe};
12+
use crate::observed_data_sidecars::{ObservationStrategy, Observe};
1313
use crate::{metrics, BeaconChainError};
1414
use kzg::{Error as KzgError, Kzg, KzgCommitment};
1515
use ssz_derive::{Decode, Encode};
@@ -304,6 +304,14 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
304304
seen_timestamp: Duration::from_secs(0),
305305
}
306306
}
307+
/// Mark a blob as KZG verified. Caller must ONLY use this on blob sidecars constructed
308+
/// from EL blobs.
309+
pub fn from_execution_verified(blob: Arc<BlobSidecar<E>>, seen_timestamp: Duration) -> Self {
310+
Self {
311+
blob,
312+
seen_timestamp,
313+
}
314+
}
307315
}
308316

309317
/// Complete kzg verification for a `BlobSidecar`.
@@ -594,21 +602,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrat
594602
})
595603
}
596604

597-
impl<T: BeaconChainTypes> GossipVerifiedBlob<T, DoNotObserve> {
598-
pub fn observe(
599-
self,
600-
chain: &BeaconChain<T>,
601-
) -> Result<GossipVerifiedBlob<T, Observe>, GossipBlobError> {
602-
observe_gossip_blob(&self.blob.blob, chain)?;
603-
Ok(GossipVerifiedBlob {
604-
block_root: self.block_root,
605-
blob: self.blob,
606-
_phantom: PhantomData,
607-
})
608-
}
609-
}
610-
611-
fn observe_gossip_blob<T: BeaconChainTypes>(
605+
pub fn observe_gossip_blob<T: BeaconChainTypes>(
612606
blob_sidecar: &BlobSidecar<T::EthSpec>,
613607
chain: &BeaconChain<T>,
614608
) -> Result<(), GossipBlobError> {

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList};
1+
use crate::blob_verification::{
2+
verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList,
3+
};
24
use crate::block_verification_types::{
35
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
46
};
@@ -264,6 +266,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
264266
.put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner()))
265267
}
266268

269+
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
270+
&self,
271+
block_root: Hash256,
272+
blobs: I,
273+
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
274+
self.availability_cache
275+
.put_kzg_verified_blobs(block_root, blobs)
276+
}
277+
267278
/// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also
268279
/// have a block cached, return the `Availability` variant triggering block import.
269280
/// Otherwise cache the data column sidecar.

beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
21
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
32
use crate::observed_block_producers::ProposalKey;
4-
use crate::observed_data_sidecars::DoNotObserve;
53
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
64
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
75
use kzg::Kzg;
@@ -10,7 +8,7 @@ use mockall::automock;
108
use std::collections::HashSet;
119
use std::sync::Arc;
1210
use task_executor::TaskExecutor;
13-
use types::{BlobSidecar, ChainSpec, ColumnIndex, Hash256, Slot};
11+
use types::{ChainSpec, ColumnIndex, Hash256, Slot};
1412

1513
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
1614
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
@@ -69,11 +67,17 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
6967
.map_err(FetchEngineBlobError::RequestFailed)
7068
}
7169

72-
pub(crate) fn verify_blob_for_gossip(
70+
pub(crate) fn blobs_known_for_proposal(
7371
&self,
74-
blob: &Arc<BlobSidecar<T::EthSpec>>,
75-
) -> Result<GossipVerifiedBlob<T, DoNotObserve>, GossipBlobError> {
76-
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
72+
proposer: u64,
73+
slot: Slot,
74+
) -> Option<HashSet<u64>> {
75+
let proposer_key = ProposalKey::new(proposer, slot);
76+
self.chain
77+
.observed_blob_sidecars
78+
.read()
79+
.known_for_proposal(&proposer_key)
80+
.cloned()
7781
}
7882

7983
pub(crate) fn data_column_known_for_proposal(
@@ -87,6 +91,12 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
8791
.cloned()
8892
}
8993

94+
pub(crate) fn cached_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
95+
self.chain
96+
.data_availability_checker
97+
.cached_blob_indexes(block_root)
98+
}
99+
90100
pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
91101
self.chain
92102
.data_availability_checker

beacon_node/beacon_chain/src/fetch_blobs/mod.rs

Lines changed: 46 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ mod fetch_blobs_beacon_adapter;
1212
#[cfg(test)]
1313
mod tests;
1414

15-
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
15+
use crate::blob_verification::{GossipBlobError, KzgVerifiedBlob};
1616
use crate::block_verification_types::AsBlock;
1717
use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn};
1818
#[cfg_attr(test, double)]
1919
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
2020
use crate::kzg_utils::blobs_to_data_column_sidecars;
2121
use crate::observed_block_producers::ProposalKey;
22-
use crate::observed_data_sidecars::DoNotObserve;
22+
use crate::validator_monitor::timestamp_now;
2323
use crate::{
2424
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
2525
BlockError,
@@ -34,19 +34,19 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h
3434
use std::collections::HashSet;
3535
use std::sync::Arc;
3636
use tracing::{debug, warn};
37-
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
37+
use types::blob_sidecar::BlobSidecarError;
3838
use types::data_column_sidecar::DataColumnSidecarError;
3939
use types::{
40-
BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256,
41-
KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
40+
BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs,
41+
SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
4242
};
4343

4444
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
4545
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
4646
/// be published immediately.
4747
#[derive(Debug)]
4848
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
49-
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
49+
Blobs(Vec<KzgVerifiedBlob<T::EthSpec>>),
5050
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
5151
CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>),
5252
}
@@ -186,46 +186,47 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
186186
.signed_block_header_and_kzg_commitments_proof()
187187
.map_err(FetchEngineBlobError::BeaconStateError)?;
188188

189-
let fixed_blob_sidecar_list = build_blob_sidecars(
189+
let mut blob_sidecar_list = build_blob_sidecars(
190190
&block,
191191
response,
192192
signed_block_header,
193193
&kzg_commitments_proof,
194-
chain_adapter.spec(),
195194
)?;
196195

197-
// Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
198-
// the EL making it into the data availability checker. We do not immediately add these
199-
// blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip
200-
// and be accepted (and propagated) while we are waiting to publish. Just before publishing
201-
// we will observe the blobs/columns and only proceed with publishing if they are not yet seen.
202-
let blobs_to_import_and_publish = fixed_blob_sidecar_list
203-
.into_iter()
204-
.filter_map(|opt_blob| {
205-
let blob = opt_blob.as_ref()?;
206-
match chain_adapter.verify_blob_for_gossip(blob) {
207-
Ok(verified) => Some(Ok(verified)),
208-
// Ignore already seen blobs.
209-
Err(GossipBlobError::RepeatBlob { .. }) => None,
210-
Err(e) => Some(Err(e)),
211-
}
212-
})
213-
.collect::<Result<Vec<_>, _>>()
214-
.map_err(FetchEngineBlobError::GossipBlob)?;
196+
if let Some(observed_blobs) =
197+
chain_adapter.blobs_known_for_proposal(block.message().proposer_index(), block.slot())
198+
{
199+
blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index()));
200+
if blob_sidecar_list.is_empty() {
201+
debug!(
202+
info = "blobs have already been seen on gossip",
203+
"Ignoring EL blobs response"
204+
);
205+
return Ok(None);
206+
}
207+
}
215208

216-
if blobs_to_import_and_publish.is_empty() {
217-
return Ok(None);
209+
if let Some(known_blobs) = chain_adapter.cached_blob_indexes(&block_root) {
210+
blob_sidecar_list.retain(|blob| !known_blobs.contains(&blob.blob_index()));
211+
if blob_sidecar_list.is_empty() {
212+
debug!(
213+
info = "blobs have already been imported into data availability checker",
214+
"Ignoring EL blobs response"
215+
);
216+
return Ok(None);
217+
}
218218
}
219219

220-
publish_fn(EngineGetBlobsOutput::Blobs(
221-
blobs_to_import_and_publish.clone(),
222-
));
220+
// Up until this point we have not observed the blobs in the gossip cache, which allows them to
221+
// arrive independently while this function is running. In `publish_fn` we will observe them
222+
// and then publish any blobs that had not already been observed.
223+
publish_fn(EngineGetBlobsOutput::Blobs(blob_sidecar_list.clone()));
223224

224225
let availability_processing_status = chain_adapter
225226
.process_engine_blobs(
226227
block.slot(),
227228
block_root,
228-
EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish),
229+
EngineGetBlobsOutput::Blobs(blob_sidecar_list),
229230
)
230231
.await?;
231232

@@ -408,37 +409,28 @@ fn build_blob_sidecars<E: EthSpec>(
408409
response: Vec<Option<BlobAndProofV1<E>>>,
409410
signed_block_header: SignedBeaconBlockHeader,
410411
kzg_commitments_inclusion_proof: &FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>,
411-
spec: &ChainSpec,
412-
) -> Result<FixedBlobSidecarList<E>, FetchEngineBlobError> {
413-
let epoch = block.epoch();
414-
let mut fixed_blob_sidecar_list =
415-
FixedBlobSidecarList::default(spec.max_blobs_per_block(epoch) as usize);
412+
) -> Result<Vec<KzgVerifiedBlob<E>>, FetchEngineBlobError> {
413+
let mut sidecars = vec![];
416414
for (index, blob_and_proof) in response
417415
.into_iter()
418416
.enumerate()
419-
.filter_map(|(i, opt_blob)| Some((i, opt_blob?)))
417+
.filter_map(|(index, opt_blob)| Some((index, opt_blob?)))
420418
{
421-
match BlobSidecar::new_with_existing_proof(
419+
let blob_sidecar = BlobSidecar::new_with_existing_proof(
422420
index,
423421
blob_and_proof.blob,
424422
block,
425423
signed_block_header.clone(),
426424
kzg_commitments_inclusion_proof,
427425
blob_and_proof.proof,
428-
) {
429-
Ok(blob) => {
430-
if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) {
431-
*blob_mut = Some(Arc::new(blob));
432-
} else {
433-
return Err(FetchEngineBlobError::InternalError(format!(
434-
"Blobs from EL contains blob with invalid index {index}"
435-
)));
436-
}
437-
}
438-
Err(e) => {
439-
return Err(FetchEngineBlobError::BlobSidecarError(e));
440-
}
441-
}
426+
)
427+
.map_err(FetchEngineBlobError::BlobSidecarError)?;
428+
429+
sidecars.push(KzgVerifiedBlob::from_execution_verified(
430+
Arc::new(blob_sidecar),
431+
timestamp_now(),
432+
));
442433
}
443-
Ok(fixed_blob_sidecar_list)
434+
435+
Ok(sidecars)
444436
}

beacon_node/beacon_chain/src/fetch_blobs/tests.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ mod get_blobs_v2 {
250250

251251
mod get_blobs_v1 {
252252
use super::*;
253-
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
254253
use crate::block_verification_types::AsBlock;
254+
use std::collections::HashSet;
255255

256256
const ELECTRA_FORK: ForkName = ForkName::Electra;
257257

@@ -325,10 +325,13 @@ mod get_blobs_v1 {
325325
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
326326
// AND block is not imported into fork choice
327327
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
328-
// AND all blobs returned are valid
328+
// AND all blobs have not yet been seen
329329
mock_adapter
330-
.expect_verify_blob_for_gossip()
331-
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
330+
.expect_cached_blob_indexes()
331+
.returning(|_| None);
332+
mock_adapter
333+
.expect_blobs_known_for_proposal()
334+
.returning(|_, _| None);
332335
// Returned blobs should be processed
333336
mock_process_engine_blobs_result(
334337
&mut mock_adapter,
@@ -408,17 +411,22 @@ mod get_blobs_v1 {
408411
// **GIVEN**:
409412
// All blobs returned
410413
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
414+
let all_blob_indices = blob_and_proof_opts
415+
.iter()
416+
.enumerate()
417+
.map(|(i, _)| i as u64)
418+
.collect::<HashSet<_>>();
419+
411420
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
412421
// block not yet imported into fork choice
413422
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
414423
// All blobs already seen on gossip
415-
mock_adapter.expect_verify_blob_for_gossip().returning(|b| {
416-
Err(GossipBlobError::RepeatBlob {
417-
proposer: b.block_proposer_index(),
418-
slot: b.slot(),
419-
index: b.index,
420-
})
421-
});
424+
mock_adapter
425+
.expect_cached_blob_indexes()
426+
.returning(|_| None);
427+
mock_adapter
428+
.expect_blobs_known_for_proposal()
429+
.returning(move |_, _| Some(all_blob_indices.clone()));
422430

423431
// **WHEN**: Trigger `fetch_blobs` on the block
424432
let custody_columns = hashset![0, 1, 2];
@@ -454,8 +462,11 @@ mod get_blobs_v1 {
454462
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
455463
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
456464
mock_adapter
457-
.expect_verify_blob_for_gossip()
458-
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
465+
.expect_cached_blob_indexes()
466+
.returning(|_| None);
467+
mock_adapter
468+
.expect_blobs_known_for_proposal()
469+
.returning(|_, _| None);
459470
mock_process_engine_blobs_result(
460471
&mut mock_adapter,
461472
Ok(AvailabilityProcessingStatus::Imported(block_root)),

0 commit comments

Comments
 (0)