Skip to content

OracleMap: handle updating multiple oracles on one subscription #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,6 @@ impl DriftClientBackend {
ffi::get_oracle_price(oracle_source, &mut (oracle, account_data.clone()), slot)?;

Ok(Oracle {
market,
pubkey: oracle,
source: oracle_source,
slot,
Expand Down
171 changes: 116 additions & 55 deletions crates/src/oraclemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,30 @@ use crate::{

const LOG_TARGET: &str = "oraclemap";

#[allow(dead_code)]
#[derive(Copy, Clone, Debug)]
/// Captures shared relationship between oracles and markets/source types
enum OracleShareMode {
/// Oracle is unshared
Isolated {
market: MarketId,
source: OracleSource,
},
/// Oracle is shared by two markets
Dual {
spot: MarketId,
perp: MarketId,
source: OracleSource,
},
/// Oracle is shared by two markets with mixed sources
DualMixed {
spot: (MarketId, OracleSource),
perp: (MarketId, OracleSource),
},
}

#[derive(Clone, Default, Debug)]
pub struct Oracle {
pub market: MarketId,
pub pubkey: Pubkey,
pub data: OraclePriceData,
pub source: OracleSource,
Expand All @@ -38,12 +59,14 @@ pub struct Oracle {
/// Caller can subscribe to some subset of markets for Ws backed updates
/// Alternatively, the caller may drive the map by calling `sync` periodically
pub struct OracleMap {
/// Oracle data keyed by pubkey
/// Oracle data keyed by pubkey and source
pub oraclemap: Arc<DashMap<(Pubkey, u8), Oracle, ahash::RandomState>>,
/// Oracle subscription handles by pubkey
subscriptions: DashMap<Pubkey, UnsubHandle, ahash::RandomState>,
/// Oracle (pubkey, source) by MarketId (immutable)
pub oracle_by_market: ReadOnlyView<MarketId, (Pubkey, OracleSource)>,
pub oracle_by_market: ReadOnlyView<MarketId, (Pubkey, OracleSource), ahash::RandomState>,
/// map from oracle to consuming markets/source types
shared_oracles: ReadOnlyView<Pubkey, OracleShareMode, ahash::RandomState>,
latest_slot: Arc<AtomicU64>,
commitment: CommitmentConfig,
pubsub: Arc<PubsubClient>,
Expand All @@ -64,30 +87,56 @@ impl OracleMap {
commitment: CommitmentConfig,
) -> Self {
log::debug!(target: LOG_TARGET, "all oracles: {:?}", all_oracles);
let oraclemap = all_oracles
.iter()
.copied()
.map(|(market, pubkey, source)| {
(
(pubkey, source as u8),
Oracle {
market,
pubkey,
source,
..Default::default()
},
)
})
.collect();
let oracle_by_market: DashMap<MarketId, (Pubkey, OracleSource)> = all_oracles
.iter()
.copied()
.map(|(market, pubkey, source)| (market, (pubkey, source)))
.collect();
let oracle_by_market: DashMap<MarketId, (Pubkey, OracleSource), ahash::RandomState> =
all_oracles
.iter()
.copied()
.map(|(market, pubkey, source)| (market, (pubkey, source)))
.collect();
let oracle_by_market = oracle_by_market.into_read_only();

let shared_oracles = DashMap::<Pubkey, OracleShareMode, ahash::RandomState>::default();
for (market, (pubkey, source)) in oracle_by_market.iter() {
shared_oracles
.entry(*pubkey)
.and_modify(|m| {
match m {
OracleShareMode::Isolated {
market: isolated_market,
source: isolated_source,
} => {
let (spot, perp) = if isolated_market.is_spot() {
((*isolated_market, *isolated_source), (*market, *source))
} else {
((*market, *source), (*isolated_market, *isolated_source))
};

if source == isolated_source {
*m = OracleShareMode::Dual {
spot: spot.0,
perp: perp.0,
source: *source,
};
} else {
*m = OracleShareMode::DualMixed { spot, perp };
}
}
_ => {
// the oracle is being used by more than 2 markets
panic!("detected unhandled shared oracle mode");
}
}
})
.or_insert(OracleShareMode::Isolated {
market: *market,
source: *source,
});
}

Self {
oraclemap: Arc::new(oraclemap),
oracle_by_market: oracle_by_market.into_read_only(),
oraclemap: Arc::default(),
shared_oracles: shared_oracles.into_read_only(),
oracle_by_market,
subscriptions: Default::default(),
latest_slot: Arc::new(AtomicU64::new(0)),
commitment,
Expand All @@ -107,21 +156,17 @@ impl OracleMap {
log::debug!(target: LOG_TARGET, "subscribe market oracles: {markets:?}");

let mut pending_subscriptions =
Vec::<(WebsocketAccountSubscriber, Oracle)>::with_capacity(markets.len());
Vec::<WebsocketAccountSubscriber>::with_capacity(markets.len());

for market in markets {
let (oracle_pubkey, oracle_source) =
let (oracle_pubkey, _oracle_source) =
self.oracle_by_market.get(market).expect("oracle exists");
let oracle_info = self
.oraclemap
.get(&(*oracle_pubkey, *oracle_source as u8))
.expect("oracle exists"); // caller did not supply in `OracleMap::new()`

// markets can share oracle pubkeys, only want one sub per oracle pubkey
if self.subscriptions.contains_key(oracle_pubkey)
|| pending_subscriptions
.iter()
.any(|(_, o)| &o.pubkey == oracle_pubkey)
.any(|sub| &sub.pubkey == oracle_pubkey)
{
log::debug!(target: LOG_TARGET, "subscription exists: {market:?}/{oracle_pubkey:?}");
continue;
Expand All @@ -133,28 +178,46 @@ impl OracleMap {
self.commitment,
);

pending_subscriptions.push((oracle_subscriber, oracle_info.clone()));
pending_subscriptions.push(oracle_subscriber);
}

let futs_iter = pending_subscriptions.into_iter().map(|(sub_fut, info)| {
let futs_iter = pending_subscriptions.into_iter().map(|sub_fut| {
let oraclemap = Arc::clone(&self.oraclemap);
let oracle_shared_mode = self
.shared_oracles
.get(&sub_fut.pubkey)
.expect("oracle exists")
.clone();

async move {
let unsub = sub_fut
.subscribe(Self::SUBSCRIPTION_ID, true, {
// TODO:
// receive a list of all markets that share the oracle to update the data simultaneously
move |update| update_handler(update, info.market, info.source, &oraclemap)
})
.await;
(info, unsub)
let unsub =
sub_fut
.subscribe(Self::SUBSCRIPTION_ID, true, move |update| {
match oracle_shared_mode {
OracleShareMode::Isolated { market: _, source } => {
update_handler(update, source, &oraclemap)
}
OracleShareMode::Dual {
spot: _,
perp: _,
source,
} => update_handler(update, source, &oraclemap),
OracleShareMode::DualMixed { spot, perp } => {
update_handler(update, spot.1, &oraclemap);
update_handler(update, perp.1, &oraclemap);
}
}
})
.await;
((sub_fut.pubkey, oracle_shared_mode), unsub)
}
});

let mut subscription_futs = FuturesUnordered::from_iter(futs_iter);

while let Some((info, unsub)) = subscription_futs.next().await {
log::debug!(target: LOG_TARGET, "subscribed market oracle: {:?}", info.market);
self.subscriptions.insert(info.pubkey, unsub?);
while let Some(((pubkey, oracle_share_mode), unsub)) = subscription_futs.next().await {
log::debug!(target: LOG_TARGET, "subscribed market oracle: {oracle_share_mode:?}");
self.subscriptions.insert(pubkey, unsub?);
}

log::debug!(target: LOG_TARGET, "subscribed");
Expand All @@ -165,7 +228,7 @@ impl OracleMap {
pub fn unsubscribe(&self, markets: &[MarketId]) -> SdkResult<()> {
for market in markets {
if let Some((oracle_pubkey, oracle_source)) = self.oracle_by_market.get(market) {
if let Some((_, unsub)) = self.subscriptions.remove(&oracle_pubkey) {
if let Some((_, unsub)) = self.subscriptions.remove(oracle_pubkey) {
let _ = unsub.send(());
self.oraclemap
.remove(&(*oracle_pubkey, *oracle_source as u8));
Expand Down Expand Up @@ -296,7 +359,6 @@ impl OracleMap {
/// Handler fn for new oracle account data
fn update_handler(
update: &AccountUpdate,
oracle_market: MarketId,
oracle_source: OracleSource,
oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>,
) {
Expand Down Expand Up @@ -324,10 +386,9 @@ fn update_handler(
o.raw.clone_from(&update.data);
})
.or_insert(Oracle {
market: oracle_market,
pubkey: oracle_pubkey,
data: price_data,
source: oracle_source,
data: price_data,
slot: update.slot,
raw: update.data.clone(),
});
Expand Down Expand Up @@ -455,7 +516,7 @@ mod tests {
let map = OracleMap::new(pubsub, &all_oracles, rpc.commitment());

// - dups ignored
// - makerts with same oracle pubkey, make at most 1 sub
// - markets with same oracle pubkey, make at most 1 sub
let markets = [
MarketId::perp(0),
MarketId::spot(1),
Expand Down Expand Up @@ -502,6 +563,7 @@ mod tests {

#[tokio::test]
async fn oraclemap_subscribes() {
let _ = env_logger::try_init();
let all_oracles = vec![
(
MarketId::spot(0),
Expand All @@ -516,27 +578,26 @@ mod tests {
),
(MarketId::spot(1), SOL_PERP_ORACLE, OracleSource::PythPull),
];
let rpc = Arc::new(RpcClient::new(devnet_endpoint().into()));
let pubsub = Arc::new(
PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap())
.await
.expect("ws connects"),
);
let map = OracleMap::new(pubsub, &all_oracles, rpc.commitment());
let map = OracleMap::new(pubsub, &all_oracles, CommitmentConfig::confirmed());

// - dups ignored
// - makerts with same oracle pubkey, make at most 1 sub
// - markets with same oracle pubkey, make at most 1 sub
let markets = [
MarketId::perp(0),
MarketId::spot(1),
MarketId::perp(1),
MarketId::spot(1),
];
map.subscribe(&markets).await.expect("subd");
assert_eq!(map.len(), 3);
assert_eq!(map.len(), 2);
let markets = [MarketId::perp(0), MarketId::spot(1)];
map.subscribe(&markets).await.expect("subd");
assert_eq!(map.len(), 3);
assert_eq!(map.len(), 2);

assert!(map.is_subscribed(&MarketId::perp(0)));
assert!(map.is_subscribed(&MarketId::perp(1)));
Expand Down
7 changes: 6 additions & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ async fn client_sync_subscribe_mainnet() {
MarketId::perp(0),
MarketId::perp(1),
MarketId::perp(2),
MarketId::perp(4),
MarketId::spot(32),
];
tokio::try_join!(
client.subscribe_markets(&markets),
Expand All @@ -90,7 +92,10 @@ async fn client_sync_subscribe_mainnet() {
let price = client.oracle_price(MarketId::perp(1)).await.expect("ok");
assert!(price > 0);
dbg!(price);
let price = client.oracle_price(MarketId::spot(2)).await.expect("ok");
let price = client.oracle_price(MarketId::perp(4)).await.expect("ok");
assert!(price > 0);
dbg!(price);
let price = client.oracle_price(MarketId::spot(32)).await.expect("ok");
assert!(price > 0);
dbg!(price);
}
Expand Down