Prevent gaps between the global validator cache and its on-disk representation #7241

Open · wants to merge 5 commits into `unstable`
19 changes: 15 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3823,16 +3823,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // is so we don't have to think about lock ordering with respect to the fork choice lock.
         // There are a bunch of places where we lock both fork choice and the pubkey cache and it
         // would be difficult to check that they all lock fork choice first.
-        let mut ops = {
+        let (indices_to_write, mut ops) = {
             let _timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_PUBKEY_CACHE_LOCK);
             let pubkey_cache = self.validator_pubkey_cache.upgradable_read();
 
             // Only take a write lock if there are new keys to import.
             if state.validators().len() > pubkey_cache.len() {
-                parking_lot::RwLockUpgradableReadGuard::upgrade(pubkey_cache)
-                    .import_new_pubkeys(&state)?
+                let mut pubkey_cache =
+                    parking_lot::RwLockUpgradableReadGuard::upgrade(pubkey_cache);
+                pubkey_cache.import_new_pubkeys(&state)?;
+                pubkey_cache.get_db_ops()
             } else {
-                vec![]
+                // Even if no new validators were written to the cache, we may
+                // still have validators staged to be written to the DB because
+                // of a previous failed import.
+                pubkey_cache.get_db_ops()
             }
         };

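Note on the locking pattern above: `upgradable_read` lets block processing check the cache without blocking concurrent readers, and the write lock is only taken when there are new keys. A minimal standalone sketch of the same pattern, using a plain `Vec<u64>` cache and a hypothetical `import_if_needed` helper (not Lighthouse code):

```rust
use parking_lot::{RwLock, RwLockUpgradableReadGuard};

/// Import any new entries, taking the write lock only if there is work to do.
fn import_if_needed(cache: &RwLock<Vec<u64>>, incoming: &[u64]) -> Vec<u64> {
    // Readers are not blocked while we hold the upgradable guard.
    let guard = cache.upgradable_read();
    if incoming.len() > guard.len() {
        // The upgrade is atomic: no other writer can run between the check and the write.
        let mut cache = RwLockUpgradableReadGuard::upgrade(guard);
        let start = cache.len();
        cache.extend_from_slice(&incoming[start..]);
        incoming[start..].to_vec()
    } else {
        vec![]
    }
}

fn main() {
    let cache = RwLock::new(vec![1, 2]);
    assert_eq!(import_if_needed(&cache, &[1, 2, 3, 4]), vec![3, 4]);
    assert!(import_if_needed(&cache, &[1, 2, 3, 4]).is_empty());
}
```
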
@@ -4005,6 +4010,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // This prevents inconsistency between the two at the expense of concurrency.
         drop(fork_choice);
 
+        // We can safely flush `indices_to_write` from `staged_indices` as they are now guaranteed to be written to disk.
+        if !indices_to_write.is_empty() {
+            let mut pubkey_cache = self.validator_pubkey_cache.write();
+            pubkey_cache.flush_staged_indices(indices_to_write);
+        }
+
         // We're declaring the block "imported" at this point, since fork choice and the DB know
         // about it.
         let block_time_imported = timestamp_now();
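
The ordering enforced above is the heart of this change: stage new keys in memory, write them to disk atomically with the block, and only then clear the staged set. If the process fails after staging but before the write, the entries simply stay staged and are written on the next import, which is also what the builder's startup "top-up" relies on. A toy model of that protocol, with all names assumed (not Lighthouse code):

```rust
use std::collections::BTreeMap;

/// Toy stand-ins for the cache's staged set and the on-disk table.
#[derive(Default)]
struct Toy {
    staged: BTreeMap<usize, u8>,
    disk: BTreeMap<usize, u8>,
}

impl Toy {
    /// Mirrors import_new_pubkeys + get_db_ops + atomic write + flush_staged_indices.
    fn import_block(&mut self, new: &[(usize, u8)], write_succeeds: bool) {
        // 1. Stage in memory (done under the cache lock in the real code).
        self.staged.extend(new.iter().copied());

        // 2. Snapshot the staged ops; these are written atomically with the block.
        let indices: Vec<usize> = self.staged.keys().copied().collect();
        if !write_succeeds {
            return; // Failure: staged entries survive for the next attempt.
        }
        for i in &indices {
            self.disk.insert(*i, self.staged[i]);
        }

        // 3. Only after a durable write do we forget the staged entries.
        for i in indices {
            self.staged.remove(&i);
        }
    }
}

fn main() {
    let mut toy = Toy::default();
    toy.import_block(&[(0, 10)], false); // failed import: nothing on disk...
    assert!(toy.disk.is_empty() && toy.staged.len() == 1); // ...but still staged
    toy.import_block(&[(1, 11)], true); // the next import persists both entries
    assert_eq!(toy.disk.len(), 2);
    assert!(toy.staged.is_empty());
}
```
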
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
@@ -821,9 +821,10 @@ where
            .map(|mut validator_pubkey_cache| {
                // If any validators weren't persisted to disk on previous runs, this will use the head state to
                // "top-up" the in-memory validator cache and its on-disk representation with any missing validators.
-                let pubkey_store_ops = validator_pubkey_cache
+                validator_pubkey_cache
                    .import_new_pubkeys(&head_snapshot.beacon_state)
                    .map_err(|e| format!("Unable to top-up persisted pubkey cache {:?}", e))?;
+                let (_, pubkey_store_ops) = validator_pubkey_cache.get_db_ops();
                if !pubkey_store_ops.is_empty() {
                    // Write any missed validators to disk
                    debug!(
60 changes: 42 additions & 18 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -4,7 +4,7 @@ use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
 use smallvec::SmallVec;
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::marker::PhantomData;
 use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
 use types::{BeaconState, FixedBytesExtended, Hash256, PublicKey, PublicKeyBytes};
@@ -21,6 +21,8 @@ pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
     pubkeys: Vec<PublicKey>,
     indices: HashMap<PublicKeyBytes, usize>,
     pubkey_bytes: Vec<PublicKeyBytes>,
+    // Indices in this set still need to be flushed to disk.
+    staged_indices: BTreeMap<usize, PublicKey>,
     _phantom: PhantomData<T>,
 }

@@ -36,11 +38,13 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             pubkeys: vec![],
             indices: HashMap::new(),
             pubkey_bytes: vec![],
+            staged_indices: BTreeMap::new(),
             _phantom: PhantomData,
         };
 
-        let store_ops = cache.import_new_pubkeys(state)?;
-        store.do_atomically_with_block_and_blobs_cache(store_ops)?;
+        cache.import_new_pubkeys(state)?;
+        let (_, db_ops) = cache.get_db_ops();
+        store.do_atomically_with_block_and_blobs_cache(db_ops)?;
 
         Ok(cache)
     }
@@ -68,6 +72,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             pubkeys,
             indices,
             pubkey_bytes,
+            staged_indices: BTreeMap::new(),
             _phantom: PhantomData,
         })
     }
@@ -80,7 +85,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     pub fn import_new_pubkeys(
         &mut self,
         state: &BeaconState<T::EthSpec>,
-    ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
+    ) -> Result<(), BeaconChainError> {
         if state.validators().len() > self.pubkeys.len() {
             self.import(
                 state
@@ -89,49 +94,42 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
                     .map(|v| v.pubkey),
             )
         } else {
-            Ok(vec![])
+            Ok(())
         }
     }
 
     /// Adds zero or more validators to `self`.
-    fn import<I>(
-        &mut self,
-        validator_keys: I,
-    ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
+    fn import<I>(&mut self, validator_keys: I) -> Result<(), BeaconChainError>
     where
         I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
     {
         self.pubkey_bytes.reserve(validator_keys.len());
         self.pubkeys.reserve(validator_keys.len());
         self.indices.reserve(validator_keys.len());
 
-        let mut store_ops = Vec::with_capacity(validator_keys.len());
         for pubkey_bytes in validator_keys {
             let i = self.pubkeys.len();
 
             if self.indices.contains_key(&pubkey_bytes) {
                 return Err(BeaconChainError::DuplicateValidatorPublicKey);
             }
 
-            let pubkey = (&pubkey_bytes)
+            let pubkey: PublicKey = (&pubkey_bytes)
                 .try_into()
                 .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
 
             // Stage the new validator key for writing to disk.
             // It will be committed atomically when the block that introduced it is written to disk.
             // Notably it is NOT written while the write lock on the cache is held.
             // See: https://github.com/sigp/lighthouse/issues/2327
-            store_ops.push(StoreOp::KeyValueOp(
-                DatabasePubkey::from_pubkey(&pubkey)
-                    .as_kv_store_op(DatabasePubkey::key_for_index(i)),
-            ));
+            self.staged_indices.insert(i, pubkey.clone());
 
             self.pubkeys.push(pubkey);
             self.pubkey_bytes.push(pubkey_bytes);
             self.indices.insert(pubkey_bytes, i);
         }
 
-        Ok(store_ops)
+        Ok(())
     }
 
     /// Get the public key for a validator with index `i`.
@@ -163,6 +161,30 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     pub fn is_empty(&self) -> bool {
         self.indices.is_empty()
     }
+
+    /// Returns the validator indices currently staged for persistence, together
+    /// with the database operations that will write them to disk.
+    pub fn get_db_ops(&self) -> (Vec<usize>, Vec<StoreOp<'static, T::EthSpec>>) {
+        let mut store_ops = vec![];
+        let mut indices = vec![];
+
+        for (i, pubkey) in self.staged_indices.iter() {
+            store_ops.push(StoreOp::KeyValueOp(
+                DatabasePubkey::from_pubkey(pubkey)
+                    .as_kv_store_op(DatabasePubkey::key_for_index(*i)),
+            ));
+            indices.push(*i);
+        }
+        (indices, store_ops)
+    }
+
+    /// Flush `indices` from the `staged_indices` map. This should only be done
+    /// after the corresponding validators have been written to disk.
+    pub fn flush_staged_indices(&mut self, indices: Vec<usize>) {
+        for index in indices {
+            self.staged_indices.remove(&index);
+        }
+    }
 }
 
 /// Wrapper for a public key stored in the database.
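
One plausible reason for `BTreeMap` over `HashMap` here (an assumption, the PR does not say) is that iteration is ordered by key, so `get_db_ops` emits a deterministic, index-ordered batch:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut staged: BTreeMap<usize, &str> = BTreeMap::new();
    // Insertion order is arbitrary...
    staged.insert(7, "pk7");
    staged.insert(2, "pk2");
    staged.insert(5, "pk5");

    // ...but iteration is always in ascending index order, so the store ops
    // built from `staged_indices` come out in a stable order.
    let indices: Vec<usize> = staged.keys().copied().collect();
    assert_eq!(indices, vec![2, 5, 7]);
}
```
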
@@ -313,10 +335,12 @@ mod test {
 
         // Add some more keypairs.
         let (state, keypairs) = get_state(12);
-        let ops = cache
+        cache
            .import_new_pubkeys(&state)
            .expect("should import pubkeys");
-        store.do_atomically_with_block_and_blobs_cache(ops).unwrap();
+        store
+            .do_atomically_with_block_and_blobs_cache(cache.get_db_ops().1)
+            .unwrap();
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);
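
A natural companion test, sketched here with a plain map standing in for `ValidatorPubkeyCache` (hypothetical, not part of this diff), would exercise the failure path: stage a key, skip the DB write, and check that the staged entry is still returned on the next call:

```rust
use std::collections::BTreeMap;

// Sketch of the regression this PR guards against (assumed behaviour,
// not the actual test suite).
fn main() {
    let mut staged: BTreeMap<usize, &str> = BTreeMap::new();

    // A first import stages a key, but the atomic DB write fails, so no flush runs.
    staged.insert(0, "pk0");
    assert_eq!(staged.keys().copied().collect::<Vec<_>>(), vec![0]);

    // On the next block import, the ops for index 0 must be produced again.
    let retry: Vec<usize> = staged.keys().copied().collect();
    assert_eq!(retry, vec![0]);

    // Only after a successful write are the staged indices flushed.
    for i in retry {
        staged.remove(&i);
    }
    assert!(staged.is_empty());
}
```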