Skip to content

Commit 48d01f7

Browse files
committed
feat: skip message buffer aggregation if all price feeds use v2 aggregation
1 parent a4cafa9 commit 48d01f7

File tree

1 file changed

+92
-96
lines changed

1 file changed

+92
-96
lines changed

runtime/src/bank/pyth_accumulator.rs

Lines changed: 92 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use {
44
byteorder::LittleEndian,
55
itertools::Itertools,
66
log::*,
7+
pyth_oracle::validator::AggregationError,
78
solana_sdk::{
89
account::{AccountSharedData, ReadableAccount},
9-
clock::Slot,
1010
feature_set,
1111
hash::hashv,
1212
pubkey::Pubkey,
@@ -19,14 +19,6 @@ use {
1919

2020
pub const ACCUMULATOR_RING_SIZE: u32 = 10_000;
2121

22-
#[derive(Debug, thiserror::Error)]
23-
pub enum AccumulatorUpdateErrorV2 {
24-
#[error("no oracle pubkey")]
25-
NoOraclePubkey,
26-
#[error("get_program_accounts failed to return accounts: {0}")]
27-
GetProgramAccounts(#[from] ScanError),
28-
}
29-
3022
lazy_static! {
3123
static ref ORACLE_PUBKEY: Option<Pubkey> = match env::var("PYTH_ORACLE_PUBKEY") {
3224
Ok(value) => Some(
@@ -45,8 +37,8 @@ lazy_static! {
4537
/// not include any `Custom` style variant we can leverage, so we introduce our own.
4638
#[derive(Debug, thiserror::Error)]
4739
pub enum AccumulatorUpdateErrorV1 {
48-
#[error("get_program_accounts failed to return accounts")]
49-
GetProgramAccounts,
40+
#[error("get_program_accounts failed to return accounts: {0}")]
41+
GetProgramAccounts(#[from] ScanError),
5042

5143
#[error("failed to serialize sequence account")]
5244
FailedSequenceSerialization,
@@ -59,6 +51,9 @@ pub enum AccumulatorUpdateErrorV1 {
5951

6052
#[error("could not parse Pubkey from environment")]
6153
InvalidEnvPubkey(#[from] solana_sdk::pubkey::ParsePubkeyError),
54+
55+
#[error("no oracle pubkey")]
56+
NoOraclePubkey,
6257
}
6358

6459
/// Updates the Accumulator Sysvar at the start of a new slot. See `update_clock` to see a similar
@@ -82,31 +77,9 @@ pub fn update_accumulator(bank: &Bank) {
8277

8378
info!("Accumulator: Updating accumulator. Slot: {}", bank.slot());
8479

85-
lazy_static! {
86-
static ref ACCUMULATOR_V2_SLOT: Option<Slot> =
87-
match std::env::var("PYTH_ACCUMULATOR_V2_FROM_SLOT") {
88-
Ok(value) => Some(
89-
value
90-
.parse()
91-
.expect("invalid value of PYTH_ACCUMULATOR_V2_FROM_SLOT env var")
92-
),
93-
Err(std::env::VarError::NotPresent) => None,
94-
Err(std::env::VarError::NotUnicode(err)) => {
95-
panic!("invalid value of PYTH_ACCUMULATOR_V2_FROM_SLOT env var: {err:?}");
96-
}
97-
};
80+
if let Err(e) = update_v2(bank) {
81+
error!("Error updating accumulator v2: {:?}", e);
9882
}
99-
100-
// TODO: No longer a slot or feature flag, based on price account flag.
101-
if (*ACCUMULATOR_V2_SLOT).map_or(false, |v2_slot| bank.slot() >= v2_slot) {
102-
if let Err(e) = update_v2(bank) {
103-
error!("Error updating accumulator: {:?}", e);
104-
}
105-
} else {
106-
if let Err(e) = update_v1(bank) {
107-
error!("Error updating accumulator: {:?}", e);
108-
}
109-
};
11083
}
11184

11285
/// Read the pubkey from the environment variable `var` or return `default`
@@ -158,7 +131,11 @@ pub fn get_accumulator_keys() -> Vec<(
158131
accumulator_pubkeys
159132
}
160133

161-
pub fn update_v1(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV1> {
134+
pub fn update_v1(
135+
bank: &Bank,
136+
v2_messages: Vec<&[u8]>,
137+
use_message_buffers: bool,
138+
) -> std::result::Result<(), AccumulatorUpdateErrorV1> {
162139
use {
163140
byteorder::ReadBytesExt,
164141
pythnet_sdk::{
@@ -179,57 +156,66 @@ pub fn update_v1(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
179156
Pubkey::new_from_array(MESSAGE_BUFFER_PID),
180157
)?;
181158

182-
let accounts = bank
183-
.get_program_accounts(&message_buffer_pid, &ScanConfig::new(true))
184-
.map_err(|_| AccumulatorUpdateErrorV1::GetProgramAccounts)?;
185-
186-
let preimage = b"account:MessageBuffer";
187-
let mut expected_sighash = [0u8; 8];
188-
expected_sighash.copy_from_slice(&hashv(&[preimage]).to_bytes()[..8]);
189-
190-
// Filter accounts that don't match the Anchor sighash.
191-
let accounts = accounts.iter().filter(|(_, account)| {
192-
// Remove accounts that do not start with the expected Anchor sighash.
193-
let mut sighash = [0u8; 8];
194-
sighash.copy_from_slice(&account.data()[..8]);
195-
sighash == expected_sighash
196-
});
197-
198-
// This code, using the offsets in each Account, extracts the various data versions from
199-
// the account. We deduplicate this result because the accumulator expects a set.
200-
let accounts = accounts
201-
.map(|(_, account)| {
202-
let data = account.data();
203-
let mut cursor = std::io::Cursor::new(&data);
204-
let _sighash = cursor.read_u64::<LittleEndian>()?;
205-
let _bump = cursor.read_u8()?;
206-
let _version = cursor.read_u8()?;
207-
let header_len = cursor.read_u16::<LittleEndian>()?;
208-
let mut header_begin = header_len;
209-
let mut inputs = Vec::new();
210-
let mut cur_end_offsets_idx: usize = 0;
211-
while let Some(end) = cursor.read_u16::<LittleEndian>().ok() {
212-
if end == 0 || cur_end_offsets_idx == (u8::MAX as usize) {
213-
break;
159+
let message_buffer_accounts;
160+
let v1_messages = if use_message_buffers {
161+
message_buffer_accounts = bank
162+
.get_program_accounts(&message_buffer_pid, &ScanConfig::new(true))
163+
.map_err(AccumulatorUpdateErrorV1::GetProgramAccounts)?;
164+
165+
let preimage = b"account:MessageBuffer";
166+
let mut expected_sighash = [0u8; 8];
167+
expected_sighash.copy_from_slice(&hashv(&[preimage]).to_bytes()[..8]);
168+
169+
// Filter accounts that don't match the Anchor sighash.
170+
let message_buffer_accounts = message_buffer_accounts.iter().filter(|(_, account)| {
171+
// Remove accounts that do not start with the expected Anchor sighash.
172+
let mut sighash = [0u8; 8];
173+
sighash.copy_from_slice(&account.data()[..8]);
174+
sighash == expected_sighash
175+
});
176+
177+
// This code, using the offsets in each Account, extracts the various data versions from
178+
// the account. We deduplicate this result because the accumulator expects a set.
179+
message_buffer_accounts
180+
.map(|(_, account)| {
181+
let data = account.data();
182+
let mut cursor = std::io::Cursor::new(&data);
183+
let _sighash = cursor.read_u64::<LittleEndian>()?;
184+
let _bump = cursor.read_u8()?;
185+
let _version = cursor.read_u8()?;
186+
let header_len = cursor.read_u16::<LittleEndian>()?;
187+
let mut header_begin = header_len;
188+
let mut inputs = Vec::new();
189+
let mut cur_end_offsets_idx: usize = 0;
190+
while let Some(end) = cursor.read_u16::<LittleEndian>().ok() {
191+
if end == 0 || cur_end_offsets_idx == (u8::MAX as usize) {
192+
break;
193+
}
194+
195+
let end_offset = header_len + end;
196+
if end_offset as usize > data.len() {
197+
break;
198+
}
199+
let accumulator_input_data = &data[header_begin as usize..end_offset as usize];
200+
inputs.push(accumulator_input_data);
201+
header_begin = end_offset;
202+
cur_end_offsets_idx += 1;
214203
}
215204

216-
let end_offset = header_len + end;
217-
if end_offset as usize > data.len() {
218-
break;
219-
}
220-
let accumulator_input_data = &data[header_begin as usize..end_offset as usize];
221-
inputs.push(accumulator_input_data);
222-
header_begin = end_offset;
223-
cur_end_offsets_idx += 1;
224-
}
205+
Ok(inputs)
206+
})
207+
.collect::<std::result::Result<Vec<_>, std::io::Error>>()?
208+
.into_iter()
209+
.flatten()
210+
.sorted_unstable()
211+
.dedup()
212+
.collect()
213+
} else {
214+
Vec::new()
215+
};
225216

226-
Ok(inputs)
227-
})
228-
.collect::<std::result::Result<Vec<_>, std::io::Error>>()?
229-
.into_iter()
230-
.flatten()
231-
.sorted_unstable()
232-
.dedup();
217+
let mut messages = v1_messages;
218+
messages.extend(v2_messages);
233219

234220
// We now generate a Proof PDA (Owned by the System Program) to store the resulting Proof
235221
// Set. The derivation includes the ring buffer index to simulate a ring buffer in order
@@ -242,8 +228,7 @@ pub fn update_v1(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
242228
let accumulator_data = {
243229
let mut data = vec![];
244230
let acc_state_magic = &mut b"PAS1".to_vec();
245-
let accounts_data =
246-
&mut borsh::BorshSerialize::try_to_vec(&accounts.clone().collect::<Vec<_>>())?;
231+
let accounts_data = &mut borsh::BorshSerialize::try_to_vec(&messages)?;
247232
data.append(acc_state_magic);
248233
data.append(&mut borsh::BorshSerialize::try_to_vec(&bank.slot())?);
249234
data.append(&mut borsh::BorshSerialize::try_to_vec(
@@ -260,7 +245,7 @@ pub fn update_v1(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
260245
// Generate a Message owned by Wormhole to be sent cross-chain. This short-circuits the
261246
// Wormhole message generation code that would normally be called, but the Guardian
262247
// set filters our messages so this does not pose a security risk.
263-
if let Some(accumulator) = MerkleAccumulator::<Keccak160>::from_set(accounts) {
248+
if let Some(accumulator) = MerkleAccumulator::<Keccak160>::from_set(messages.into_iter()) {
264249
post_accumulator_attestation(bank, accumulator, ring_index)?;
265250
}
266251

@@ -390,14 +375,16 @@ fn post_accumulator_attestation(
390375
Ok(())
391376
}
392377

393-
pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV2> {
378+
pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV1> {
394379
// 1. Find Oracle
395-
let oracle_pubkey = ORACLE_PUBKEY.ok_or(AccumulatorUpdateErrorV2::NoOraclePubkey)?;
380+
let oracle_pubkey = ORACLE_PUBKEY.ok_or(AccumulatorUpdateErrorV1::NoOraclePubkey)?;
396381

397382
// 2. Find Oracle Accounts
398383
let accounts = bank
399384
.get_program_accounts(&oracle_pubkey, &ScanConfig::new(true))
400-
.map_err(AccumulatorUpdateErrorV2::GetProgramAccounts)?;
385+
.map_err(AccumulatorUpdateErrorV1::GetProgramAccounts)?;
386+
387+
let mut any_legacy_mode = false;
401388

402389
// 3. Call Aggregation on Price Accounts.
403390
for (pubkey, mut account) in accounts {
@@ -409,16 +396,25 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
409396
bank.clock().unix_timestamp,
410397
&mut price_account_data,
411398
) {
412-
Ok(success) => {
413-
if success {
414-
account.set_data(price_account_data);
415-
bank.store_account_and_update_capitalization(&pubkey, &account);
416-
}
399+
Ok(()) => {
400+
account.set_data(price_account_data);
401+
bank.store_account_and_update_capitalization(&pubkey, &account);
417402
}
418-
Err(err) => trace!("Aggregation: failed to update_price_cumulative, {:?}", err),
403+
Err(err) => match err {
404+
AggregationError::NotPriceFeedAccount => {}
405+
AggregationError::LegacyAggregationMode => {
406+
any_legacy_mode = true;
407+
}
408+
AggregationError::NotTradingStatus => {
409+
trace!("Aggregation: failed to update_price_cumulative, {:?}", err);
410+
}
411+
},
419412
}
420413
}
421414

415+
// TODO: make new messages
416+
update_v1(bank, Vec::new(), any_legacy_mode)?;
417+
422418
// 5. Merkleize the results.
423419

424420
// 6. Create Wormhole Message Account

0 commit comments

Comments
 (0)