Commit 8952176

refactor: use get_program_accounts instead of mapping account to poll
1 parent 557a79e commit 8952176

File tree

2 files changed: +106 −76 lines changed
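The refactor replaces the walk over Pyth's linked list of mapping accounts with a single `get_program_accounts` call against the oracle program. Below is a minimal sketch of the two strategies, assuming the `solana_client` nonblocking `RpcClient` and the `pyth_sdk_solana` account loaders already used in this repo; the helper names are illustrative and not functions from the commit.

```rust
use anyhow::Result;
use pyth_sdk_solana::state::{load_mapping_account, load_product_account};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;

/// Old approach: follow the chain of mapping accounts, collecting the product
/// keys they reference; products and prices are then fetched in batches.
async fn product_keys_via_mapping(
    rpc: &RpcClient,
    mut mapping_key: Pubkey,
) -> Result<Vec<Pubkey>> {
    let mut product_keys = Vec::new();
    while mapping_key != Pubkey::default() {
        let data = rpc.get_account_data(&mapping_key).await?;
        let mapping = *load_mapping_account(&data)?;
        product_keys.extend(
            mapping
                .products
                .iter()
                .copied()
                .filter(|k| *k != Pubkey::default()),
        );
        mapping_key = mapping.next; // Pubkey::default() terminates the chain
    }
    Ok(product_keys)
}

/// New approach: fetch every account owned by the oracle program in one RPC
/// call and keep the ones that deserialize as product accounts.
async fn product_keys_via_program(
    rpc: &RpcClient,
    oracle_program_key: Pubkey,
) -> Result<Vec<Pubkey>> {
    let accounts = rpc.get_program_accounts(&oracle_program_key).await?;
    Ok(accounts
        .iter()
        .filter(|(_, account)| load_product_account(&account.data).is_ok())
        .map(|(pubkey, _)| *pubkey)
        .collect())
}
```

The old path costs one `get_account_data` round trip per mapping account plus batched product/price lookups; the new path delegates enumeration to the RPC node in exchange for a single, larger response.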

src/agent/services/oracle.rs

Lines changed: 4 additions & 4 deletions

@@ -60,7 +60,7 @@ where
         config.clone(),
         network,
         state.clone(),
-        key_store.mapping_key,
+        key_store.pyth_oracle_program_key,
         key_store.publish_keypair,
         key_store.pyth_price_store_program_key,
         config.oracle.max_lookup_batch_size,
@@ -152,13 +152,13 @@ where
     Ok(())
 }

-/// On poll lookup all Pyth Mapping/Product/Price accounts and sync.
+/// On poll lookup all Pyth Product/Price accounts and sync.
 #[instrument(skip(config, publish_keypair, state))]
 async fn poller<S>(
     config: Config,
     network: Network,
     state: Arc<S>,
-    mapping_key: Pubkey,
+    oracle_program_key: Pubkey,
     publish_keypair: Option<Keypair>,
     pyth_price_store_program_key: Option<Pubkey>,
     max_lookup_batch_size: usize,
@@ -183,7 +183,7 @@ async fn poller<S>(
        Oracle::poll_updates(
            &*state,
            network,
-           mapping_key,
+           oracle_program_key,
            publish_keypair.as_ref(),
            pyth_price_store_program_key,
            &client,

src/agent/state/oracle.rs

Lines changed: 102 additions & 72 deletions

@@ -20,10 +20,8 @@ use {
     },
     pyth_price_store::instruction::PUBLISHER_CONFIG_SEED,
     pyth_sdk_solana::state::{
-        load_mapping_account,
         load_product_account,
         GenericPriceAccount,
-        MappingAccount,
         PriceComp,
         PythnetPriceAccount,
         SolanaPriceAccount,
@@ -132,7 +130,6 @@ impl std::ops::Deref for PriceEntry {

 #[derive(Default, Debug, Clone)]
 pub struct Data {
-    pub mapping_accounts: HashMap<Pubkey, MappingAccount>,
     pub product_accounts: HashMap<Pubkey, ProductEntry>,
     pub price_accounts: HashMap<Pubkey, PriceEntry>,
     /// publisher => {their permissioned price accounts => price publishing metadata}
@@ -194,7 +191,7 @@ pub trait Oracle {
     async fn poll_updates(
         &self,
         network: Network,
-        mapping_key: Pubkey,
+        oracle_program_key: Pubkey,
         publish_keypair: Option<&Keypair>,
         pyth_price_store_program_key: Option<Pubkey>,
         rpc_client: &RpcClient,
@@ -269,20 +266,16 @@ where
     async fn poll_updates(
         &self,
         network: Network,
-        mapping_key: Pubkey,
+        oracle_program_key: Pubkey,
         publish_keypair: Option<&Keypair>,
         pyth_price_store_program_key: Option<Pubkey>,
         rpc_client: &RpcClient,
         max_lookup_batch_size: usize,
     ) -> Result<()> {
         let mut publisher_permissions = HashMap::new();
-        let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?;
-        let (product_accounts, price_accounts) = fetch_product_and_price_accounts(
-            rpc_client,
-            max_lookup_batch_size,
-            mapping_accounts.values(),
-        )
-        .await?;
+        let (product_accounts, price_accounts) =
+            fetch_product_and_price_accounts(rpc_client, oracle_program_key, max_lookup_batch_size)
+                .await?;

         for (price_key, price_entry) in price_accounts.iter() {
             for component in price_entry.comp {
@@ -337,7 +330,6 @@ where
         }

         let new_data = Data {
-            mapping_accounts,
            product_accounts,
            price_accounts,
            publisher_permissions,
@@ -412,57 +404,109 @@ async fn fetch_publisher_buffer_key(
412404
}
413405

414406
#[instrument(skip(rpc_client))]
415-
async fn fetch_mapping_accounts(
416-
rpc_client: &RpcClient,
417-
mapping_account_key: Pubkey,
418-
) -> Result<HashMap<Pubkey, MappingAccount>> {
419-
let mut accounts = HashMap::new();
420-
let mut account_key = mapping_account_key;
421-
while account_key != Pubkey::default() {
422-
let account = *load_mapping_account(
423-
&rpc_client
424-
.get_account_data(&account_key)
425-
.await
426-
.with_context(|| format!("load mapping account {}", account_key))?,
427-
)?;
428-
accounts.insert(account_key, account);
429-
account_key = account.next;
430-
}
431-
Ok(accounts)
432-
}
433-
434-
#[instrument(skip(rpc_client, mapping_accounts))]
435-
async fn fetch_product_and_price_accounts<'a, A>(
407+
async fn fetch_product_and_price_accounts(
436408
rpc_client: &RpcClient,
409+
oracle_program_key: Pubkey,
437410
max_lookup_batch_size: usize,
438-
mapping_accounts: A,
439-
) -> Result<(HashMap<Pubkey, ProductEntry>, HashMap<Pubkey, PriceEntry>)>
440-
where
441-
A: IntoIterator<Item = &'a MappingAccount>,
442-
{
443-
let mut product_keys = vec![];
444-
445-
// Get all product keys
446-
for mapping_account in mapping_accounts {
447-
for account_key in mapping_account
448-
.products
449-
.iter()
450-
.filter(|pubkey| **pubkey != Pubkey::default())
451-
{
452-
product_keys.push(*account_key);
453-
}
454-
}
455-
411+
) -> Result<(HashMap<Pubkey, ProductEntry>, HashMap<Pubkey, PriceEntry>)> {
456412
let mut product_entries = HashMap::new();
457413
let mut price_entries = HashMap::new();
458414

459-
// Lookup products and their prices using the configured batch size
460-
for product_key_batch in product_keys.as_slice().chunks(max_lookup_batch_size) {
461-
let (mut batch_products, mut batch_prices) =
462-
fetch_batch_of_product_and_price_accounts(rpc_client, product_key_batch).await?;
415+
let oracle_accounts = rpc_client.get_program_accounts(&oracle_program_key).await?;
416+
417+
// Go over all the product accounts and partially fill the product entires. The product
418+
// entires need to have prices inside them which gets filled by going over all the
419+
// price accounts.
420+
for (product_key, product) in oracle_accounts.iter().filter_map(|(pubkey, account)| {
421+
load_product_account(&account.data)
422+
.ok()
423+
.map(|product| (pubkey, product))
424+
}) {
425+
#[allow(deprecated)]
426+
let legacy_schedule: LegacySchedule = if let Some((_wsched_key, wsched_val)) =
427+
product.iter().find(|(k, _v)| *k == "weekly_schedule")
428+
{
429+
wsched_val.parse().unwrap_or_else(|err| {
430+
tracing::warn!(
431+
product_key = product_key.to_string(),
432+
weekly_schedule = wsched_val,
433+
"Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing.",
434+
);
435+
tracing::debug!(err = ?err, "Parsing error context.");
436+
Default::default()
437+
})
438+
} else {
439+
Default::default() // No market hours specified, meaning 24/7 publishing
440+
};
441+
442+
let market_schedule: Option<MarketSchedule> = if let Some((_msched_key, msched_val)) =
443+
product.iter().find(|(k, _v)| *k == "schedule")
444+
{
445+
match msched_val.parse::<MarketSchedule>() {
446+
Ok(schedule) => Some(schedule),
447+
Err(err) => {
448+
tracing::warn!(
449+
product_key = product_key.to_string(),
450+
schedule = msched_val,
451+
"Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule.",
452+
);
453+
tracing::debug!(err = ?err, "Parsing error context.");
454+
None
455+
}
456+
}
457+
} else {
458+
None
459+
};
460+
461+
let publish_interval: Option<Duration> = if let Some((
462+
_publish_interval_key,
463+
publish_interval_val,
464+
)) =
465+
product.iter().find(|(k, _v)| *k == "publish_interval")
466+
{
467+
match publish_interval_val.parse::<f64>() {
468+
Ok(interval) => Some(Duration::from_secs_f64(interval)),
469+
Err(err) => {
470+
tracing::warn!(
471+
product_key = product_key.to_string(),
472+
publish_interval = publish_interval_val,
473+
"Oracle: Product has publish_interval defined but it could not be parsed. Falling back to None.",
474+
);
475+
tracing::debug!(err = ?err, "parsing error context");
476+
None
477+
}
478+
}
479+
} else {
480+
None
481+
};
463482

464-
product_entries.extend(batch_products.drain());
465-
price_entries.extend(batch_prices.drain());
483+
product_entries.insert(
484+
*product_key,
485+
ProductEntry {
486+
account_data: *product,
487+
schedule: market_schedule.unwrap_or_else(|| legacy_schedule.into()),
488+
price_accounts: vec![],
489+
publish_interval,
490+
},
491+
);
492+
}
493+
494+
// Load the price accounts into price entry and also fill the product entires
495+
for (price_key, price) in oracle_accounts.iter().filter_map(|(pubkey, account)| {
496+
PriceEntry::load_from_account(&account.data).map(|product| (pubkey, product))
497+
}) {
498+
if let Some(prod) = product_entries.get_mut(&price.prod) {
499+
prod.price_accounts.push(*price_key);
500+
price_entries.insert(*price_key, price);
501+
} else {
502+
tracing::warn!(
503+
missing_product = price.prod.to_string(),
504+
price_key = price_key.to_string(),
505+
"Could not find product entry for price, listed in its prod field, skipping",
506+
);
507+
508+
continue;
509+
}
466510
}
467511

468512
Ok((product_entries, price_entries))
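An unfiltered `get_program_accounts` returns every account the oracle program owns, including accounts that the loops above immediately discard. Not part of this commit, but a possible follow-up would be to let the RPC node pre-filter the response by account size; a sketch assuming `solana_client`'s `RpcProgramAccountsConfig` and `RpcFilterType::DataSize`, where the size argument is a placeholder that would have to match the on-chain layout:

```rust
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
    nonblocking::rpc_client::RpcClient,
    rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
    rpc_filter::RpcFilterType,
};
use solana_sdk::{account::Account, pubkey::Pubkey};

/// Fetch only the program-owned accounts whose data length equals
/// `account_size` (e.g. the price-account size), so the RPC node does the
/// coarse filtering instead of the agent.
async fn fetch_program_accounts_of_size(
    rpc: &RpcClient,
    oracle_program_key: &Pubkey,
    account_size: u64, // placeholder: must equal the real on-chain account size
) -> anyhow::Result<Vec<(Pubkey, Account)>> {
    let config = RpcProgramAccountsConfig {
        filters: Some(vec![RpcFilterType::DataSize(account_size)]),
        account_config: RpcAccountInfoConfig {
            encoding: Some(UiAccountEncoding::Base64),
            ..Default::default()
        },
        ..Default::default()
    };
    Ok(rpc
        .get_program_accounts_with_config(oracle_program_key, config)
        .await?)
}
```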
@@ -625,20 +669,6 @@ async fn fetch_batch_of_product_and_price_accounts(
 #[instrument(skip(data, new_data))]
 fn log_data_diff(data: &Data, new_data: &Data) {
     // Log new accounts which have been found
-    let previous_mapping_accounts = data
-        .mapping_accounts
-        .keys()
-        .cloned()
-        .collect::<HashSet<_>>();
-    tracing::info!(
-        new = ?new_data
-            .mapping_accounts
-            .keys()
-            .cloned()
-            .collect::<HashSet<_>>().difference(&previous_mapping_accounts),
-        total = data.mapping_accounts.len(),
-        "Fetched mapping accounts."
-    );
     let previous_product_accounts = data
         .product_accounts
         .keys()
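The mapping-account block removed from `log_data_diff` above followed the same set-difference pattern the function still applies to product and price accounts. A generic sketch of that pattern (a hypothetical helper, not code from the repo):

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

/// Log which keys appear in `current` but not in `previous`, mirroring the
/// per-account-type blocks in log_data_diff.
fn log_new_keys<K: Hash + Eq + Clone + Debug, V>(
    previous: &HashMap<K, V>,
    current: &HashMap<K, V>,
    label: &str,
) {
    let previous_keys: HashSet<_> = previous.keys().cloned().collect();
    let current_keys: HashSet<_> = current.keys().cloned().collect();
    tracing::info!(
        new = ?current_keys.difference(&previous_keys),
        total = previous.len(),
        "Fetched {} accounts.",
        label,
    );
}
```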
