@@ -12,14 +12,14 @@ mod fetch_blobs_beacon_adapter;
12
12
#[ cfg( test) ]
13
13
mod tests;
14
14
15
- use crate :: blob_verification:: { GossipBlobError , GossipVerifiedBlob } ;
15
+ use crate :: blob_verification:: { GossipBlobError , KzgVerifiedBlob } ;
16
16
use crate :: block_verification_types:: AsBlock ;
17
17
use crate :: data_column_verification:: { KzgVerifiedCustodyDataColumn , KzgVerifiedDataColumn } ;
18
18
#[ cfg_attr( test, double) ]
19
19
use crate :: fetch_blobs:: fetch_blobs_beacon_adapter:: FetchBlobsBeaconAdapter ;
20
20
use crate :: kzg_utils:: blobs_to_data_column_sidecars;
21
21
use crate :: observed_block_producers:: ProposalKey ;
22
- use crate :: observed_data_sidecars :: DoNotObserve ;
22
+ use crate :: validator_monitor :: timestamp_now ;
23
23
use crate :: {
24
24
metrics, AvailabilityProcessingStatus , BeaconChain , BeaconChainError , BeaconChainTypes ,
25
25
BlockError ,
@@ -34,19 +34,19 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h
34
34
use std:: collections:: HashSet ;
35
35
use std:: sync:: Arc ;
36
36
use tracing:: { debug, warn} ;
37
- use types:: blob_sidecar:: { BlobSidecarError , FixedBlobSidecarList } ;
37
+ use types:: blob_sidecar:: BlobSidecarError ;
38
38
use types:: data_column_sidecar:: DataColumnSidecarError ;
39
39
use types:: {
40
- BeaconStateError , Blob , BlobSidecar , ChainSpec , ColumnIndex , EthSpec , FullPayload , Hash256 ,
41
- KzgProofs , SignedBeaconBlock , SignedBeaconBlockHeader , VersionedHash ,
40
+ BeaconStateError , Blob , BlobSidecar , ColumnIndex , EthSpec , FullPayload , Hash256 , KzgProofs ,
41
+ SignedBeaconBlock , SignedBeaconBlockHeader , VersionedHash ,
42
42
} ;
43
43
44
44
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
45
45
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
46
46
/// be published immediately.
47
47
#[ derive( Debug ) ]
48
48
pub enum EngineGetBlobsOutput < T : BeaconChainTypes > {
49
- Blobs ( Vec < GossipVerifiedBlob < T , DoNotObserve > > ) ,
49
+ Blobs ( Vec < KzgVerifiedBlob < T :: EthSpec > > ) ,
50
50
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
51
51
CustodyColumns ( Vec < KzgVerifiedCustodyDataColumn < T :: EthSpec > > ) ,
52
52
}
@@ -186,46 +186,47 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
186
186
. signed_block_header_and_kzg_commitments_proof ( )
187
187
. map_err ( FetchEngineBlobError :: BeaconStateError ) ?;
188
188
189
- let fixed_blob_sidecar_list = build_blob_sidecars (
189
+ let mut blob_sidecar_list = build_blob_sidecars (
190
190
& block,
191
191
response,
192
192
signed_block_header,
193
193
& kzg_commitments_proof,
194
- chain_adapter. spec ( ) ,
195
194
) ?;
196
195
197
- // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
198
- // the EL making it into the data availability checker. We do not immediately add these
199
- // blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip
200
- // and be accepted (and propagated) while we are waiting to publish. Just before publishing
201
- // we will observe the blobs/columns and only proceed with publishing if they are not yet seen.
202
- let blobs_to_import_and_publish = fixed_blob_sidecar_list
203
- . into_iter ( )
204
- . filter_map ( |opt_blob| {
205
- let blob = opt_blob. as_ref ( ) ?;
206
- match chain_adapter. verify_blob_for_gossip ( blob) {
207
- Ok ( verified) => Some ( Ok ( verified) ) ,
208
- // Ignore already seen blobs.
209
- Err ( GossipBlobError :: RepeatBlob { .. } ) => None ,
210
- Err ( e) => Some ( Err ( e) ) ,
211
- }
212
- } )
213
- . collect :: < Result < Vec < _ > , _ > > ( )
214
- . map_err ( FetchEngineBlobError :: GossipBlob ) ?;
196
+ if let Some ( observed_blobs) =
197
+ chain_adapter. blobs_known_for_proposal ( block. message ( ) . proposer_index ( ) , block. slot ( ) )
198
+ {
199
+ blob_sidecar_list. retain ( |blob| !observed_blobs. contains ( & blob. blob_index ( ) ) ) ;
200
+ if blob_sidecar_list. is_empty ( ) {
201
+ debug ! (
202
+ info = "blobs have already been seen on gossip" ,
203
+ "Ignoring EL blobs response"
204
+ ) ;
205
+ return Ok ( None ) ;
206
+ }
207
+ }
215
208
216
- if blobs_to_import_and_publish. is_empty ( ) {
217
- return Ok ( None ) ;
209
+ if let Some ( known_blobs) = chain_adapter. cached_blob_indexes ( & block_root) {
210
+ blob_sidecar_list. retain ( |blob| !known_blobs. contains ( & blob. blob_index ( ) ) ) ;
211
+ if blob_sidecar_list. is_empty ( ) {
212
+ debug ! (
213
+ info = "blobs have already been imported into data availability checker" ,
214
+ "Ignoring EL blobs response"
215
+ ) ;
216
+ return Ok ( None ) ;
217
+ }
218
218
}
219
219
220
- publish_fn ( EngineGetBlobsOutput :: Blobs (
221
- blobs_to_import_and_publish. clone ( ) ,
222
- ) ) ;
220
+ // Up until this point we have not observed the blobs in the gossip cache, which allows them to
221
+ // arrive independently while this function is running. In `publish_fn` we will observe them
222
+ // and then publish any blobs that had not already been observed.
223
+ publish_fn ( EngineGetBlobsOutput :: Blobs ( blob_sidecar_list. clone ( ) ) ) ;
223
224
224
225
let availability_processing_status = chain_adapter
225
226
. process_engine_blobs (
226
227
block. slot ( ) ,
227
228
block_root,
228
- EngineGetBlobsOutput :: Blobs ( blobs_to_import_and_publish ) ,
229
+ EngineGetBlobsOutput :: Blobs ( blob_sidecar_list ) ,
229
230
)
230
231
. await ?;
231
232
@@ -408,37 +409,28 @@ fn build_blob_sidecars<E: EthSpec>(
408
409
response : Vec < Option < BlobAndProofV1 < E > > > ,
409
410
signed_block_header : SignedBeaconBlockHeader ,
410
411
kzg_commitments_inclusion_proof : & FixedVector < Hash256 , E :: KzgCommitmentsInclusionProofDepth > ,
411
- spec : & ChainSpec ,
412
- ) -> Result < FixedBlobSidecarList < E > , FetchEngineBlobError > {
413
- let epoch = block. epoch ( ) ;
414
- let mut fixed_blob_sidecar_list =
415
- FixedBlobSidecarList :: default ( spec. max_blobs_per_block ( epoch) as usize ) ;
412
+ ) -> Result < Vec < KzgVerifiedBlob < E > > , FetchEngineBlobError > {
413
+ let mut sidecars = vec ! [ ] ;
416
414
for ( index, blob_and_proof) in response
417
415
. into_iter ( )
418
416
. enumerate ( )
419
- . filter_map ( |( i , opt_blob) | Some ( ( i , opt_blob?) ) )
417
+ . filter_map ( |( index , opt_blob) | Some ( ( index , opt_blob?) ) )
420
418
{
421
- match BlobSidecar :: new_with_existing_proof (
419
+ let blob_sidecar = BlobSidecar :: new_with_existing_proof (
422
420
index,
423
421
blob_and_proof. blob ,
424
422
block,
425
423
signed_block_header. clone ( ) ,
426
424
kzg_commitments_inclusion_proof,
427
425
blob_and_proof. proof ,
428
- ) {
429
- Ok ( blob) => {
430
- if let Some ( blob_mut) = fixed_blob_sidecar_list. get_mut ( index) {
431
- * blob_mut = Some ( Arc :: new ( blob) ) ;
432
- } else {
433
- return Err ( FetchEngineBlobError :: InternalError ( format ! (
434
- "Blobs from EL contains blob with invalid index {index}"
435
- ) ) ) ;
436
- }
437
- }
438
- Err ( e) => {
439
- return Err ( FetchEngineBlobError :: BlobSidecarError ( e) ) ;
440
- }
441
- }
426
+ )
427
+ . map_err ( FetchEngineBlobError :: BlobSidecarError ) ?;
428
+
429
+ sidecars. push ( KzgVerifiedBlob :: from_execution_verified (
430
+ Arc :: new ( blob_sidecar) ,
431
+ timestamp_now ( ) ,
432
+ ) ) ;
442
433
}
443
- Ok ( fixed_blob_sidecar_list)
434
+
435
+ Ok ( sidecars)
444
436
}
0 commit comments