From 705fd2d5372be2c480bf7dcc94cd6affea4d25cd Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Tue, 15 Apr 2025 10:08:21 +0900 Subject: [PATCH 1/3] OracleMap: handle updating multiple oracles on one subscription --- crates/src/lib.rs | 1 - crates/src/oraclemap.rs | 158 +++++++++++++++++++++++++++------------- 2 files changed, 109 insertions(+), 50 deletions(-) diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 19c4b9b..06ad4d8 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -1061,7 +1061,6 @@ impl DriftClientBackend { ffi::get_oracle_price(oracle_source, &mut (oracle, account_data.clone()), slot)?; Ok(Oracle { - market, pubkey: oracle, source: oracle_source, slot, diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index bbb4cfb..3d22d06 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -23,9 +23,29 @@ use crate::{ const LOG_TARGET: &str = "oraclemap"; +#[derive(Copy, Clone, Debug)] +/// Captures shared relationship between oracles and markets/source types +enum OracleShareMode { + /// Oracle is unshared + Isolated { + market: MarketId, + source: OracleSource, + }, + /// Oracle is shared by two markets + Dual { + spot: MarketId, + perp: MarketId, + source: OracleSource, + }, + /// Oracle is shared by two markets with mixed sources + DualMixed { + spot: (MarketId, OracleSource), + perp: (MarketId, OracleSource), + }, +} + #[derive(Clone, Default, Debug)] pub struct Oracle { - pub market: MarketId, pub pubkey: Pubkey, pub data: OraclePriceData, pub source: OracleSource, @@ -38,12 +58,14 @@ pub struct Oracle { /// Caller can subscribe to some subset of markets for Ws backed updates /// Alternatively, the caller may drive the map by calling `sync` periodically pub struct OracleMap { - /// Oracle data keyed by pubkey + /// Oracle data keyed by pubkey and source pub oraclemap: Arc>, /// Oracle subscription handles by pubkey subscriptions: DashMap, /// Oracle (pubkey, source) by MarketId (immutable) - pub oracle_by_market: ReadOnlyView, + pub oracle_by_market: ReadOnlyView, + /// map from oracle to consuming markets/source types + shared_oracles: ReadOnlyView, latest_slot: Arc, commitment: CommitmentConfig, pubsub: Arc, @@ -64,30 +86,56 @@ impl OracleMap { commitment: CommitmentConfig, ) -> Self { log::debug!(target: LOG_TARGET, "all oracles: {:?}", all_oracles); - let oraclemap = all_oracles - .iter() - .copied() - .map(|(market, pubkey, source)| { - ( - (pubkey, source as u8), - Oracle { - market, - pubkey, - source, - ..Default::default() - }, - ) - }) - .collect(); - let oracle_by_market: DashMap = all_oracles - .iter() - .copied() - .map(|(market, pubkey, source)| (market, (pubkey, source))) - .collect(); + let oracle_by_market: DashMap = + all_oracles + .iter() + .copied() + .map(|(market, pubkey, source)| (market, (pubkey, source))) + .collect(); + let oracle_by_market = oracle_by_market.into_read_only(); + + let shared_oracles = DashMap::::default(); + for (market, (pubkey, source)) in oracle_by_market.iter() { + shared_oracles + .entry(*pubkey) + .and_modify(|m| { + match m { + OracleShareMode::Isolated { + market: isolated_market, + source: isolated_source, + } => { + let (spot, perp) = if isolated_market.is_spot() { + ((*isolated_market, *isolated_source), (*market, *source)) + } else { + ((*market, *source), (*isolated_market, *isolated_source)) + }; + + if source == isolated_source { + *m = OracleShareMode::Dual { + spot: spot.0, + perp: perp.0, + source: *source, + }; + } else { + *m = OracleShareMode::DualMixed { spot, perp }; + } + } + _ => { + // the oracle is being used by more than 2 markets + panic!("detected unhandled shared oracle mode"); + } + } + }) + .or_insert(OracleShareMode::Isolated { + market: *market, + source: *source, + }); + } Self { - oraclemap: Arc::new(oraclemap), - oracle_by_market: oracle_by_market.into_read_only(), + oraclemap: Arc::default(), + shared_oracles: shared_oracles.into_read_only(), + oracle_by_market, subscriptions: Default::default(), latest_slot: Arc::new(AtomicU64::new(0)), commitment, @@ -107,21 +155,17 @@ impl OracleMap { log::debug!(target: LOG_TARGET, "subscribe market oracles: {markets:?}"); let mut pending_subscriptions = - Vec::<(WebsocketAccountSubscriber, Oracle)>::with_capacity(markets.len()); + Vec::::with_capacity(markets.len()); for market in markets { - let (oracle_pubkey, oracle_source) = + let (oracle_pubkey, _oracle_source) = self.oracle_by_market.get(market).expect("oracle exists"); - let oracle_info = self - .oraclemap - .get(&(*oracle_pubkey, *oracle_source as u8)) - .expect("oracle exists"); // caller did not supply in `OracleMap::new()` // markets can share oracle pubkeys, only want one sub per oracle pubkey if self.subscriptions.contains_key(oracle_pubkey) || pending_subscriptions .iter() - .any(|(_, o)| &o.pubkey == oracle_pubkey) + .any(|sub| &sub.pubkey == oracle_pubkey) { log::debug!(target: LOG_TARGET, "subscription exists: {market:?}/{oracle_pubkey:?}"); continue; @@ -133,28 +177,46 @@ impl OracleMap { self.commitment, ); - pending_subscriptions.push((oracle_subscriber, oracle_info.clone())); + pending_subscriptions.push(oracle_subscriber); } - let futs_iter = pending_subscriptions.into_iter().map(|(sub_fut, info)| { + let futs_iter = pending_subscriptions.into_iter().map(|sub_fut| { let oraclemap = Arc::clone(&self.oraclemap); + let oracle_shared_mode = self + .shared_oracles + .get(&sub_fut.pubkey) + .expect("oracle exists") + .clone(); + async move { - let unsub = sub_fut - .subscribe(Self::SUBSCRIPTION_ID, true, { - // TODO: - // receive a list of all markets that share the oracle to update the data simultaneously - move |update| update_handler(update, info.market, info.source, &oraclemap) - }) - .await; - (info, unsub) + let unsub = + sub_fut + .subscribe(Self::SUBSCRIPTION_ID, true, move |update| { + match oracle_shared_mode { + OracleShareMode::Isolated { market: _, source } => { + update_handler(update, source, &oraclemap) + } + OracleShareMode::Dual { + spot: _, + perp: _, + source, + } => update_handler(update, source, &oraclemap), + OracleShareMode::DualMixed { spot, perp } => { + update_handler(update, spot.1, &oraclemap); + update_handler(update, perp.1, &oraclemap); + } + } + }) + .await; + ((sub_fut.pubkey, oracle_shared_mode), unsub) } }); let mut subscription_futs = FuturesUnordered::from_iter(futs_iter); - while let Some((info, unsub)) = subscription_futs.next().await { - log::debug!(target: LOG_TARGET, "subscribed market oracle: {:?}", info.market); - self.subscriptions.insert(info.pubkey, unsub?); + while let Some(((pubkey, oracle_share_mode), unsub)) = subscription_futs.next().await { + log::debug!(target: LOG_TARGET, "subscribed market oracle: {oracle_share_mode:?}"); + self.subscriptions.insert(pubkey, unsub?); } log::debug!(target: LOG_TARGET, "subscribed"); @@ -165,7 +227,7 @@ impl OracleMap { pub fn unsubscribe(&self, markets: &[MarketId]) -> SdkResult<()> { for market in markets { if let Some((oracle_pubkey, oracle_source)) = self.oracle_by_market.get(market) { - if let Some((_, unsub)) = self.subscriptions.remove(&oracle_pubkey) { + if let Some((_, unsub)) = self.subscriptions.remove(oracle_pubkey) { let _ = unsub.send(()); self.oraclemap .remove(&(*oracle_pubkey, *oracle_source as u8)); @@ -296,7 +358,6 @@ impl OracleMap { /// Handler fn for new oracle account data fn update_handler( update: &AccountUpdate, - oracle_market: MarketId, oracle_source: OracleSource, oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>, ) { @@ -324,10 +385,9 @@ fn update_handler( o.raw.clone_from(&update.data); }) .or_insert(Oracle { - market: oracle_market, pubkey: oracle_pubkey, - data: price_data, source: oracle_source, + data: price_data, slot: update.slot, raw: update.data.clone(), }); From f2b7d76cb21087e3090c96b93df805828ff2b805 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Tue, 15 Apr 2025 10:58:29 +0900 Subject: [PATCH 2/3] fix test --- crates/src/oraclemap.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 3d22d06..0d7b258 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -23,6 +23,7 @@ use crate::{ const LOG_TARGET: &str = "oraclemap"; +#[allow(dead_code)] #[derive(Copy, Clone, Debug)] /// Captures shared relationship between oracles and markets/source types enum OracleShareMode { @@ -515,7 +516,7 @@ mod tests { let map = OracleMap::new(pubsub, &all_oracles, rpc.commitment()); // - dups ignored - // - makerts with same oracle pubkey, make at most 1 sub + // - markets with same oracle pubkey, make at most 1 sub let markets = [ MarketId::perp(0), MarketId::spot(1), @@ -562,6 +563,7 @@ mod tests { #[tokio::test] async fn oraclemap_subscribes() { + let _ = env_logger::try_init(); let all_oracles = vec![ ( MarketId::spot(0), @@ -576,16 +578,15 @@ mod tests { ), (MarketId::spot(1), SOL_PERP_ORACLE, OracleSource::PythPull), ]; - let rpc = Arc::new(RpcClient::new(devnet_endpoint().into())); let pubsub = Arc::new( PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap()) .await .expect("ws connects"), ); - let map = OracleMap::new(pubsub, &all_oracles, rpc.commitment()); + let map = OracleMap::new(pubsub, &all_oracles, CommitmentConfig::confirmed()); // - dups ignored - // - makerts with same oracle pubkey, make at most 1 sub + // - markets with same oracle pubkey, make at most 1 sub let markets = [ MarketId::perp(0), MarketId::spot(1), @@ -593,10 +594,10 @@ mod tests { MarketId::spot(1), ]; map.subscribe(&markets).await.expect("subd"); - assert_eq!(map.len(), 3); + assert_eq!(map.len(), 2); let markets = [MarketId::perp(0), MarketId::spot(1)]; map.subscribe(&markets).await.expect("subd"); - assert_eq!(map.len(), 3); + assert_eq!(map.len(), 2); assert!(map.is_subscribed(&MarketId::perp(0))); assert!(map.is_subscribed(&MarketId::perp(1))); From 747f4b2731f1142fdf05abd538083236ec8b1ed8 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Tue, 15 Apr 2025 11:05:17 +0900 Subject: [PATCH 3/3] add bonk spot + perp sub for integraiton test --- tests/integration.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/integration.rs b/tests/integration.rs index 03f923e..b5a3659 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -80,6 +80,8 @@ async fn client_sync_subscribe_mainnet() { MarketId::perp(0), MarketId::perp(1), MarketId::perp(2), + MarketId::perp(4), + MarketId::spot(32), ]; tokio::try_join!( client.subscribe_markets(&markets), @@ -90,7 +92,10 @@ async fn client_sync_subscribe_mainnet() { let price = client.oracle_price(MarketId::perp(1)).await.expect("ok"); assert!(price > 0); dbg!(price); - let price = client.oracle_price(MarketId::spot(2)).await.expect("ok"); + let price = client.oracle_price(MarketId::perp(4)).await.expect("ok"); + assert!(price > 0); + dbg!(price); + let price = client.oracle_price(MarketId::spot(32)).await.expect("ok"); assert!(price > 0); dbg!(price); }