diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index f057c0619d5..33fee5cdea0 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -1143,6 +1143,7 @@ fn verify_head_block_is_known( .fork_choice_read_lock() .get_block(&attestation_data.beacon_block_root) .or_else(|| { + // FIXME(sproul): abolish? chain .early_attester_cache .get_proto_block(attestation_data.beacon_block_root) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 01075ae4a4c..1939cfb1880 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3824,7 +3824,7 @@ impl BeaconChain { /// (i.e., this function is not atomic). #[allow(clippy::too_many_arguments)] fn import_block( - &self, + self: &Arc, signed_block: AvailableBlock, block_root: Hash256, mut state: BeaconState, @@ -3907,57 +3907,23 @@ impl BeaconChain { .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; } - // If the block is recent enough and it was not optimistically imported, check to see if it - // becomes the head block. If so, apply it to the early attester cache. This will allow - // attestations to the block without waiting for the block and state to be inserted to the - // database. - // - // Only performing this check on recent blocks avoids slowing down sync with lots of calls - // to fork choice `get_head`. + // Run a fork choice update immediately in the common case where we are in sync. // - // Optimistically imported blocks are not added to the cache since the cache is only useful - // for a small window of time and the complexity of keeping track of the optimistic status - // is not worth it. + // This gets the fcU to the EL ASAP, prior to disk writes and other slow operations. if !payload_verification_status.is_optimistic() && block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot { - let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE); - match fork_choice.get_head(current_slot, &self.spec) { - // This block became the head, add it to the early attester cache. - Ok(new_head_root) if new_head_root == block_root => { - if let Some(proto_block) = fork_choice.get_block(&block_root) { - if let Err(e) = self.early_attester_cache.add_head_block( - block_root, - &signed_block, - proto_block, - &state, - &self.spec, - ) { - warn!( - error = ?e, - "Early attester cache insert failed" - ); - } else { - let attestable_timestamp = - self.slot_clock.now_duration().unwrap_or_default(); - self.block_times_cache.write().set_time_attestable( - block_root, - signed_block.slot(), - attestable_timestamp, - ) - } - } else { - warn!(?block_root, "Early attester block missing"); - } - } - // This block did not become the head, nothing to do. - Ok(_) => (), - Err(e) => error!( - error = ?e, - "Failed to compute head during block import" - ), - } - drop(fork_choice_timer); + let _fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE); + let pending_block_snapshot = BeaconSnapshot { + beacon_block: signed_block.block_cloned(), + beacon_block_root: block_root, + beacon_state: state.clone(), + }; + self.recompute_head_at_slot_blocking( + current_slot, + fork_choice, + Some(pending_block_snapshot), + )?; } drop(post_exec_timer); @@ -4005,10 +3971,13 @@ impl BeaconChain { ?block_root, "Failed to store data columns into the database" ); + return Err(BlockError::InternalError(e)); + /* return Err(self .handle_import_block_db_write_error(fork_choice) .err() .unwrap_or(BlockError::InternalError(e))); + */ } } @@ -4023,15 +3992,19 @@ impl BeaconChain { error = ?e, "Database write failed!" ); + /* FIXME(sproul): hmmm return Err(self .handle_import_block_db_write_error(fork_choice) .err() .unwrap_or(e.into())); + */ + return Err(e.into()); } // The fork choice write-lock is dropped *after* the on-disk database has been updated. // This prevents inconsistency between the two at the expense of concurrency. - drop(fork_choice); + // FIXME(sproul): consider how to restore this invariant + // drop(fork_choice); // We're declaring the block "imported" at this point, since fork choice and the DB know // about it. @@ -4070,6 +4043,7 @@ impl BeaconChain { Ok(block_root) } + #[allow(dead_code)] fn handle_import_block_db_write_error( &self, // We don't actually need this value, however it's always present when we call this function diff --git a/beacon_node/beacon_chain/src/block_times_cache.rs b/beacon_node/beacon_chain/src/block_times_cache.rs index bd1adb7e407..069ddf2b0d4 100644 --- a/beacon_node/beacon_chain/src/block_times_cache.rs +++ b/beacon_node/beacon_chain/src/block_times_cache.rs @@ -50,7 +50,7 @@ pub struct BlockDelays { /// We need to use `available` again rather than `attestable` to handle the case where the block /// does not get added to the early-attester cache. pub imported: Option, - /// Time after `imported`. + /// Time after `available`. pub set_as_head: Option, } @@ -83,7 +83,7 @@ impl BlockDelays { .and_then(|imported_time| imported_time.checked_sub(available_time?)); let set_as_head = times .set_as_head - .and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?)); + .and_then(|set_as_head_time| set_as_head_time.checked_sub(available_time?)); BlockDelays { observed, all_blobs_observed, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index f96b59aec4b..eb46250504d 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -500,8 +500,15 @@ impl BeaconChain { let chain = self.clone(); match self .spawn_blocking_handle( - move || chain.recompute_head_at_slot_internal(current_slot), - "recompute_head_internal", + move || { + let fork_choice_write_lock = chain.canonical_head.fork_choice_write_lock(); + chain.recompute_head_at_slot_blocking( + current_slot, + fork_choice_write_lock, + None, + ) + }, + "recompute_head_blocking", ) .await { @@ -546,10 +553,13 @@ impl BeaconChain { /// /// This function performs long-running, heavy-lifting tasks which should not be performed on /// the core `tokio` executor. - fn recompute_head_at_slot_internal( + pub fn recompute_head_at_slot_blocking( self: &Arc, current_slot: Slot, + mut fork_choice_write_lock: RwLockWriteGuard<'_, BeaconForkChoice>, + pending_block_snapshot: Option>, ) -> Result>>, Error> { + // FIXME(sproul): consider getting rid of this lock let recompute_head_lock = self.canonical_head.recompute_head_lock.lock(); // Take a clone of the current ("old") head. @@ -567,8 +577,6 @@ impl BeaconChain { finalized_checkpoint: old_cached_head.finalized_checkpoint(), }; - let mut fork_choice_write_lock = self.canonical_head.fork_choice_write_lock(); - // Recompute the current head via the fork choice algorithm. fork_choice_write_lock.get_head(current_slot, &self.spec)?; @@ -636,34 +644,77 @@ impl BeaconChain { // will just cause lock contention. drop(fork_choice_read_lock); + // Update the early attester cache as soon as the fork choice lock is dropped. + if let Some(ref snapshot) = pending_block_snapshot { + if snapshot.beacon_block_root == new_view.head_block_root { + // FIXME(sproul): update add_head_block to take a snapshot + if let Err(e) = self.early_attester_cache.add_head_block( + snapshot.beacon_block_root, + snapshot.beacon_block.clone(), + new_head_proto_block.clone(), + &snapshot.beacon_state, + &self.spec, + ) { + warn!( + error = ?e, + "Early attester cache insert failed" + ); + } else { + let attestable_timestamp = self.slot_clock.now_duration().unwrap_or_default(); + self.block_times_cache.write().set_time_attestable( + snapshot.beacon_block_root, + snapshot.beacon_block.slot(), + attestable_timestamp, + ) + } + } + } + + // The execution layer updates might attempt to take a write-lock on fork choice, so it's + // important to ensure the fork-choice lock isn't being held (dropped a few lines earlier). + // + // We want to shoot this update off ASAP so that the EL can update its view of the head and + // start work like block building, etc. We used to unnecessarily do a bunch of work before + // this: https://github.com/sigp/lighthouse/issues/7745 + let el_update_handle = + spawn_execution_layer_updates(self.clone(), new_forkchoice_update_parameters)?; + // If the head has changed, update `self.canonical_head`. let new_cached_head = if new_view.head_block_root != old_view.head_block_root { metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); - let mut new_snapshot = { - let beacon_block = self - .store - .get_full_block(&new_view.head_block_root)? - .ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?; + // FIXME(sproul): this would be nicer with a let chain. + let head_block_root = new_view.head_block_root; + let store = self.store.clone(); + let get_snapshot = move || -> Result, Error> { + if let Some(snapshot) = pending_block_snapshot { + if snapshot.beacon_block_root == head_block_root { + return Ok(snapshot); + } + } + let beacon_block = store + .get_full_block(&head_block_root)? + .ok_or(Error::MissingBeaconBlock(head_block_root))?; - let (_, beacon_state) = self - .store + let (_, beacon_state) = store .get_advanced_hot_state( - new_view.head_block_root, + head_block_root, current_slot, beacon_block.state_root(), )? .ok_or(Error::MissingBeaconState(beacon_block.state_root()))?; - BeaconSnapshot { + Ok(BeaconSnapshot { beacon_block: Arc::new(beacon_block), beacon_block_root: new_view.head_block_root, beacon_state, - } + }) }; // Regardless of where we got the state from, attempt to build all the // caches except the tree hash cache. + let mut new_snapshot = get_snapshot()?; + new_snapshot.beacon_state.build_all_caches(&self.spec)?; let new_cached_head = CachedHead { @@ -747,11 +798,6 @@ impl BeaconChain { } } - // The execution layer updates might attempt to take a write-lock on fork choice, so it's - // important to ensure the fork-choice lock isn't being held. - let el_update_handle = - spawn_execution_layer_updates(self.clone(), new_forkchoice_update_parameters)?; - // We have completed recomputing the head and it's now valid for another process to do the // same. drop(recompute_head_lock); diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 5665ef3775c..56f0f0a1278 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -1,4 +1,3 @@ -use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; use crate::{ attester_cache::{CommitteeLengths, Error}, metrics, @@ -21,8 +20,6 @@ pub struct CacheItem { * Values used to make the block available. */ block: Arc>, - blobs: Option>, - data_columns: Option>, proto_block: ProtoBlock, } @@ -52,7 +49,7 @@ impl EarlyAttesterCache { pub fn add_head_block( &self, beacon_block_root: Hash256, - block: &AvailableBlock, + block: Arc>, proto_block: ProtoBlock, state: &BeaconState, spec: &ChainSpec, @@ -70,21 +67,13 @@ impl EarlyAttesterCache { }, }; - let (blobs, data_columns) = match block.data() { - AvailableBlockData::NoData => (None, None), - AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None), - AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())), - }; - let item = CacheItem { epoch, committee_lengths, beacon_block_root, source, target, - block: block.block_cloned(), - blobs, - data_columns, + block, proto_block, }; @@ -163,21 +152,27 @@ impl EarlyAttesterCache { } /// Returns the blobs, if `block_root` matches the cached item. - pub fn get_blobs(&self, block_root: Hash256) -> Option> { + pub fn get_blobs(&self, _block_root: Hash256) -> Option> { + /* FIXME(sproul): nah bruv self.item .read() .as_ref() .filter(|item| item.beacon_block_root == block_root) .and_then(|item| item.blobs.clone()) + */ + None } /// Returns the data columns, if `block_root` matches the cached item. - pub fn get_data_columns(&self, block_root: Hash256) -> Option> { + pub fn get_data_columns(&self, _block_root: Hash256) -> Option> { + /* FIXME(sproul): nah bruv self.item .read() .as_ref() .filter(|item| item.beacon_block_root == block_root) .and_then(|item| item.data_columns.clone()) + */ + None } /// Returns the proto-array block, if `block_root` matches the cached item. diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index d89a8530e1b..cd5854121f5 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -242,7 +242,7 @@ async fn produces_attestations() { .early_attester_cache .add_head_block( block_root, - &available_block, + available_block.block_cloned(), proto_block, &state, &chain.spec, @@ -310,7 +310,7 @@ async fn early_attester_cache_old_request() { .early_attester_cache .add_head_block( head.beacon_block_root, - &available_block, + available_block.block_cloned(), head_proto_block, &head.beacon_state, &harness.chain.spec,