From 0fb5ef4dd10efd8c0dbd6c397578e1dbcad4aab8 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 24 Jul 2024 10:02:36 +0100 Subject: [PATCH 01/10] feat: batch publish --- Cargo.lock | 2 +- runtime/Cargo.toml | 2 +- runtime/src/bank.rs | 1 + runtime/src/bank/pyth_accumulator.rs | 40 ++- runtime/src/bank/pyth_accumulator_tests.rs | 140 +++++++++- runtime/src/bank/pyth_batch_publish.rs | 310 +++++++++++++++++++++ validator/src/main.rs | 7 +- 7 files changed, 485 insertions(+), 17 deletions(-) create mode 100644 runtime/src/bank/pyth_batch_publish.rs diff --git a/Cargo.lock b/Cargo.lock index 34a08fbd3f..3282e1ee78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3601,7 +3601,7 @@ dependencies = [ [[package]] name = "pyth-oracle" version = "2.33.0" -source = "git+https://github.com/pyth-network/pyth-client?tag=oracle-v2.33.0#5ee8b5f63f8b1d72f84c1177adaf564d7d61d092" +source = "git+https://github.com/pyth-network/pyth-client?branch=batch-publish#44bf1257e14d3ecf06516bc2520d9358e58b1ed7" dependencies = [ "bindgen 0.60.1", "bitflags 2.6.0", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 72c9fd2090..ac5b87ca27 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -35,7 +35,7 @@ num-traits = { version = "0.2" } num_cpus = "1.13.1" once_cell = "1.12.0" ouroboros = "0.15.0" -pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", tag = "oracle-v2.33.0", features = ["library"] } +pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] } pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "33f901aa45f4f0005aa5a84a1479b78ca9033074" } rand = "0.7.0" rayon = "1.5.3" diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 760901289b..e7a3bb53d8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -199,6 +199,7 @@ mod sysvar_cache; mod transaction_account_state_info; pub mod pyth_accumulator; +mod pyth_batch_publish; #[cfg(test)] mod pyth_accumulator_tests; diff --git a/runtime/src/bank/pyth_accumulator.rs b/runtime/src/bank/pyth_accumulator.rs index f915cc3f98..c37e7c516d 100644 --- a/runtime/src/bank/pyth_accumulator.rs +++ b/runtime/src/bank/pyth_accumulator.rs @@ -1,5 +1,5 @@ use { - super::Bank, + super::{pyth_batch_publish, Bank}, crate::accounts_index::{IndexKey, ScanConfig, ScanError}, byteorder::{LittleEndian, ReadBytesExt}, log::*, @@ -17,7 +17,10 @@ use { hash::hashv, pubkey::Pubkey, }, - std::env::{self, VarError}, + std::{ + collections::HashMap, + env::{self, VarError}, + }, }; pub const ACCUMULATOR_RING_SIZE: u32 = 10_000; @@ -51,6 +54,13 @@ lazy_static! { .parse() .unwrap(), ); + pub static ref BATCH_PUBLISH_PID: Pubkey = env_pubkey_or( + "BATCH_PUBLISH_PID", + // TODO: replace with real program id + "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epA" + .parse() + .unwrap(), + ); } /// Accumulator specific error type. It would be nice to use `transaction::Error` but it does @@ -132,6 +142,7 @@ pub fn get_accumulator_keys() -> Vec<( "STAKE_CAPS_PARAMETERS_ADDR", Ok(*STAKE_CAPS_PARAMETERS_ADDR), ), + ("BATCH_PUBLISH_PID", Ok(*BATCH_PUBLISH_PID)), ] } @@ -427,21 +438,34 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV v2_messages.push(publisher_stake_caps_message); } - let mut measure = Measure::start("update_v2_aggregate_price"); + let new_prices = pyth_batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { + warn!("extract_batch_publish_prices failed: {}", err); + HashMap::new() + }); + let mut measure = Measure::start("update_v2_aggregate_price"); for (pubkey, mut account) in accounts { let mut price_account_data = account.data().to_owned(); + let price_account = if let Ok(data) = + pyth_oracle::validator::validate_price_account(&mut price_account_data) + { + data + } else { + continue; // Not a price account. + }; + + let mut need_save = + pyth_batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); // Perform Accumulation match pyth_oracle::validator::aggregate_price( bank.slot(), bank.clock().unix_timestamp, &pubkey.to_bytes().into(), - &mut price_account_data, + price_account, ) { Ok(messages) => { - account.set_data(price_account_data); - bank.store_account_and_update_capitalization(&pubkey, &account); + need_save = true; v2_messages.extend(messages); } Err(err) => match err { @@ -451,6 +475,10 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV } }, } + if need_save { + account.set_data(price_account_data); + bank.store_account_and_update_capitalization(&pubkey, &account); + } } measure.stop(); diff --git a/runtime/src/bank/pyth_accumulator_tests.rs b/runtime/src/bank/pyth_accumulator_tests.rs index 68a1bfe0b9..fc8b582044 100644 --- a/runtime/src/bank/pyth_accumulator_tests.rs +++ b/runtime/src/bank/pyth_accumulator_tests.rs @@ -7,12 +7,15 @@ use { }, bank::{ pyth_accumulator::{ - get_accumulator_keys, ACCUMULATOR_RING_SIZE, ORACLE_PID, STAKE_CAPS_PARAMETERS_ADDR, + get_accumulator_keys, ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID, + STAKE_CAPS_PARAMETERS_ADDR, }, + pyth_batch_publish::publisher_prices_account::{self, PublisherPrice}, Bank, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, }, + bytemuck::{cast_slice, checked::from_bytes}, byteorder::{ByteOrder, LittleEndian, ReadBytesExt}, itertools::Itertools, pyth_oracle::{ @@ -48,7 +51,9 @@ fn create_new_bank_for_tests_with_index(genesis_config: &GenesisConfig) -> Bank AccountSecondaryIndexes { keys: Some(AccountSecondaryIndexesIncludeExclude { exclude: false, - keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID].into_iter().collect(), + keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID, *BATCH_PUBLISH_PID] + .into_iter() + .collect(), }), indexes: [AccountIndex::ProgramId].into_iter().collect(), }, @@ -1005,8 +1010,6 @@ fn test_publisher_stake_caps() { ], ), generate_price(&bank, b"seeds_5", false, &[]), - - ]; // Publishers are sorted in the publisher stake caps message so we sort them here too @@ -1072,9 +1075,7 @@ fn test_publisher_stake_caps() { // We add some badly formatted stake cap parameters let mut stake_cap_parameters_account = AccountSharedData::new(42, size_of::(), &ORACLE_PID); - stake_cap_parameters_account.set_data( - vec![1,2,3,4], - ); + stake_cap_parameters_account.set_data(vec![1, 2, 3, 4]); bank.store_account(&STAKE_CAPS_PARAMETERS_ADDR, &stake_cap_parameters_account); // Nothing should change as the stake cap parameters are invalid @@ -1144,3 +1145,128 @@ fn test_get_accumulator_keys() { ]; assert_eq!(accumulator_keys, expected_pyth_keys); } + +#[test] +fn test_batch_publish() { + let leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + mut genesis_config, .. + } = create_genesis_config_with_leader(5, &leader_pubkey, 3); + + // Set epoch length to 32 so we can advance epochs quickly. We also skip past slot 0 here + // due to slot 0 having special handling. + let slots_in_epoch = 32; + genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); + let mut bank = create_new_bank_for_tests_with_index(&genesis_config); + + let generate_publisher = |seed, new_prices| { + let publisher1_key = keypair_from_seed(seed).unwrap(); + + let (publisher1_prices_key, _bump) = Pubkey::find_program_address( + // TODO: real seed + &[ + b"PUBLISHER_PRICES_ACCOUNT", + &publisher1_key.pubkey().to_bytes(), + ], + &BATCH_PUBLISH_PID, + ); + let mut publisher1_prices_account = + AccountSharedData::new(42, publisher_prices_account::size(100), &BATCH_PUBLISH_PID); + { + let (header, prices) = publisher_prices_account::create( + publisher1_prices_account.data_mut(), + publisher1_key.pubkey().to_bytes(), + ) + .unwrap(); + publisher_prices_account::extend(header, prices, cast_slice(new_prices)).unwrap(); + } + bank.store_account(&publisher1_prices_key, &publisher1_prices_account); + + publisher1_key + }; + + let publishers = [ + generate_publisher( + &[1u8; 32], + &[ + PublisherPrice::new(1, 1, 10, 2).unwrap(), + PublisherPrice::new(2, 1, 20, 3).unwrap(), + ], + ), + generate_publisher( + &[2u8; 32], + &[ + PublisherPrice::new(1, 1, 15, 2).unwrap(), + PublisherPrice::new(2, 1, 25, 3).unwrap(), + ], + ), + ]; + + let generate_price = |seeds, index| { + let (price_feed_key, _bump) = Pubkey::find_program_address(&[seeds], &ORACLE_PID); + let mut price_feed_account = + AccountSharedData::new(42, size_of::(), &ORACLE_PID); + + let messages = { + let price_feed_info_key = &price_feed_key.to_bytes().into(); + let price_feed_info_lamports = &mut 0; + let price_feed_info_owner = &ORACLE_PID.to_bytes().into(); + let price_feed_info_data = price_feed_account.data_mut(); + let price_feed_info = AccountInfo::new( + price_feed_info_key, + false, + true, + price_feed_info_lamports, + price_feed_info_data, + price_feed_info_owner, + false, + Epoch::default(), + ); + + let mut price_account = PriceAccount::initialize(&price_feed_info, 0).unwrap(); + price_account.flags.insert( + PriceAccountFlags::ACCUMULATOR_V2 | PriceAccountFlags::MESSAGE_BUFFER_CLEARED, + ); + price_account.unused_3_ = index; + price_account.comp_[0].pub_ = publishers[0].pubkey().to_bytes().into(); + price_account.comp_[1].pub_ = publishers[1].pubkey().to_bytes().into(); + price_account.num_ = 2; + }; + + bank.store_account(&price_feed_key, &price_feed_account); + (price_feed_key, messages) + }; + + assert!(bank + .feature_set + .is_active(&feature_set::enable_accumulator_sysvar::id())); + assert!(bank + .feature_set + .is_active(&feature_set::move_accumulator_to_end_of_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::undo_move_accumulator_to_end_of_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::redo_move_accumulator_to_end_of_block::id())); + + let prices_with_messages = [ + generate_price(b"seeds_1", 1), + generate_price(b"seeds_2", 2), + generate_price(b"seeds_3", 3), + generate_price(b"seeds_4", 4), + ]; + + bank = new_from_parent(&Arc::new(bank)); // Advance slot 1. + bank = new_from_parent(&Arc::new(bank)); // Advance slot 2. + + let new_price_feed1_account = bank.get_account(&prices_with_messages[0].0).unwrap(); + let new_price_feed1_data: &PriceAccount = from_bytes(new_price_feed1_account.data()); + assert_eq!(new_price_feed1_data.comp_[0].latest_.price_, 10); + assert_eq!(new_price_feed1_data.comp_[1].latest_.price_, 15); + + let new_price_feed2_account = bank.get_account(&prices_with_messages[1].0).unwrap(); + let new_price_feed2_data: &PriceAccount = from_bytes(new_price_feed2_account.data()); + assert_eq!(new_price_feed2_data.comp_[0].latest_.price_, 20); + assert_eq!(new_price_feed2_data.comp_[1].latest_.price_, 25); +} diff --git a/runtime/src/bank/pyth_batch_publish.rs b/runtime/src/bank/pyth_batch_publish.rs new file mode 100644 index 0000000000..68a0dec0aa --- /dev/null +++ b/runtime/src/bank/pyth_batch_publish.rs @@ -0,0 +1,310 @@ +use { + super::{pyth_accumulator::BATCH_PUBLISH_PID, Bank}, + crate::accounts_index::{IndexKey, ScanConfig, ScanError}, + log::warn, + pyth_oracle::{ + find_publisher_index, get_status_for_conf_price_ratio, solana_program::pubkey::Pubkey, + OracleError, PriceAccount, + }, + solana_sdk::{account::ReadableAccount, clock::Slot}, + std::collections::HashMap, + thiserror::Error, +}; + +#[allow(dead_code)] +pub mod publisher_prices_account { + use { + bytemuck::{cast_slice, from_bytes, from_bytes_mut, Pod, Zeroable}, + solana_sdk::clock::Slot, + std::mem::size_of, + thiserror::Error, + }; + + const FORMAT: u32 = 2848712303; + + #[derive(Debug, Clone, Copy, Zeroable, Pod)] + #[repr(C, packed)] + pub struct PublisherPricesHeader { + pub format: u32, + pub publisher: [u8; 32], + pub slot: Slot, + pub num_prices: u32, + } + + impl PublisherPricesHeader { + fn new(publisher: [u8; 32]) -> Self { + PublisherPricesHeader { + format: FORMAT, + publisher, + slot: 0, + num_prices: 0, + } + } + } + + #[derive(Debug, Clone, Copy, Zeroable, Pod)] + #[repr(C, packed)] + pub struct PublisherPrice { + // 4 high bits: trading status + // 28 low bits: feed index + pub trading_status_and_feed_index: u32, + pub price: i64, + pub confidence: u64, + } + + #[derive(Debug, Error)] + #[error("publisher price data overflow")] + pub struct PublisherPriceError; + + impl PublisherPrice { + pub fn new( + feed_index: u32, + trading_status: u32, + price: i64, + confidence: u64, + ) -> Result { + if feed_index >= (1 << 28) || trading_status >= (1 << 4) { + return Err(PublisherPriceError); + } + Ok(Self { + trading_status_and_feed_index: (trading_status << 28) | feed_index, + price, + confidence, + }) + } + + pub fn trading_status(&self) -> u32 { + self.trading_status_and_feed_index >> 28 + } + + pub fn feed_index(&self) -> u32 { + self.trading_status_and_feed_index & ((1 << 28) - 1) + } + } + + #[derive(Debug, Error)] + pub enum ReadAccountError { + #[error("data too short")] + DataTooShort, + #[error("format mismatch")] + FormatMismatch, + #[error("invalid num prices")] + InvalidNumPrices, + } + + #[derive(Debug, Error)] + pub enum ExtendError { + #[error("not enough space")] + NotEnoughSpace, + #[error("invalid length")] + InvalidLength, + } + + pub fn read( + data: &[u8], + ) -> Result<(&PublisherPricesHeader, &[PublisherPrice]), ReadAccountError> { + if data.len() < size_of::() { + return Err(ReadAccountError::DataTooShort); + } + let header: &PublisherPricesHeader = + from_bytes(&data[..size_of::()]); + if header.format != FORMAT { + return Err(ReadAccountError::FormatMismatch); + } + let prices_bytes = &data[size_of::()..]; + let num_prices: usize = header.num_prices.try_into().unwrap(); + let expected_len = num_prices.saturating_mul(size_of::()); + if expected_len > prices_bytes.len() { + return Err(ReadAccountError::InvalidNumPrices); + } + let prices = cast_slice(&prices_bytes[..expected_len]); + Ok((header, prices)) + } + + pub fn size(max_prices: usize) -> usize { + size_of::() + max_prices * size_of::() + } + + pub fn read_mut( + data: &mut [u8], + ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { + if data.len() < size_of::() { + return Err(ReadAccountError::DataTooShort); + } + let (header, prices) = data.split_at_mut(size_of::()); + let header: &mut PublisherPricesHeader = from_bytes_mut(header); + if header.format != FORMAT { + return Err(ReadAccountError::FormatMismatch); + } + Ok((header, prices)) + } + + pub fn create( + data: &mut [u8], + publisher: [u8; 32], + ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { + if data.len() < size_of::() { + return Err(ReadAccountError::DataTooShort); + } + let (header, prices) = data.split_at_mut(size_of::()); + let header: &mut PublisherPricesHeader = from_bytes_mut(header); + *header = PublisherPricesHeader::new(publisher); + Ok((header, prices)) + } + + pub fn extend( + header: &mut PublisherPricesHeader, + prices: &mut [u8], + new_prices: &[u8], + ) -> Result<(), ExtendError> { + if new_prices.len() % size_of::() != 0 { + return Err(ExtendError::InvalidLength); + } + let num_new_prices = (new_prices.len() / size_of::()) + .try_into() + .expect("unexpected overflow"); + let num_prices: usize = header.num_prices.try_into().unwrap(); + let start = size_of::() * num_prices; + let end = size_of::() * num_prices + new_prices.len(); + header.num_prices = header + .num_prices + .checked_add(num_new_prices) + .expect("unexpected overflow"); + prices + .get_mut(start..end) + .ok_or(ExtendError::NotEnoughSpace)? + .copy_from_slice(new_prices); + Ok(()) + } +} + +#[derive(Debug, Error)] +pub enum HandleBatchPublishError { + #[error("failed to get program accounts: {0}")] + GetProgramAccounts(#[from] ScanError), +} + +#[derive(Debug)] +pub struct PublisherPriceValue { + pub publisher: Pubkey, + pub trading_status: u32, + pub price: i64, + pub confidence: u64, +} + +pub fn extract_batch_publish_prices( + bank: &Bank, +) -> Result>, HandleBatchPublishError> { + assert!( + bank.account_indexes_include_key(&*BATCH_PUBLISH_PID), + "Oracle program account index missing" + ); + + let publisher_prices_accounts = bank + .get_filtered_indexed_accounts( + &IndexKey::ProgramId(*BATCH_PUBLISH_PID), + |account| account.owner() == &*BATCH_PUBLISH_PID, + &ScanConfig::new(true), + None, + ) + .map_err(HandleBatchPublishError::GetProgramAccounts)?; + + let mut all_prices = HashMap::>::new(); + for (account_key, account) in publisher_prices_accounts { + let (header, prices) = match publisher_prices_account::read(account.data()) { + Ok(r) => r, + Err(err) => { + warn!("invalid publisher prices account {}: {}", account_key, err); + continue; + } + }; + if header.slot != bank.slot() { + // Updates from earlier slots have already been applied. + continue; + } + let publisher = header.publisher.into(); + for price in prices { + all_prices + .entry(price.feed_index()) + .or_default() + .push(PublisherPriceValue { + publisher, + trading_status: price.trading_status(), + price: price.price, + confidence: price.confidence, + }); + } + } + Ok(all_prices) +} + +pub fn apply_published_prices( + price_data: &mut PriceAccount, + new_prices: &HashMap>, + slot: Slot, +) -> bool { + // TODO: store index here or somewhere else? + let price_feed_index = price_data.unused_3_ as u32; + let mut any_update = false; + for new_price in new_prices.get(&price_feed_index).unwrap_or(&Vec::new()) { + match apply_published_price(price_data, new_price, slot) { + Ok(()) => { + any_update = true; + } + Err(err) => { + warn!( + "failed to apply publisher price to price feed {}: {}", + price_data.unused_3_ as u32, err + ); + } + } + } + any_update +} + +#[derive(Debug, Error)] +enum ApplyPublishedPriceError { + #[error("publisher {1} is not allowed to publish prices for feed {0}")] + NoPermission(u32, Pubkey), + #[error("bad conf price ratio: {0}")] + BadConfPriceRatio(#[from] OracleError), + #[error("invalid publishers num_")] + InvalidPublishersNum, + #[error("invalid publisher index")] + InvalidPublisherIndex, +} + +fn apply_published_price( + price_data: &mut PriceAccount, + new_price: &PublisherPriceValue, + slot: Slot, +) -> Result<(), ApplyPublishedPriceError> { + let publishers = price_data + .comp_ + .get(..price_data.num_.try_into().unwrap()) + .ok_or(ApplyPublishedPriceError::InvalidPublishersNum)?; + + let publisher_index = find_publisher_index(publishers, &new_price.publisher).ok_or( + ApplyPublishedPriceError::NoPermission(price_data.unused_3_ as u32, new_price.publisher), + )?; + + // IMPORTANT: If the publisher does not meet the price/conf + // ratio condition, its price will not count for the next + // aggregate. + let status: u32 = get_status_for_conf_price_ratio( + new_price.price, + new_price.confidence, + new_price.trading_status, + )?; + + let publisher_price = &mut price_data + .comp_ + .get_mut(publisher_index) + .ok_or(ApplyPublishedPriceError::InvalidPublisherIndex)? + .latest_; + publisher_price.price_ = new_price.price; + publisher_price.conf_ = new_price.confidence; + publisher_price.status_ = status; + publisher_price.pub_slot_ = slot; + Ok(()) +} diff --git a/validator/src/main.rs b/validator/src/main.rs index 711c3b7806..1afe140da6 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -52,7 +52,7 @@ use { AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude, AccountsIndexConfig, IndexLimitMb, }, - bank::pyth_accumulator::{MESSAGE_BUFFER_PID, ORACLE_PID}, + bank::pyth_accumulator::{BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, ORACLE_PID}, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, runtime_config::RuntimeConfig, snapshot_config::SnapshotConfig, @@ -3203,12 +3203,14 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { if include_keys { if !account_indexes_include_keys.contains(&*ORACLE_PID) || !account_indexes_include_keys.contains(&*MESSAGE_BUFFER_PID) + || !account_indexes_include_keys.contains(&*BATCH_PUBLISH_PID) { panic!( "The oracle program id and message buffer program id must be included in the account index. Add the following flags\n\ --account-index-include-key {}\n\ + --account-index-include-key {}\n\ --account-index-include-key {}\n", - &*ORACLE_PID, &*MESSAGE_BUFFER_PID + &*ORACLE_PID, &*MESSAGE_BUFFER_PID, &*BATCH_PUBLISH_PID, ); } } @@ -3216,6 +3218,7 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { if exclude_keys { if account_indexes_exclude_keys.contains(&*ORACLE_PID) || account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID) + || account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID) { panic!("The oracle program id and message buffer program id must *not* be excluded from the account index."); } From c0bb42c2fa505093131755229fc6886ee8fbc806 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 31 Jul 2024 10:58:38 +0100 Subject: [PATCH 02/10] chore: add CI 10 --- .github/workflows/pyth.yml | 51 ++++++++++++++++++++++ runtime/src/bank/pyth_accumulator.rs | 22 ++++++---- runtime/src/bank/pyth_accumulator_tests.rs | 1 + validator/src/main.rs | 22 +++++----- 4 files changed, 76 insertions(+), 20 deletions(-) create mode 100644 .github/workflows/pyth.yml diff --git a/.github/workflows/pyth.yml b/.github/workflows/pyth.yml new file mode 100644 index 0000000000..e623db975b --- /dev/null +++ b/.github/workflows/pyth.yml @@ -0,0 +1,51 @@ +name: Check Pythnet + +on: + pull_request: + push: + branches: [main] +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: 1.60.0 + components: clippy + override: true + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y libssl-dev libudev-dev pkg-config zlib1g-dev llvm clang cmake make libprotobuf-dev + - name: Run tests + run: cargo test -p solana-runtime pyth + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: 1.60.0 + components: clippy + override: true + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y libssl-dev libudev-dev pkg-config zlib1g-dev llvm clang cmake make libprotobuf-dev + - name: Check clippy + run: cargo clippy --bins --tests --examples -- --deny warnings + format: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: nightly-2022-04-01 + components: rustfmt + override: true + - name: Check formatting + run: cargo fmt -- --check diff --git a/runtime/src/bank/pyth_accumulator.rs b/runtime/src/bank/pyth_accumulator.rs index c37e7c516d..46c5524b83 100644 --- a/runtime/src/bank/pyth_accumulator.rs +++ b/runtime/src/bank/pyth_accumulator.rs @@ -446,13 +446,19 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV let mut measure = Measure::start("update_v2_aggregate_price"); for (pubkey, mut account) in accounts { let mut price_account_data = account.data().to_owned(); - let price_account = if let Ok(data) = - pyth_oracle::validator::validate_price_account(&mut price_account_data) - { - data - } else { - continue; // Not a price account. - }; + let price_account = + match pyth_oracle::validator::validate_price_account(&mut price_account_data) { + Ok(data) => data, + Err(err) => match err { + AggregationError::NotPriceFeedAccount => { + continue; + } + AggregationError::V1AggregationMode | AggregationError::AlreadyAggregated => { + any_v1_aggregations = true; + continue; + } + }, + }; let mut need_save = pyth_batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); @@ -468,7 +474,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV need_save = true; v2_messages.extend(messages); } - Err(err) => match err { + Err(err) => match dbg!(err) { AggregationError::NotPriceFeedAccount => {} AggregationError::V1AggregationMode | AggregationError::AlreadyAggregated => { any_v1_aggregations = true; diff --git a/runtime/src/bank/pyth_accumulator_tests.rs b/runtime/src/bank/pyth_accumulator_tests.rs index fc8b582044..f55260a9b5 100644 --- a/runtime/src/bank/pyth_accumulator_tests.rs +++ b/runtime/src/bank/pyth_accumulator_tests.rs @@ -1142,6 +1142,7 @@ fn test_get_accumulator_keys() { Pubkey::new_from_array(pythnet::WORMHOLE_PID), *ORACLE_PID, *STAKE_CAPS_PARAMETERS_ADDR, + *BATCH_PUBLISH_PID, ]; assert_eq!(accumulator_keys, expected_pyth_keys); } diff --git a/validator/src/main.rs b/validator/src/main.rs index 1afe140da6..efee36b715 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -3200,28 +3200,26 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { let exclude_keys = !account_indexes_exclude_keys.is_empty(); let include_keys = !account_indexes_include_keys.is_empty(); - if include_keys { - if !account_indexes_include_keys.contains(&*ORACLE_PID) + if include_keys + && (!account_indexes_include_keys.contains(&*ORACLE_PID) || !account_indexes_include_keys.contains(&*MESSAGE_BUFFER_PID) - || !account_indexes_include_keys.contains(&*BATCH_PUBLISH_PID) - { - panic!( + || !account_indexes_include_keys.contains(&*BATCH_PUBLISH_PID)) + { + panic!( "The oracle program id and message buffer program id must be included in the account index. Add the following flags\n\ --account-index-include-key {}\n\ --account-index-include-key {}\n\ --account-index-include-key {}\n", &*ORACLE_PID, &*MESSAGE_BUFFER_PID, &*BATCH_PUBLISH_PID, ); - } } - if exclude_keys { - if account_indexes_exclude_keys.contains(&*ORACLE_PID) + if exclude_keys + && (account_indexes_exclude_keys.contains(&*ORACLE_PID) || account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID) - || account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID) - { - panic!("The oracle program id and message buffer program id must *not* be excluded from the account index."); - } + || account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID)) + { + panic!("The oracle program id and message buffer program id must *not* be excluded from the account index."); } let keys = if !account_indexes.is_empty() && (exclude_keys || include_keys) { From 0387cdfca2e7d35bdf6fd4c65105d89f31a40520 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 1 Aug 2024 10:30:26 +0100 Subject: [PATCH 03/10] feat: batch publish 2 (wip) --- Cargo.lock | 2 +- runtime/src/bank/pyth_accumulator.rs | 16 +++++----------- runtime/src/bank/pyth_accumulator_tests.rs | 2 +- runtime/src/bank/pyth_batch_publish.rs | 21 +++++++++++---------- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3282e1ee78..f3314cc104 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -519,7 +519,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive 0.10.3", - "hashbrown 0.11.2", + "hashbrown 0.12.3", ] [[package]] diff --git a/runtime/src/bank/pyth_accumulator.rs b/runtime/src/bank/pyth_accumulator.rs index 46c5524b83..ee30aca37c 100644 --- a/runtime/src/bank/pyth_accumulator.rs +++ b/runtime/src/bank/pyth_accumulator.rs @@ -447,17 +447,11 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV for (pubkey, mut account) in accounts { let mut price_account_data = account.data().to_owned(); let price_account = - match pyth_oracle::validator::validate_price_account(&mut price_account_data) { + match pyth_oracle::validator::checked_load_price_account_mut(&mut price_account_data) { Ok(data) => data, - Err(err) => match err { - AggregationError::NotPriceFeedAccount => { - continue; - } - AggregationError::V1AggregationMode | AggregationError::AlreadyAggregated => { - any_v1_aggregations = true; - continue; - } - }, + Err(_err) => { + continue; + } }; let mut need_save = @@ -474,7 +468,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV need_save = true; v2_messages.extend(messages); } - Err(err) => match dbg!(err) { + Err(err) => match err { AggregationError::NotPriceFeedAccount => {} AggregationError::V1AggregationMode | AggregationError::AlreadyAggregated => { any_v1_aggregations = true; diff --git a/runtime/src/bank/pyth_accumulator_tests.rs b/runtime/src/bank/pyth_accumulator_tests.rs index f55260a9b5..5c3673a131 100644 --- a/runtime/src/bank/pyth_accumulator_tests.rs +++ b/runtime/src/bank/pyth_accumulator_tests.rs @@ -1228,7 +1228,7 @@ fn test_batch_publish() { price_account.flags.insert( PriceAccountFlags::ACCUMULATOR_V2 | PriceAccountFlags::MESSAGE_BUFFER_CLEARED, ); - price_account.unused_3_ = index; + price_account.feed_index = index; price_account.comp_[0].pub_ = publishers[0].pubkey().to_bytes().into(); price_account.comp_[1].pub_ = publishers[1].pubkey().to_bytes().into(); price_account.num_ = 2; diff --git a/runtime/src/bank/pyth_batch_publish.rs b/runtime/src/bank/pyth_batch_publish.rs index 68a0dec0aa..406a313551 100644 --- a/runtime/src/bank/pyth_batch_publish.rs +++ b/runtime/src/bank/pyth_batch_publish.rs @@ -11,6 +11,7 @@ use { thiserror::Error, }; +// TODO: move to the publish program #[allow(dead_code)] pub mod publisher_prices_account { use { @@ -224,15 +225,16 @@ pub fn extract_batch_publish_prices( } let publisher = header.publisher.into(); for price in prices { + let value = PublisherPriceValue { + publisher, + trading_status: price.trading_status(), + price: price.price, + confidence: price.confidence, + }; all_prices .entry(price.feed_index()) .or_default() - .push(PublisherPriceValue { - publisher, - trading_status: price.trading_status(), - price: price.price, - confidence: price.confidence, - }); + .push(value); } } Ok(all_prices) @@ -243,8 +245,7 @@ pub fn apply_published_prices( new_prices: &HashMap>, slot: Slot, ) -> bool { - // TODO: store index here or somewhere else? - let price_feed_index = price_data.unused_3_ as u32; + let price_feed_index = price_data.feed_index as u32; let mut any_update = false; for new_price in new_prices.get(&price_feed_index).unwrap_or(&Vec::new()) { match apply_published_price(price_data, new_price, slot) { @@ -254,7 +255,7 @@ pub fn apply_published_prices( Err(err) => { warn!( "failed to apply publisher price to price feed {}: {}", - price_data.unused_3_ as u32, err + price_data.feed_index as u32, err ); } } @@ -285,7 +286,7 @@ fn apply_published_price( .ok_or(ApplyPublishedPriceError::InvalidPublishersNum)?; let publisher_index = find_publisher_index(publishers, &new_price.publisher).ok_or( - ApplyPublishedPriceError::NoPermission(price_data.unused_3_ as u32, new_price.publisher), + ApplyPublishedPriceError::NoPermission(price_data.feed_index as u32, new_price.publisher), )?; // IMPORTANT: If the publisher does not meet the price/conf From 491e168492eaeaf698a8cbc9608a8a290d9bfedb Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 7 Aug 2024 11:25:21 +0100 Subject: [PATCH 04/10] feat: verify that price feed index is set --- .github/workflows/pyth.yml | 2 +- Cargo.lock | 2 +- runtime/src/bank/pyth_batch_publish.rs | 13 +++++++++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pyth.yml b/.github/workflows/pyth.yml index e623db975b..291a49bf3f 100644 --- a/.github/workflows/pyth.yml +++ b/.github/workflows/pyth.yml @@ -3,7 +3,7 @@ name: Check Pythnet on: pull_request: push: - branches: [main] + branches: [pyth-v1.14.17] jobs: test: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index f3314cc104..0a652e01bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3601,7 +3601,7 @@ dependencies = [ [[package]] name = "pyth-oracle" version = "2.33.0" -source = "git+https://github.com/pyth-network/pyth-client?branch=batch-publish#44bf1257e14d3ecf06516bc2520d9358e58b1ed7" +source = "git+https://github.com/pyth-network/pyth-client?branch=batch-publish#f466deb73c4a15ea13cb271c3a900c5a85810c87" dependencies = [ "bindgen 0.60.1", "bitflags 2.6.0", diff --git a/runtime/src/bank/pyth_batch_publish.rs b/runtime/src/bank/pyth_batch_publish.rs index 406a313551..fe9c6cb371 100644 --- a/runtime/src/bank/pyth_batch_publish.rs +++ b/runtime/src/bank/pyth_batch_publish.rs @@ -245,9 +245,14 @@ pub fn apply_published_prices( new_prices: &HashMap>, slot: Slot, ) -> bool { - let price_feed_index = price_data.feed_index as u32; + if price_data.feed_index == 0 { + return false; + } let mut any_update = false; - for new_price in new_prices.get(&price_feed_index).unwrap_or(&Vec::new()) { + for new_price in new_prices + .get(&price_data.feed_index) + .unwrap_or(&Vec::new()) + { match apply_published_price(price_data, new_price, slot) { Ok(()) => { any_update = true; @@ -255,7 +260,7 @@ pub fn apply_published_prices( Err(err) => { warn!( "failed to apply publisher price to price feed {}: {}", - price_data.feed_index as u32, err + price_data.feed_index, err ); } } @@ -286,7 +291,7 @@ fn apply_published_price( .ok_or(ApplyPublishedPriceError::InvalidPublishersNum)?; let publisher_index = find_publisher_index(publishers, &new_price.publisher).ok_or( - ApplyPublishedPriceError::NoPermission(price_data.feed_index as u32, new_price.publisher), + ApplyPublishedPriceError::NoPermission(price_data.feed_index, new_price.publisher), )?; // IMPORTANT: If the publisher does not meet the price/conf From 2611090cdd1456c343efd808ae1c579771679710 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 7 Aug 2024 11:44:09 +0100 Subject: [PATCH 05/10] refactor: add pyth module --- core/src/validator.rs | 4 +- runtime/src/bank.rs | 12 +- .../accumulator.rs} | 11 +- .../batch_publish.rs} | 7 +- runtime/src/bank/pyth/mod.rs | 5 + .../tests/accumulator_tests.rs} | 163 +----------------- .../bank/pyth/tests/batch_publish_tests.rs | 149 ++++++++++++++++ runtime/src/bank/pyth/tests/mod.rs | 36 ++++ validator/src/main.rs | 2 +- 9 files changed, 215 insertions(+), 174 deletions(-) rename runtime/src/bank/{pyth_accumulator.rs => pyth/accumulator.rs} (98%) rename runtime/src/bank/{pyth_batch_publish.rs => pyth/batch_publish.rs} (98%) create mode 100644 runtime/src/bank/pyth/mod.rs rename runtime/src/bank/{pyth_accumulator_tests.rs => pyth/tests/accumulator_tests.rs} (87%) create mode 100644 runtime/src/bank/pyth/tests/batch_publish_tests.rs create mode 100644 runtime/src/bank/pyth/tests/mod.rs diff --git a/core/src/validator.rs b/core/src/validator.rs index 923be65d65..cecda8da28 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -74,7 +74,7 @@ use { accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, accounts_update_notifier_interface::AccountsUpdateNotifier, - bank::{pyth_accumulator, Bank}, + bank::{pyth, Bank}, bank_forks::BankForks, commitment::BlockCommitmentCache, cost_model::CostModel, @@ -1521,7 +1521,7 @@ fn load_blockstore( } } - for (key_name, pk_res) in pyth_accumulator::get_accumulator_keys() { + for (key_name, pk_res) in pyth::accumulator::get_accumulator_keys() { match pk_res { Ok(pk) => info!("Accumulator {}: {}", key_name, pk), Err(err) => { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e7a3bb53d8..a5330d1fe1 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -198,11 +198,7 @@ mod builtin_programs; mod sysvar_cache; mod transaction_account_state_info; -pub mod pyth_accumulator; -mod pyth_batch_publish; - -#[cfg(test)] -mod pyth_accumulator_tests; +pub mod pyth; pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0; @@ -1411,7 +1407,7 @@ impl Bank { // state before the accumulator is used. bank is in a fully // updated state before the accumulator is used. if !accumulator_moved_to_end_of_block { - pyth_accumulator::update_accumulator(&bank); + pyth::accumulator::update_accumulator(&bank); } bank @@ -1797,7 +1793,7 @@ impl Bank { // the accumulator sysvar updates. sysvars are in a fully updated // state before the accumulator sysvar updates. if !accumulator_moved_to_end_of_block { - pyth_accumulator::update_accumulator(&new); + pyth::accumulator::update_accumulator(&new); } }); @@ -3239,7 +3235,7 @@ impl Bank { // other tasks when freezing to avoid any conflicts. if accumulator_moved_to_end_of_block { let mut measure = Measure::start("accumulator"); - pyth_accumulator::update_accumulator(self); + pyth::accumulator::update_accumulator(self); measure.stop(); debug!( diff --git a/runtime/src/bank/pyth_accumulator.rs b/runtime/src/bank/pyth/accumulator.rs similarity index 98% rename from runtime/src/bank/pyth_accumulator.rs rename to runtime/src/bank/pyth/accumulator.rs index ee30aca37c..2c0ae04cc8 100644 --- a/runtime/src/bank/pyth_accumulator.rs +++ b/runtime/src/bank/pyth/accumulator.rs @@ -1,6 +1,9 @@ use { - super::{pyth_batch_publish, Bank}, - crate::accounts_index::{IndexKey, ScanConfig, ScanError}, + super::batch_publish, + crate::{ + accounts_index::{IndexKey, ScanConfig, ScanError}, + bank::Bank, + }, byteorder::{LittleEndian, ReadBytesExt}, log::*, pyth_oracle::validator::AggregationError, @@ -438,7 +441,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV v2_messages.push(publisher_stake_caps_message); } - let new_prices = pyth_batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { + let new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { warn!("extract_batch_publish_prices failed: {}", err); HashMap::new() }); @@ -455,7 +458,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV }; let mut need_save = - pyth_batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); + batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); // Perform Accumulation match pyth_oracle::validator::aggregate_price( diff --git a/runtime/src/bank/pyth_batch_publish.rs b/runtime/src/bank/pyth/batch_publish.rs similarity index 98% rename from runtime/src/bank/pyth_batch_publish.rs rename to runtime/src/bank/pyth/batch_publish.rs index fe9c6cb371..e8302f8cc5 100644 --- a/runtime/src/bank/pyth_batch_publish.rs +++ b/runtime/src/bank/pyth/batch_publish.rs @@ -1,6 +1,9 @@ use { - super::{pyth_accumulator::BATCH_PUBLISH_PID, Bank}, - crate::accounts_index::{IndexKey, ScanConfig, ScanError}, + super::accumulator::BATCH_PUBLISH_PID, + crate::{ + accounts_index::{IndexKey, ScanConfig, ScanError}, + bank::Bank, + }, log::warn, pyth_oracle::{ find_publisher_index, get_status_for_conf_price_ratio, solana_program::pubkey::Pubkey, diff --git a/runtime/src/bank/pyth/mod.rs b/runtime/src/bank/pyth/mod.rs new file mode 100644 index 0000000000..795e29db71 --- /dev/null +++ b/runtime/src/bank/pyth/mod.rs @@ -0,0 +1,5 @@ +pub mod accumulator; +mod batch_publish; + +#[cfg(test)] +mod tests; diff --git a/runtime/src/bank/pyth_accumulator_tests.rs b/runtime/src/bank/pyth/tests/accumulator_tests.rs similarity index 87% rename from runtime/src/bank/pyth_accumulator_tests.rs rename to runtime/src/bank/pyth/tests/accumulator_tests.rs index 5c3673a131..6ab1c05f11 100644 --- a/runtime/src/bank/pyth_accumulator_tests.rs +++ b/runtime/src/bank/pyth/tests/accumulator_tests.rs @@ -1,21 +1,17 @@ use { - super::pyth_accumulator::MESSAGE_BUFFER_PID, crate::{ - accounts_db::AccountShrinkThreshold, - accounts_index::{ - AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude, - }, bank::{ - pyth_accumulator::{ - get_accumulator_keys, ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID, - STAKE_CAPS_PARAMETERS_ADDR, + pyth::{ + accumulator::{ + get_accumulator_keys, ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID, + STAKE_CAPS_PARAMETERS_ADDR, + }, + tests::{create_new_bank_for_tests_with_index, new_from_parent}, }, - pyth_batch_publish::publisher_prices_account::{self, PublisherPrice}, Bank, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, }, - bytemuck::{cast_slice, checked::from_bytes}, byteorder::{ByteOrder, LittleEndian, ReadBytesExt}, itertools::Itertools, pyth_oracle::{ @@ -36,7 +32,6 @@ use { epoch_schedule::EpochSchedule, feature::{self, Feature}, feature_set, - genesis_config::GenesisConfig, hash::hashv, pubkey::Pubkey, signature::keypair_from_seed, @@ -45,23 +40,6 @@ use { std::{io::Read, mem::size_of, sync::Arc}, }; -fn create_new_bank_for_tests_with_index(genesis_config: &GenesisConfig) -> Bank { - Bank::new_with_config_for_tests( - genesis_config, - AccountSecondaryIndexes { - keys: Some(AccountSecondaryIndexesIncludeExclude { - exclude: false, - keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID, *BATCH_PUBLISH_PID] - .into_iter() - .collect(), - }), - indexes: [AccountIndex::ProgramId].into_iter().collect(), - }, - false, - AccountShrinkThreshold::default(), - ) -} - // Create Message Account Bytes // // NOTE: This was serialized by hand, but should be replaced with the pythnet-sdk @@ -400,10 +378,6 @@ fn test_update_accumulator_sysvar() { // 3. Check if message offset is > message size to prevent validator crash. } -fn new_from_parent(parent: &Arc) -> Bank { - Bank::new_from_parent(parent, &Pubkey::default(), parent.slot() + 1) -} - #[test] fn test_update_accumulator_end_of_block() { let leader_pubkey = solana_sdk::pubkey::new_rand(); @@ -1146,128 +1120,3 @@ fn test_get_accumulator_keys() { ]; assert_eq!(accumulator_keys, expected_pyth_keys); } - -#[test] -fn test_batch_publish() { - let leader_pubkey = solana_sdk::pubkey::new_rand(); - let GenesisConfigInfo { - mut genesis_config, .. - } = create_genesis_config_with_leader(5, &leader_pubkey, 3); - - // Set epoch length to 32 so we can advance epochs quickly. We also skip past slot 0 here - // due to slot 0 having special handling. - let slots_in_epoch = 32; - genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); - let mut bank = create_new_bank_for_tests_with_index(&genesis_config); - - let generate_publisher = |seed, new_prices| { - let publisher1_key = keypair_from_seed(seed).unwrap(); - - let (publisher1_prices_key, _bump) = Pubkey::find_program_address( - // TODO: real seed - &[ - b"PUBLISHER_PRICES_ACCOUNT", - &publisher1_key.pubkey().to_bytes(), - ], - &BATCH_PUBLISH_PID, - ); - let mut publisher1_prices_account = - AccountSharedData::new(42, publisher_prices_account::size(100), &BATCH_PUBLISH_PID); - { - let (header, prices) = publisher_prices_account::create( - publisher1_prices_account.data_mut(), - publisher1_key.pubkey().to_bytes(), - ) - .unwrap(); - publisher_prices_account::extend(header, prices, cast_slice(new_prices)).unwrap(); - } - bank.store_account(&publisher1_prices_key, &publisher1_prices_account); - - publisher1_key - }; - - let publishers = [ - generate_publisher( - &[1u8; 32], - &[ - PublisherPrice::new(1, 1, 10, 2).unwrap(), - PublisherPrice::new(2, 1, 20, 3).unwrap(), - ], - ), - generate_publisher( - &[2u8; 32], - &[ - PublisherPrice::new(1, 1, 15, 2).unwrap(), - PublisherPrice::new(2, 1, 25, 3).unwrap(), - ], - ), - ]; - - let generate_price = |seeds, index| { - let (price_feed_key, _bump) = Pubkey::find_program_address(&[seeds], &ORACLE_PID); - let mut price_feed_account = - AccountSharedData::new(42, size_of::(), &ORACLE_PID); - - let messages = { - let price_feed_info_key = &price_feed_key.to_bytes().into(); - let price_feed_info_lamports = &mut 0; - let price_feed_info_owner = &ORACLE_PID.to_bytes().into(); - let price_feed_info_data = price_feed_account.data_mut(); - let price_feed_info = AccountInfo::new( - price_feed_info_key, - false, - true, - price_feed_info_lamports, - price_feed_info_data, - price_feed_info_owner, - false, - Epoch::default(), - ); - - let mut price_account = PriceAccount::initialize(&price_feed_info, 0).unwrap(); - price_account.flags.insert( - PriceAccountFlags::ACCUMULATOR_V2 | PriceAccountFlags::MESSAGE_BUFFER_CLEARED, - ); - price_account.feed_index = index; - price_account.comp_[0].pub_ = publishers[0].pubkey().to_bytes().into(); - price_account.comp_[1].pub_ = publishers[1].pubkey().to_bytes().into(); - price_account.num_ = 2; - }; - - bank.store_account(&price_feed_key, &price_feed_account); - (price_feed_key, messages) - }; - - assert!(bank - .feature_set - .is_active(&feature_set::enable_accumulator_sysvar::id())); - assert!(bank - .feature_set - .is_active(&feature_set::move_accumulator_to_end_of_block::id())); - assert!(bank - .feature_set - .is_active(&feature_set::undo_move_accumulator_to_end_of_block::id())); - assert!(bank - .feature_set - .is_active(&feature_set::redo_move_accumulator_to_end_of_block::id())); - - let prices_with_messages = [ - generate_price(b"seeds_1", 1), - generate_price(b"seeds_2", 2), - generate_price(b"seeds_3", 3), - generate_price(b"seeds_4", 4), - ]; - - bank = new_from_parent(&Arc::new(bank)); // Advance slot 1. - bank = new_from_parent(&Arc::new(bank)); // Advance slot 2. - - let new_price_feed1_account = bank.get_account(&prices_with_messages[0].0).unwrap(); - let new_price_feed1_data: &PriceAccount = from_bytes(new_price_feed1_account.data()); - assert_eq!(new_price_feed1_data.comp_[0].latest_.price_, 10); - assert_eq!(new_price_feed1_data.comp_[1].latest_.price_, 15); - - let new_price_feed2_account = bank.get_account(&prices_with_messages[1].0).unwrap(); - let new_price_feed2_data: &PriceAccount = from_bytes(new_price_feed2_account.data()); - assert_eq!(new_price_feed2_data.comp_[0].latest_.price_, 20); - assert_eq!(new_price_feed2_data.comp_[1].latest_.price_, 25); -} diff --git a/runtime/src/bank/pyth/tests/batch_publish_tests.rs b/runtime/src/bank/pyth/tests/batch_publish_tests.rs new file mode 100644 index 0000000000..65a1a590dd --- /dev/null +++ b/runtime/src/bank/pyth/tests/batch_publish_tests.rs @@ -0,0 +1,149 @@ +use { + crate::{ + bank::pyth::{ + accumulator::{BATCH_PUBLISH_PID, ORACLE_PID}, + batch_publish::publisher_prices_account::{self, PublisherPrice}, + tests::{create_new_bank_for_tests_with_index, new_from_parent}, + }, + genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, + }, + bytemuck::{cast_slice, checked::from_bytes}, + pyth_oracle::{ + solana_program::account_info::AccountInfo, PriceAccount, PriceAccountFlags, PythAccount, + }, + solana_sdk::{ + account::{AccountSharedData, ReadableAccount, WritableAccount}, + clock::Epoch, + epoch_schedule::EpochSchedule, + feature_set, + pubkey::Pubkey, + signature::keypair_from_seed, + signer::Signer, + }, + std::{mem::size_of, sync::Arc}, +}; + +#[test] +fn test_batch_publish() { + let leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + mut genesis_config, .. + } = create_genesis_config_with_leader(5, &leader_pubkey, 3); + + // Set epoch length to 32 so we can advance epochs quickly. We also skip past slot 0 here + // due to slot 0 having special handling. + let slots_in_epoch = 32; + genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); + let mut bank = create_new_bank_for_tests_with_index(&genesis_config); + + let generate_publisher = |seed, new_prices| { + let publisher1_key = keypair_from_seed(seed).unwrap(); + + let (publisher1_prices_key, _bump) = Pubkey::find_program_address( + // TODO: real seed + &[ + b"PUBLISHER_PRICES_ACCOUNT", + &publisher1_key.pubkey().to_bytes(), + ], + &BATCH_PUBLISH_PID, + ); + let mut publisher1_prices_account = + AccountSharedData::new(42, publisher_prices_account::size(100), &BATCH_PUBLISH_PID); + { + let (header, prices) = publisher_prices_account::create( + publisher1_prices_account.data_mut(), + publisher1_key.pubkey().to_bytes(), + ) + .unwrap(); + publisher_prices_account::extend(header, prices, cast_slice(new_prices)).unwrap(); + } + bank.store_account(&publisher1_prices_key, &publisher1_prices_account); + + publisher1_key + }; + + let publishers = [ + generate_publisher( + &[1u8; 32], + &[ + PublisherPrice::new(1, 1, 10, 2).unwrap(), + PublisherPrice::new(2, 1, 20, 3).unwrap(), + ], + ), + generate_publisher( + &[2u8; 32], + &[ + PublisherPrice::new(1, 1, 15, 2).unwrap(), + PublisherPrice::new(2, 1, 25, 3).unwrap(), + ], + ), + ]; + + let generate_price = |seeds, index| { + let (price_feed_key, _bump) = Pubkey::find_program_address(&[seeds], &ORACLE_PID); + let mut price_feed_account = + AccountSharedData::new(42, size_of::(), &ORACLE_PID); + + let messages = { + let price_feed_info_key = &price_feed_key.to_bytes().into(); + let price_feed_info_lamports = &mut 0; + let price_feed_info_owner = &ORACLE_PID.to_bytes().into(); + let price_feed_info_data = price_feed_account.data_mut(); + let price_feed_info = AccountInfo::new( + price_feed_info_key, + false, + true, + price_feed_info_lamports, + price_feed_info_data, + price_feed_info_owner, + false, + Epoch::default(), + ); + + let mut price_account = PriceAccount::initialize(&price_feed_info, 0).unwrap(); + price_account.flags.insert( + PriceAccountFlags::ACCUMULATOR_V2 | PriceAccountFlags::MESSAGE_BUFFER_CLEARED, + ); + price_account.feed_index = index; + price_account.comp_[0].pub_ = publishers[0].pubkey().to_bytes().into(); + price_account.comp_[1].pub_ = publishers[1].pubkey().to_bytes().into(); + price_account.num_ = 2; + }; + + bank.store_account(&price_feed_key, &price_feed_account); + (price_feed_key, messages) + }; + + assert!(bank + .feature_set + .is_active(&feature_set::enable_accumulator_sysvar::id())); + assert!(bank + .feature_set + .is_active(&feature_set::move_accumulator_to_end_of_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::undo_move_accumulator_to_end_of_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::redo_move_accumulator_to_end_of_block::id())); + + let prices_with_messages = [ + generate_price(b"seeds_1", 1), + generate_price(b"seeds_2", 2), + generate_price(b"seeds_3", 3), + generate_price(b"seeds_4", 4), + ]; + + bank = new_from_parent(&Arc::new(bank)); // Advance slot 1. + bank = new_from_parent(&Arc::new(bank)); // Advance slot 2. + + let new_price_feed1_account = bank.get_account(&prices_with_messages[0].0).unwrap(); + let new_price_feed1_data: &PriceAccount = from_bytes(new_price_feed1_account.data()); + assert_eq!(new_price_feed1_data.comp_[0].latest_.price_, 10); + assert_eq!(new_price_feed1_data.comp_[1].latest_.price_, 15); + + let new_price_feed2_account = bank.get_account(&prices_with_messages[1].0).unwrap(); + let new_price_feed2_data: &PriceAccount = from_bytes(new_price_feed2_account.data()); + assert_eq!(new_price_feed2_data.comp_[0].latest_.price_, 20); + assert_eq!(new_price_feed2_data.comp_[1].latest_.price_, 25); +} diff --git a/runtime/src/bank/pyth/tests/mod.rs b/runtime/src/bank/pyth/tests/mod.rs new file mode 100644 index 0000000000..5fef8052b9 --- /dev/null +++ b/runtime/src/bank/pyth/tests/mod.rs @@ -0,0 +1,36 @@ +use { + super::accumulator::{BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, ORACLE_PID}, + crate::{ + accounts_db::AccountShrinkThreshold, + accounts_index::{ + AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude, + }, + bank::Bank, + }, + solana_sdk::{genesis_config::GenesisConfig, pubkey::Pubkey}, + std::sync::Arc, +}; + +mod accumulator_tests; +mod batch_publish_tests; + +fn new_from_parent(parent: &Arc) -> Bank { + Bank::new_from_parent(parent, &Pubkey::default(), parent.slot() + 1) +} + +fn create_new_bank_for_tests_with_index(genesis_config: &GenesisConfig) -> Bank { + Bank::new_with_config_for_tests( + genesis_config, + AccountSecondaryIndexes { + keys: Some(AccountSecondaryIndexesIncludeExclude { + exclude: false, + keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID, *BATCH_PUBLISH_PID] + .into_iter() + .collect(), + }), + indexes: [AccountIndex::ProgramId].into_iter().collect(), + }, + false, + AccountShrinkThreshold::default(), + ) +} diff --git a/validator/src/main.rs b/validator/src/main.rs index efee36b715..a86c88b488 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -52,7 +52,7 @@ use { AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude, AccountsIndexConfig, IndexLimitMb, }, - bank::pyth_accumulator::{BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, ORACLE_PID}, + bank::pyth::accumulator::{BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, ORACLE_PID}, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, runtime_config::RuntimeConfig, snapshot_config::SnapshotConfig, From 9ceff438220c15500494bf952d125638d927b6ae Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 28 Aug 2024 10:05:02 +0100 Subject: [PATCH 06/10] feat: integrate pyth-price-publisher --- Cargo.lock | 11 + runtime/Cargo.toml | 1 + runtime/src/bank/pyth/accumulator.rs | 11 +- runtime/src/bank/pyth/batch_publish.rs | 192 ++---------------- .../bank/pyth/tests/batch_publish_tests.rs | 10 +- validator/src/main.rs | 15 +- 6 files changed, 53 insertions(+), 187 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a652e01bd..4a3965d287 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3615,6 +3615,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pyth-price-publisher" +version = "0.1.0" +source = "git+https://github.com/pyth-network/pyth-crosschain?branch=add-publisher-program#1fa5089f0f5abf1fe1d48ba0dda489843fcd545a" +dependencies = [ + "bytemuck", + "solana-program 1.14.17", + "thiserror", +] + [[package]] name = "pythnet-sdk" version = "1.13.6" @@ -6200,6 +6210,7 @@ dependencies = [ "once_cell", "ouroboros", "pyth-oracle", + "pyth-price-publisher", "pythnet-sdk 1.13.6", "rand 0.7.3", "rand_chacha 0.2.2", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index ac5b87ca27..e7628442e1 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -37,6 +37,7 @@ once_cell = "1.12.0" ouroboros = "0.15.0" pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] } pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "33f901aa45f4f0005aa5a84a1479b78ca9033074" } +pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } rand = "0.7.0" rayon = "1.5.3" regex = "1.5.6" diff --git a/runtime/src/bank/pyth/accumulator.rs b/runtime/src/bank/pyth/accumulator.rs index 2c0ae04cc8..3577b2ff59 100644 --- a/runtime/src/bank/pyth/accumulator.rs +++ b/runtime/src/bank/pyth/accumulator.rs @@ -5,6 +5,7 @@ use { bank::Bank, }, byteorder::{LittleEndian, ReadBytesExt}, + itertools::Itertools, log::*, pyth_oracle::validator::AggregationError, pythnet_sdk::{ @@ -441,7 +442,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV v2_messages.push(publisher_stake_caps_message); } - let new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { + let mut new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { warn!("extract_batch_publish_prices failed: {}", err); HashMap::new() }); @@ -458,7 +459,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV }; let mut need_save = - batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); + batch_publish::apply_published_prices(price_account, &mut new_prices, bank.slot()); // Perform Accumulation match pyth_oracle::validator::aggregate_price( @@ -483,6 +484,12 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV bank.store_account_and_update_capitalization(&pubkey, &account); } } + if !new_prices.is_empty() { + warn!( + "pyth batch publish: missing price feed accounts for indexes: {}", + new_prices.keys().join(", ") + ); + } measure.stop(); debug!( diff --git a/runtime/src/bank/pyth/batch_publish.rs b/runtime/src/bank/pyth/batch_publish.rs index e8302f8cc5..a7a7962dc6 100644 --- a/runtime/src/bank/pyth/batch_publish.rs +++ b/runtime/src/bank/pyth/batch_publish.rs @@ -4,184 +4,17 @@ use { accounts_index::{IndexKey, ScanConfig, ScanError}, bank::Bank, }, - log::warn, + log::{info, warn}, pyth_oracle::{ find_publisher_index, get_status_for_conf_price_ratio, solana_program::pubkey::Pubkey, OracleError, PriceAccount, }, + pyth_price_publisher::accounts::publisher_prices as publisher_prices_account, solana_sdk::{account::ReadableAccount, clock::Slot}, std::collections::HashMap, thiserror::Error, }; -// TODO: move to the publish program -#[allow(dead_code)] -pub mod publisher_prices_account { - use { - bytemuck::{cast_slice, from_bytes, from_bytes_mut, Pod, Zeroable}, - solana_sdk::clock::Slot, - std::mem::size_of, - thiserror::Error, - }; - - const FORMAT: u32 = 2848712303; - - #[derive(Debug, Clone, Copy, Zeroable, Pod)] - #[repr(C, packed)] - pub struct PublisherPricesHeader { - pub format: u32, - pub publisher: [u8; 32], - pub slot: Slot, - pub num_prices: u32, - } - - impl PublisherPricesHeader { - fn new(publisher: [u8; 32]) -> Self { - PublisherPricesHeader { - format: FORMAT, - publisher, - slot: 0, - num_prices: 0, - } - } - } - - #[derive(Debug, Clone, Copy, Zeroable, Pod)] - #[repr(C, packed)] - pub struct PublisherPrice { - // 4 high bits: trading status - // 28 low bits: feed index - pub trading_status_and_feed_index: u32, - pub price: i64, - pub confidence: u64, - } - - #[derive(Debug, Error)] - #[error("publisher price data overflow")] - pub struct PublisherPriceError; - - impl PublisherPrice { - pub fn new( - feed_index: u32, - trading_status: u32, - price: i64, - confidence: u64, - ) -> Result { - if feed_index >= (1 << 28) || trading_status >= (1 << 4) { - return Err(PublisherPriceError); - } - Ok(Self { - trading_status_and_feed_index: (trading_status << 28) | feed_index, - price, - confidence, - }) - } - - pub fn trading_status(&self) -> u32 { - self.trading_status_and_feed_index >> 28 - } - - pub fn feed_index(&self) -> u32 { - self.trading_status_and_feed_index & ((1 << 28) - 1) - } - } - - #[derive(Debug, Error)] - pub enum ReadAccountError { - #[error("data too short")] - DataTooShort, - #[error("format mismatch")] - FormatMismatch, - #[error("invalid num prices")] - InvalidNumPrices, - } - - #[derive(Debug, Error)] - pub enum ExtendError { - #[error("not enough space")] - NotEnoughSpace, - #[error("invalid length")] - InvalidLength, - } - - pub fn read( - data: &[u8], - ) -> Result<(&PublisherPricesHeader, &[PublisherPrice]), ReadAccountError> { - if data.len() < size_of::() { - return Err(ReadAccountError::DataTooShort); - } - let header: &PublisherPricesHeader = - from_bytes(&data[..size_of::()]); - if header.format != FORMAT { - return Err(ReadAccountError::FormatMismatch); - } - let prices_bytes = &data[size_of::()..]; - let num_prices: usize = header.num_prices.try_into().unwrap(); - let expected_len = num_prices.saturating_mul(size_of::()); - if expected_len > prices_bytes.len() { - return Err(ReadAccountError::InvalidNumPrices); - } - let prices = cast_slice(&prices_bytes[..expected_len]); - Ok((header, prices)) - } - - pub fn size(max_prices: usize) -> usize { - size_of::() + max_prices * size_of::() - } - - pub fn read_mut( - data: &mut [u8], - ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { - if data.len() < size_of::() { - return Err(ReadAccountError::DataTooShort); - } - let (header, prices) = data.split_at_mut(size_of::()); - let header: &mut PublisherPricesHeader = from_bytes_mut(header); - if header.format != FORMAT { - return Err(ReadAccountError::FormatMismatch); - } - Ok((header, prices)) - } - - pub fn create( - data: &mut [u8], - publisher: [u8; 32], - ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { - if data.len() < size_of::() { - return Err(ReadAccountError::DataTooShort); - } - let (header, prices) = data.split_at_mut(size_of::()); - let header: &mut PublisherPricesHeader = from_bytes_mut(header); - *header = PublisherPricesHeader::new(publisher); - Ok((header, prices)) - } - - pub fn extend( - header: &mut PublisherPricesHeader, - prices: &mut [u8], - new_prices: &[u8], - ) -> Result<(), ExtendError> { - if new_prices.len() % size_of::() != 0 { - return Err(ExtendError::InvalidLength); - } - let num_new_prices = (new_prices.len() / size_of::()) - .try_into() - .expect("unexpected overflow"); - let num_prices: usize = header.num_prices.try_into().unwrap(); - let start = size_of::() * num_prices; - let end = size_of::() * num_prices + new_prices.len(); - header.num_prices = header - .num_prices - .checked_add(num_new_prices) - .expect("unexpected overflow"); - prices - .get_mut(start..end) - .ok_or(ExtendError::NotEnoughSpace)? - .copy_from_slice(new_prices); - Ok(()) - } -} - #[derive(Debug, Error)] pub enum HandleBatchPublishError { #[error("failed to get program accounts: {0}")] @@ -214,7 +47,12 @@ pub fn extract_batch_publish_prices( .map_err(HandleBatchPublishError::GetProgramAccounts)?; let mut all_prices = HashMap::>::new(); + let mut num_found_accounts = 0; + let mut num_found_prices = 0; for (account_key, account) in publisher_prices_accounts { + if !publisher_prices_account::format_matches(account.data()) { + continue; + } let (header, prices) = match publisher_prices_account::read(account.data()) { Ok(r) => r, Err(err) => { @@ -222,6 +60,7 @@ pub fn extract_batch_publish_prices( continue; } }; + num_found_accounts += 1; if header.slot != bank.slot() { // Updates from earlier slots have already been applied. continue; @@ -238,14 +77,21 @@ pub fn extract_batch_publish_prices( .entry(price.feed_index()) .or_default() .push(value); + num_found_prices += 1; } } + info!( + "pyth batch publish: found {} prices in {} accounts at slot {}", + num_found_prices, + num_found_accounts, + bank.slot() + ); Ok(all_prices) } pub fn apply_published_prices( price_data: &mut PriceAccount, - new_prices: &HashMap>, + new_prices: &mut HashMap>, slot: Slot, ) -> bool { if price_data.feed_index == 0 { @@ -253,10 +99,10 @@ pub fn apply_published_prices( } let mut any_update = false; for new_price in new_prices - .get(&price_data.feed_index) - .unwrap_or(&Vec::new()) + .remove(&price_data.feed_index) + .unwrap_or_default() { - match apply_published_price(price_data, new_price, slot) { + match apply_published_price(price_data, &new_price, slot) { Ok(()) => { any_update = true; } diff --git a/runtime/src/bank/pyth/tests/batch_publish_tests.rs b/runtime/src/bank/pyth/tests/batch_publish_tests.rs index 65a1a590dd..59bb3e0856 100644 --- a/runtime/src/bank/pyth/tests/batch_publish_tests.rs +++ b/runtime/src/bank/pyth/tests/batch_publish_tests.rs @@ -2,7 +2,6 @@ use { crate::{ bank::pyth::{ accumulator::{BATCH_PUBLISH_PID, ORACLE_PID}, - batch_publish::publisher_prices_account::{self, PublisherPrice}, tests::{create_new_bank_for_tests_with_index, new_from_parent}, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, @@ -11,6 +10,9 @@ use { pyth_oracle::{ solana_program::account_info::AccountInfo, PriceAccount, PriceAccountFlags, PythAccount, }, + pyth_price_publisher::accounts::publisher_prices::{ + self as publisher_prices_account, PublisherPrice, + }, solana_sdk::{ account::{AccountSharedData, ReadableAccount, WritableAccount}, clock::Epoch, @@ -40,11 +42,7 @@ fn test_batch_publish() { let publisher1_key = keypair_from_seed(seed).unwrap(); let (publisher1_prices_key, _bump) = Pubkey::find_program_address( - // TODO: real seed - &[ - b"PUBLISHER_PRICES_ACCOUNT", - &publisher1_key.pubkey().to_bytes(), - ], + &[b"BUFFER", &publisher1_key.pubkey().to_bytes()], &BATCH_PUBLISH_PID, ); let mut publisher1_prices_account = diff --git a/validator/src/main.rs b/validator/src/main.rs index a86c88b488..4103606a42 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -3214,12 +3214,15 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { ); } - if exclude_keys - && (account_indexes_exclude_keys.contains(&*ORACLE_PID) - || account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID) - || account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID)) - { - panic!("The oracle program id and message buffer program id must *not* be excluded from the account index."); + if exclude_keys { + for key in &[&*ORACLE_PID, &*MESSAGE_BUFFER_PID, &*BATCH_PUBLISH_PID] { + if account_indexes_exclude_keys.contains(key) { + panic!( + "This key must *not* be excluded from the account index: {}", + key + ); + } + } } let keys = if !account_indexes.is_empty() && (exclude_keys || include_keys) { From ee765c5d8dbbf38c2a8ea0a021cfb801e35cff03 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 29 Aug 2024 10:24:30 +0100 Subject: [PATCH 07/10] refactor: rename and move get_accumulator_keys --- core/src/validator.rs | 10 ++-------- runtime/src/bank/pyth/accumulator.rs | 20 ------------------- runtime/src/bank/pyth/mod.rs | 20 +++++++++++++++++++ .../src/bank/pyth/tests/accumulator_tests.rs | 8 +++----- 4 files changed, 25 insertions(+), 33 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index cecda8da28..ca3bafb122 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1521,14 +1521,8 @@ fn load_blockstore( } } - for (key_name, pk_res) in pyth::accumulator::get_accumulator_keys() { - match pk_res { - Ok(pk) => info!("Accumulator {}: {}", key_name, pk), - Err(err) => { - error!("Failed to get Accumulator {}: {:?}", key_name, err); - std::process::abort(); - } - } + for (key_name, pk) in pyth::get_pyth_keys() { + info!("Pyth key {}: {}", key_name, pk); } leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone()); diff --git a/runtime/src/bank/pyth/accumulator.rs b/runtime/src/bank/pyth/accumulator.rs index 3577b2ff59..300f94006d 100644 --- a/runtime/src/bank/pyth/accumulator.rs +++ b/runtime/src/bank/pyth/accumulator.rs @@ -130,26 +130,6 @@ fn env_pubkey_or(var: &str, default: Pubkey) -> Pubkey { } } -/// Get all accumulator related pubkeys from environment variables -/// or return default if the variable is not set. -pub fn get_accumulator_keys() -> Vec<( - &'static str, - std::result::Result, -)> { - vec![ - ("MESSAGE_BUFFER_PID", Ok(*MESSAGE_BUFFER_PID)), - ("ACCUMULATOR_EMITTER_ADDR", Ok(*ACCUMULATOR_EMITTER_ADDR)), - ("ACCUMULATOR_SEQUENCE_ADDR", Ok(*ACCUMULATOR_SEQUENCE_ADDR)), - ("WORMHOLE_PID", Ok(*WORMHOLE_PID)), - ("ORACLE_PID", Ok(*ORACLE_PID)), - ( - "STAKE_CAPS_PARAMETERS_ADDR", - Ok(*STAKE_CAPS_PARAMETERS_ADDR), - ), - ("BATCH_PUBLISH_PID", Ok(*BATCH_PUBLISH_PID)), - ] -} - pub fn update_v1( bank: &Bank, v2_messages: &[Vec], diff --git a/runtime/src/bank/pyth/mod.rs b/runtime/src/bank/pyth/mod.rs index 795e29db71..e8db1a9f75 100644 --- a/runtime/src/bank/pyth/mod.rs +++ b/runtime/src/bank/pyth/mod.rs @@ -1,5 +1,25 @@ +use accumulator::{ + ACCUMULATOR_EMITTER_ADDR, ACCUMULATOR_SEQUENCE_ADDR, BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, + ORACLE_PID, STAKE_CAPS_PARAMETERS_ADDR, WORMHOLE_PID, +}; +use solana_sdk::pubkey::Pubkey; + pub mod accumulator; mod batch_publish; #[cfg(test)] mod tests; + +/// Get all pyth related pubkeys from environment variables +/// or return default if the variable is not set. +pub fn get_pyth_keys() -> Vec<(&'static str, Pubkey)> { + vec![ + ("MESSAGE_BUFFER_PID", *MESSAGE_BUFFER_PID), + ("ACCUMULATOR_EMITTER_ADDR", *ACCUMULATOR_EMITTER_ADDR), + ("ACCUMULATOR_SEQUENCE_ADDR", *ACCUMULATOR_SEQUENCE_ADDR), + ("WORMHOLE_PID", *WORMHOLE_PID), + ("ORACLE_PID", *ORACLE_PID), + ("STAKE_CAPS_PARAMETERS_ADDR", *STAKE_CAPS_PARAMETERS_ADDR), + ("BATCH_PUBLISH_PID", *BATCH_PUBLISH_PID), + ] +} diff --git a/runtime/src/bank/pyth/tests/accumulator_tests.rs b/runtime/src/bank/pyth/tests/accumulator_tests.rs index 6ab1c05f11..70272be51b 100644 --- a/runtime/src/bank/pyth/tests/accumulator_tests.rs +++ b/runtime/src/bank/pyth/tests/accumulator_tests.rs @@ -3,9 +3,10 @@ use { bank::{ pyth::{ accumulator::{ - get_accumulator_keys, ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID, + ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID, STAKE_CAPS_PARAMETERS_ADDR, }, + get_pyth_keys, tests::{create_new_bank_for_tests_with_index, new_from_parent}, }, Bank, @@ -1105,10 +1106,7 @@ fn test_publisher_stake_caps() { #[test] fn test_get_accumulator_keys() { use pythnet_sdk::{pythnet, ACCUMULATOR_EMITTER_ADDRESS, MESSAGE_BUFFER_PID}; - let accumulator_keys: Vec = get_accumulator_keys() - .iter() - .map(|(_, pk_res)| *pk_res.as_ref().unwrap()) - .collect(); + let accumulator_keys: Vec = get_pyth_keys().iter().map(|(_, pk)| *pk).collect(); let expected_pyth_keys = vec![ Pubkey::new_from_array(MESSAGE_BUFFER_PID), Pubkey::new_from_array(ACCUMULATOR_EMITTER_ADDRESS), From 36e1c80b9dd1226c9cd06b286d70bfd4b365ef78 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 29 Aug 2024 10:39:15 +0100 Subject: [PATCH 08/10] test: make sure batch publish does not apply prices when price_index == 0 --- runtime/src/bank/pyth/accumulator.rs | 3 +++ .../src/bank/pyth/tests/batch_publish_tests.rs | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/runtime/src/bank/pyth/accumulator.rs b/runtime/src/bank/pyth/accumulator.rs index 300f94006d..ee86f9c559 100644 --- a/runtime/src/bank/pyth/accumulator.rs +++ b/runtime/src/bank/pyth/accumulator.rs @@ -422,10 +422,13 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV v2_messages.push(publisher_stake_caps_message); } + let mut measure = Measure::start("extract_batch_publish_prices"); let mut new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { warn!("extract_batch_publish_prices failed: {}", err); HashMap::new() }); + measure.stop(); + debug!("batch publish: loaded prices in {}us", measure.as_us()); let mut measure = Measure::start("update_v2_aggregate_price"); for (pubkey, mut account) in accounts { diff --git a/runtime/src/bank/pyth/tests/batch_publish_tests.rs b/runtime/src/bank/pyth/tests/batch_publish_tests.rs index 59bb3e0856..b26f547976 100644 --- a/runtime/src/bank/pyth/tests/batch_publish_tests.rs +++ b/runtime/src/bank/pyth/tests/batch_publish_tests.rs @@ -66,6 +66,13 @@ fn test_batch_publish() { &[ PublisherPrice::new(1, 1, 10, 2).unwrap(), PublisherPrice::new(2, 1, 20, 3).unwrap(), + // Attempt to publish with price_index == 0, + // but it will not be applied. + PublisherPrice { + trading_status_and_feed_index: 0, + price: 30, + confidence: 35, + }, ], ), generate_publisher( @@ -129,7 +136,7 @@ fn test_batch_publish() { generate_price(b"seeds_1", 1), generate_price(b"seeds_2", 2), generate_price(b"seeds_3", 3), - generate_price(b"seeds_4", 4), + generate_price(b"seeds_4", 0), ]; bank = new_from_parent(&Arc::new(bank)); // Advance slot 1. @@ -138,10 +145,16 @@ fn test_batch_publish() { let new_price_feed1_account = bank.get_account(&prices_with_messages[0].0).unwrap(); let new_price_feed1_data: &PriceAccount = from_bytes(new_price_feed1_account.data()); assert_eq!(new_price_feed1_data.comp_[0].latest_.price_, 10); + assert_eq!(new_price_feed1_data.comp_[0].latest_.conf_, 2); + assert_eq!(new_price_feed1_data.comp_[0].latest_.status_, 1); assert_eq!(new_price_feed1_data.comp_[1].latest_.price_, 15); let new_price_feed2_account = bank.get_account(&prices_with_messages[1].0).unwrap(); let new_price_feed2_data: &PriceAccount = from_bytes(new_price_feed2_account.data()); assert_eq!(new_price_feed2_data.comp_[0].latest_.price_, 20); assert_eq!(new_price_feed2_data.comp_[1].latest_.price_, 25); + + let new_price_feed4_account = bank.get_account(&prices_with_messages[3].0).unwrap(); + let new_price_feed4_data: &PriceAccount = from_bytes(new_price_feed4_account.data()); + assert_eq!(new_price_feed4_data.comp_[0].latest_.price_, 0); } From 85d59d96c6d9757bd0fc121adf70f0d11b03bcbd Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 29 Aug 2024 10:56:27 +0100 Subject: [PATCH 09/10] chore: fix format --- runtime/src/bank/pyth/mod.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/runtime/src/bank/pyth/mod.rs b/runtime/src/bank/pyth/mod.rs index e8db1a9f75..bbc334d14b 100644 --- a/runtime/src/bank/pyth/mod.rs +++ b/runtime/src/bank/pyth/mod.rs @@ -1,8 +1,10 @@ -use accumulator::{ - ACCUMULATOR_EMITTER_ADDR, ACCUMULATOR_SEQUENCE_ADDR, BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, - ORACLE_PID, STAKE_CAPS_PARAMETERS_ADDR, WORMHOLE_PID, +use { + accumulator::{ + ACCUMULATOR_EMITTER_ADDR, ACCUMULATOR_SEQUENCE_ADDR, BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, + ORACLE_PID, STAKE_CAPS_PARAMETERS_ADDR, WORMHOLE_PID, + }, + solana_sdk::pubkey::Pubkey, }; -use solana_sdk::pubkey::Pubkey; pub mod accumulator; mod batch_publish; From 4a97a266f68ca81ce4e5b75ca39c1ed0921b7332 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 29 Aug 2024 14:09:00 +0100 Subject: [PATCH 10/10] fix: update deps --- Cargo.lock | 2 +- runtime/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a3965d287..60f548e2a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3601,7 +3601,7 @@ dependencies = [ [[package]] name = "pyth-oracle" version = "2.33.0" -source = "git+https://github.com/pyth-network/pyth-client?branch=batch-publish#f466deb73c4a15ea13cb271c3a900c5a85810c87" +source = "git+https://github.com/pyth-network/pyth-client?rev=256b57#256b575a13e29bec93ba592c8f86cc5fad521915" dependencies = [ "bindgen 0.60.1", "bitflags 2.6.0", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e7628442e1..95a0824c5c 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -35,7 +35,7 @@ num-traits = { version = "0.2" } num_cpus = "1.13.1" once_cell = "1.12.0" ouroboros = "0.15.0" -pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] } +pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", rev = "256b57", features = ["library"] } pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "33f901aa45f4f0005aa5a84a1479b78ca9033074" } pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } rand = "0.7.0"