-
Notifications
You must be signed in to change notification settings - Fork 756
Implement serialize/deserialize for obsolete accounts #8414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9997967
c4bcf2f
27fd90b
1d8843e
db5e1c7
e1ecfbc
516d9df
053ca4d
9bb7a1e
391fe73
d24ef21
8634ddd
7592bb5
a08ef30
94d0722
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,18 @@ | ||
use {crate::account_info::Offset, solana_clock::Slot}; | ||
|
||
#[derive(Debug, Clone, PartialEq)] | ||
struct ObsoleteAccountItem { | ||
pub struct ObsoleteAccountItem { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, public for access in storage.rs |
||
/// Offset of the account in the account storage entry | ||
offset: Offset, | ||
pub offset: Offset, | ||
/// Length of the account data | ||
data_len: usize, | ||
pub data_len: usize, | ||
/// Slot when the account was marked obsolete | ||
slot: Slot, | ||
pub slot: Slot, | ||
} | ||
|
||
#[derive(Debug, Clone, PartialEq, Default)] | ||
pub struct ObsoleteAccounts { | ||
accounts: Vec<ObsoleteAccountItem>, | ||
pub accounts: Vec<ObsoleteAccountItem>, | ||
} | ||
|
||
impl ObsoleteAccounts { | ||
|
@@ -45,6 +45,22 @@ impl ObsoleteAccounts { | |
.filter(move |obsolete_account| slot.is_none_or(|s| obsolete_account.slot <= s)) | ||
.map(|obsolete_account| (obsolete_account.offset, obsolete_account.data_len)) | ||
} | ||
|
||
/// Returns the accounts that were marked obsolete as of the passed in slot | ||
/// or earlier. Returned data includes the slots that the accounts were marked | ||
/// obsolete at | ||
pub fn obsolete_accounts_for_snapshots(&self, slot: Slot) -> ObsoleteAccounts { | ||
let filtered_accounts = self | ||
.accounts | ||
.iter() | ||
.filter(|account| account.slot <= slot) | ||
.cloned() | ||
.collect(); | ||
|
||
ObsoleteAccounts { | ||
accounts: filtered_accounts, | ||
} | ||
} | ||
} | ||
#[cfg(test)] | ||
mod tests { | ||
|
@@ -99,4 +115,40 @@ mod tests { | |
|
||
assert_eq!(filtered_accounts, vec![(10, 100), (20, 200), (30, 300)]); | ||
} | ||
|
||
#[test] | ||
fn test_obsolete_accounts_for_snapshots() { | ||
let mut obsolete_accounts = ObsoleteAccounts::default(); | ||
let new_accounts = vec![(10, 100, 40), (20, 200, 42), (30, 300, 44)] | ||
.into_iter() | ||
.map(|(offset, data_len, slot)| ObsoleteAccountItem { | ||
offset, | ||
data_len, | ||
slot, | ||
}) | ||
.collect::<Vec<_>>(); | ||
|
||
// Mark accounts obsolete with different slots | ||
new_accounts.iter().for_each(|item| { | ||
obsolete_accounts | ||
.mark_accounts_obsolete([(item.offset, item.data_len)].into_iter(), item.slot) | ||
}); | ||
|
||
// Filter accounts obsolete as of slot 42 | ||
let obsolete_accounts_for_snapshots = obsolete_accounts.obsolete_accounts_for_snapshots(42); | ||
|
||
let expected_accounts: Vec<_> = new_accounts | ||
.iter() | ||
.filter(|account| account.slot <= 42) | ||
.cloned() | ||
.collect(); | ||
|
||
assert_eq!(obsolete_accounts_for_snapshots.accounts, expected_accounts); | ||
|
||
// Filter accounts obsolete passing in no slot (i.e., all obsolete accounts) | ||
let obsolete_accounts_for_snapshots = | ||
obsolete_accounts.obsolete_accounts_for_snapshots(100); | ||
|
||
assert_eq!(obsolete_accounts_for_snapshots.accounts, new_accounts); | ||
} | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,15 +54,17 @@ use { | |
types::SerdeAccountsLtHash, | ||
}; | ||
|
||
mod obsolete_accounts; | ||
mod status_cache; | ||
mod storage; | ||
mod tests; | ||
mod types; | ||
mod utils; | ||
|
||
pub(crate) use { | ||
obsolete_accounts::SerdeObsoleteAccountsMap, | ||
status_cache::{deserialize_status_cache, serialize_status_cache}, | ||
storage::{SerdeObsoleteAccounts, SerializableAccountStorageEntry, SerializedAccountsFileId}, | ||
storage::{SerializableAccountStorageEntry, SerializedAccountsFileId}, | ||
}; | ||
|
||
const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024; | ||
|
@@ -369,7 +371,7 @@ impl<T> SnapshotAccountsDbFields<T> { | |
} | ||
} | ||
|
||
fn deserialize_from<R, T>(reader: R) -> bincode::Result<T> | ||
pub(crate) fn deserialize_from<R, T>(reader: R) -> bincode::Result<T> | ||
where | ||
R: Read, | ||
T: DeserializeOwned, | ||
|
@@ -381,6 +383,17 @@ where | |
.deserialize_from::<R, T>(reader) | ||
} | ||
|
||
pub(crate) fn serialize_into<W, T>(writer: W, value: &T) -> bincode::Result<()> | ||
where | ||
W: Write, | ||
T: Serialize, | ||
{ | ||
bincode::options() | ||
.with_fixint_encoding() | ||
.with_limit(MAX_STREAM_SIZE) | ||
.serialize_into(writer, value) | ||
} | ||
|
||
fn deserialize_accounts_db_fields<R>( | ||
stream: &mut BufReader<R>, | ||
) -> Result<AccountsDbFields<SerializableAccountStorageEntry>, Error> | ||
|
@@ -855,22 +868,21 @@ pub(crate) fn reconstruct_single_storage( | |
current_len: usize, | ||
id: AccountsFileId, | ||
storage_access: StorageAccess, | ||
obsolete_accounts: Option<SerdeObsoleteAccounts>, | ||
obsolete_accounts: Option<(ObsoleteAccounts, AccountsFileId, usize)>, | ||
) -> Result<Arc<AccountStorageEntry>, SnapshotError> { | ||
// When restoring from an archive, obsolete accounts will always be `None` | ||
// When restoring from fastboot, obsolete accounts will be 'Some' if the storage contained | ||
// accounts marked obsolete at the time the snapshot was taken. | ||
let (current_len, obsolete_accounts) = if let Some(obsolete_accounts) = obsolete_accounts { | ||
let updated_len = current_len + obsolete_accounts.bytes as usize; | ||
let id = id as SerializedAccountsFileId; | ||
if obsolete_accounts.id != id { | ||
let updated_len = current_len + obsolete_accounts.2; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's more clear if you do a diff to the last two commits, but there was a translation from the SerdeObsoleteAccounts to ObsoleteAccounts performed in this function. I moved the translation into obsolete_accounts.rs. Generally I would like to avoid the .0, .1, .2 calls here, but the other options I see are
|
||
if obsolete_accounts.1 != id { | ||
return Err(SnapshotError::MismatchedAccountsFileId( | ||
id, | ||
obsolete_accounts.id, | ||
obsolete_accounts.1, | ||
)); | ||
} | ||
|
||
(updated_len, obsolete_accounts.accounts) | ||
(updated_len, obsolete_accounts.0) | ||
} else { | ||
(current_len, ObsoleteAccounts::default()) | ||
}; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
use { | ||
crate::serde_snapshot::SerializedAccountsFileId, | ||
dashmap::DashMap, | ||
rayon::iter::{IntoParallelRefIterator, ParallelIterator}, | ||
serde::{Deserialize, Serialize}, | ||
solana_accounts_db::{ | ||
account_info::Offset, | ||
accounts_db::{AccountStorageEntry, AccountsFileId}, | ||
ObsoleteAccountItem, ObsoleteAccounts, | ||
}, | ||
solana_clock::Slot, | ||
std::sync::Arc, | ||
}; | ||
|
||
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] | ||
#[derive(Debug, Default, Serialize, Deserialize)] | ||
struct SerdeObsoleteAccounts { | ||
/// The ID of the associated account file. Used for verification to ensure the restored | ||
/// obsolete accounts correspond to the correct account file | ||
pub id: SerializedAccountsFileId, | ||
/// The number of obsolete bytes in the storage. These bytes are removed during archive | ||
/// serialization/deserialization but are present when restoring from directories. This value | ||
/// is used to validate the size when creating the accounts file. | ||
pub bytes: u64, | ||
/// A list of accounts that are obsolete in the storage being restored. | ||
pub accounts: Vec<(Offset, usize, Slot)>, | ||
} | ||
|
||
impl SerdeObsoleteAccounts { | ||
/// Creates a new `SerdeObsoleteAccounts` instance from a given storage entry and snapshot slot. | ||
fn new_from_storage_entry_at_slot(storage: &AccountStorageEntry, snapshot_slot: Slot) -> Self { | ||
let accounts = storage | ||
.obsolete_accounts_for_snapshots(snapshot_slot) | ||
.accounts | ||
.into_iter() | ||
.map(|item| (item.offset, item.data_len, item.slot)) | ||
.collect(); | ||
|
||
SerdeObsoleteAccounts { | ||
id: storage.id() as SerializedAccountsFileId, | ||
bytes: storage.get_obsolete_bytes(Some(snapshot_slot)) as u64, | ||
accounts, | ||
} | ||
} | ||
} | ||
|
||
/// Represents a map of obsolete accounts data for multiple slots. | ||
/// This struct is serialized/deserialized as part of the snapshot process | ||
/// to capture and restore obsolete accounts information for account storages. | ||
#[cfg_attr( | ||
feature = "frozen-abi", | ||
derive(AbiExample), | ||
frozen_abi(digest = "12qimMBghYs9dL4nhws7Xe1B5MXWBV75VTMTGLHxevYE") | ||
)] | ||
#[derive(Serialize, Deserialize, Debug)] | ||
pub(crate) struct SerdeObsoleteAccountsMap { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the new type added, and the frozen_abi |
||
map: DashMap<Slot, SerdeObsoleteAccounts>, | ||
} | ||
|
||
impl SerdeObsoleteAccountsMap { | ||
/// Creates a new `SerdeObsoleteAccountsMap` from a list of storage entries and a snapshot slot. | ||
pub(crate) fn new_from_storages( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added this new and pushed the complexity of creating it into here. |
||
snapshot_storages: &[Arc<AccountStorageEntry>], | ||
snapshot_slot: Slot, | ||
) -> Self { | ||
let map = DashMap::with_capacity(snapshot_storages.len()); | ||
snapshot_storages.par_iter().for_each(|storage| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed this to par_iter. Providing large benefits: without par_iter
With par_iter
|
||
map.insert( | ||
storage.slot(), | ||
SerdeObsoleteAccounts::new_from_storage_entry_at_slot(storage, snapshot_slot), | ||
); | ||
}); | ||
SerdeObsoleteAccountsMap { map } | ||
} | ||
|
||
/// Removes and returns the obsolete accounts data for a given slot. | ||
pub(crate) fn remove(&self, slot: &Slot) -> Option<(ObsoleteAccounts, AccountsFileId, usize)> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved the transformation to a more general type here as well. |
||
self.map.remove(slot).map(|(_, entry)| { | ||
let accounts = entry | ||
.accounts | ||
.into_iter() | ||
.map(|(offset, data_len, slot)| ObsoleteAccountItem { | ||
offset, | ||
data_len, | ||
slot, | ||
}) | ||
.collect(); | ||
|
||
( | ||
ObsoleteAccounts { accounts }, | ||
entry.id as AccountsFileId, | ||
entry.bytes as usize, | ||
) | ||
}) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use { | ||
super::*, | ||
crate::serde_snapshot::{deserialize_from, serialize_into}, | ||
std::io::{BufReader, BufWriter, Cursor}, | ||
test_case::test_case, | ||
}; | ||
|
||
/// Tests the serialization and deserialization of obsolete accounts | ||
#[test_case(0, 0)] | ||
#[test_case(1, 0)] | ||
#[test_case(10, 15)] | ||
fn test_serialize_and_deserialize_obsolete_accounts( | ||
num_storages: u64, | ||
num_obsolete_accounts_per_storage: usize, | ||
) { | ||
// Create a set of obsolete accounts | ||
let obsolete_accounts = DashMap::<Slot, ObsoleteAccounts>::new(); | ||
for slot in 1..=num_storages { | ||
let obsolete_accounts_list = ObsoleteAccounts { | ||
accounts: (0..num_obsolete_accounts_per_storage) | ||
.map(|j| ObsoleteAccountItem { | ||
offset: j as Offset, | ||
data_len: j * 10, | ||
slot: slot + 1, | ||
}) | ||
.collect(), | ||
}; | ||
|
||
obsolete_accounts.insert(slot, obsolete_accounts_list); | ||
} | ||
|
||
// Convert the obsolete accounts into a SerdeObsoleteAccountsMap | ||
let map = obsolete_accounts | ||
.iter() | ||
.map(|entry| { | ||
let accounts = entry | ||
.value() | ||
.accounts | ||
.iter() | ||
.map(|item| (item.offset, item.data_len, item.slot)) | ||
.collect(); | ||
let serde_obsolete_accounts = SerdeObsoleteAccounts { | ||
id: *entry.key() as SerializedAccountsFileId, | ||
bytes: num_obsolete_accounts_per_storage as u64 * 1000, | ||
accounts, | ||
}; | ||
(*entry.key(), serde_obsolete_accounts) | ||
}) | ||
.collect(); | ||
let obsolete_accounts_map = SerdeObsoleteAccountsMap { map }; | ||
|
||
// Serialize the obsolete accounts map | ||
let mut buf = Vec::new(); | ||
let cursor = Cursor::new(&mut buf); | ||
let mut writer = BufWriter::new(cursor); | ||
serialize_into(&mut writer, &obsolete_accounts_map).unwrap(); | ||
drop(writer); | ||
|
||
// Deserialize the obsolete accounts map | ||
let cursor = Cursor::new(buf.as_slice()); | ||
let mut reader = BufReader::new(cursor); | ||
let deserialized_obsolete_accounts: SerdeObsoleteAccountsMap = | ||
deserialize_from(&mut reader).unwrap(); | ||
|
||
// Verify the deserialized data matches the original obsolete accounts | ||
assert_eq!( | ||
deserialized_obsolete_accounts.map.len(), | ||
obsolete_accounts.len() | ||
); | ||
for (slot, obsolete_accounts) in obsolete_accounts { | ||
let deserialized_obsolete_accounts = | ||
deserialized_obsolete_accounts.remove(&slot).unwrap(); | ||
assert_eq!(obsolete_accounts, deserialized_obsolete_accounts.0); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.