diff --git a/Cargo.toml b/Cargo.toml index 0a11c0f..4e8f930 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ unsafe_pub = [] abi_stable = { version = "0.11", default-features = false } ahash = "0.8.11" anchor-lang = { version = "0.31", features = ["derive"] } +arrayvec = "0.7.6" base64 = "0.22" bytemuck = "1.17" dashmap = "6" @@ -50,7 +51,6 @@ tokio-stream = "0.1.17" tokio-tungstenite = { version = "0.26", features = ["native-tls"] } drift-pubsub-client = { version = "0.1.1", path = "crates/pubsub-client" } -arrayvec = "0.7.6" [dev-dependencies] bytes = "1" diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 3c55c39..bbb4cfb 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -41,7 +41,7 @@ pub struct OracleMap { /// Oracle data keyed by pubkey pub oraclemap: Arc>, /// Oracle subscription handles by pubkey - subscriptions: DashMap<(Pubkey, u8), UnsubHandle, ahash::RandomState>, + subscriptions: DashMap, /// Oracle (pubkey, source) by MarketId (immutable) pub oracle_by_market: ReadOnlyView, latest_slot: Arc, @@ -118,12 +118,10 @@ impl OracleMap { .expect("oracle exists"); // caller did not supply in `OracleMap::new()` // markets can share oracle pubkeys, only want one sub per oracle pubkey - if self - .subscriptions - .contains_key(&(*oracle_pubkey, *oracle_source as u8)) + if self.subscriptions.contains_key(oracle_pubkey) || pending_subscriptions .iter() - .any(|(_, o)| &o.pubkey == oracle_pubkey && o.source == *oracle_source) + .any(|(_, o)| &o.pubkey == oracle_pubkey) { log::debug!(target: LOG_TARGET, "subscription exists: {market:?}/{oracle_pubkey:?}"); continue; @@ -156,8 +154,7 @@ impl OracleMap { while let Some((info, unsub)) = subscription_futs.next().await { log::debug!(target: LOG_TARGET, "subscribed market oracle: {:?}", info.market); - self.subscriptions - .insert((info.pubkey, info.source as u8), unsub?); + self.subscriptions.insert(info.pubkey, unsub?); } log::debug!(target: LOG_TARGET, "subscribed"); @@ -168,12 +165,10 @@ impl OracleMap { pub fn unsubscribe(&self, markets: &[MarketId]) -> SdkResult<()> { for market in markets { if let Some((oracle_pubkey, oracle_source)) = self.oracle_by_market.get(market) { - if let Some((market, unsub)) = self - .subscriptions - .remove(&(*oracle_pubkey, *oracle_source as u8)) - { + if let Some((_, unsub)) = self.subscriptions.remove(&oracle_pubkey) { let _ = unsub.send(()); - self.oraclemap.remove(&market); + self.oraclemap + .remove(&(*oracle_pubkey, *oracle_source as u8)); } } } @@ -184,11 +179,7 @@ impl OracleMap { /// Unsubscribe from all oracle updates pub fn unsubscribe_all(&self) -> SdkResult<()> { - let all_markets: Vec = self - .subscriptions - .iter() - .filter_map(|s| self.oraclemap.get(s.key()).map(|o| o.market)) - .collect(); + let all_markets: Vec = self.oracle_by_market.keys().copied().collect(); self.unsubscribe(&all_markets) } @@ -258,9 +249,8 @@ impl OracleMap { /// Returns true if the oraclemap has a subscription for `market` pub fn is_subscribed(&self, market: &MarketId) -> bool { - if let Some((oracle_pubkey, oracle_source)) = self.oracle_by_market.get(market) { - self.subscriptions - .contains_key(&(*oracle_pubkey, *oracle_source as u8)) + if let Some((oracle_pubkey, _oracle_source)) = self.oracle_by_market.get(market) { + self.subscriptions.contains_key(oracle_pubkey) } else { false } @@ -343,7 +333,7 @@ fn update_handler( }); } Err(err) => { - log::error!("Failed to get oracle price: {err:?}") + log::error!("Failed to get oracle price: {err:?}, {oracle_pubkey:?}") } } } diff --git a/crates/src/websocket_account_subscriber.rs b/crates/src/websocket_account_subscriber.rs index f38dabc..261f06c 100644 --- a/crates/src/websocket_account_subscriber.rs +++ b/crates/src/websocket_account_subscriber.rs @@ -135,7 +135,7 @@ impl WebsocketAccountSubscriber { } } None => { - log::error!(target: LOG_TARGET, "{subscription_name}: Ws ended unexpectedly"); + log::error!(target: LOG_TARGET, "{subscription_name}: Ws ended unexpectedly: {pubkey:?}"); break Err(()); } }