Skip to content

add perp/spot market update notifcations #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions crates/src/account_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubke

use crate::{
grpc::AccountUpdate, polled_account_subscriber::PolledAccountSubscriber, types::DataAndSlot,
websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, UnsubHandle,
websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult,
UnsubHandle,
};

const LOG_TARGET: &str = "accountmap";
Expand Down Expand Up @@ -54,17 +55,29 @@ impl AccountMap {
/// * `account` pubkey to subscribe
///
pub async fn subscribe_account(&self, account: &Pubkey) -> SdkResult<()> {
self.subscribe_account_with_callback(account, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_account_with_callback<F>(
&self,
account: &Pubkey,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
if self.inner.contains_key(account) {
return Ok(());
}
debug!(target: LOG_TARGET, "subscribing: {account:?}");

let user = AccountSub::new(Arc::clone(&self.pubsub), self.commitment, *account);
let sub = user.subscribe(Arc::clone(&self.inner)).await?;
let sub = user.subscribe(Arc::clone(&self.inner), on_account).await?;
self.subscriptions.insert(*account, sub);

Ok(())
}

/// Subscribe account with RPC polling
///
/// * `account` pubkey to subscribe
Expand All @@ -75,17 +88,31 @@ impl AccountMap {
account: &Pubkey,
interval: Option<Duration>,
) -> SdkResult<()> {
self.subscribe_account_polled_with_callback(account, interval, None::<fn(&crate::AccountUpdate)>)
.await
}

pub async fn subscribe_account_polled_with_callback<F>(
&self,
account: &Pubkey,
interval: Option<Duration>,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
if self.inner.contains_key(account) {
return Ok(());
}
debug!(target: LOG_TARGET, "subscribing: {account:?} @ {interval:?}");

let user = AccountSub::polled(Arc::clone(&self.rpc), *account, interval);
let sub = user.subscribe(Arc::clone(&self.inner)).await?;
let sub = user.subscribe(Arc::clone(&self.inner), on_account).await?;
self.subscriptions.insert(*account, sub);

Ok(())
}

/// On account update callback for gRPC hook
pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) {
let accounts = Arc::clone(&self.inner);
Expand Down Expand Up @@ -185,10 +212,15 @@ impl AccountSub<Unsubscribed> {
}

/// Start the subscriber task
pub async fn subscribe(
pub async fn subscribe<F>(
self,
accounts: Arc<DashMap<Pubkey, AccountSlot, ahash::RandomState>>,
) -> SdkResult<AccountSub<Subscribed>> {
on_account: Option<F>,
) -> SdkResult<AccountSub<Subscribed>>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let on_account = on_account.clone();
let unsub = match self.subscription {
SubscriptionImpl::Ws(ref ws) => {
let unsub = ws
Expand All @@ -206,6 +238,10 @@ impl AccountSub<Unsubscribed> {
raw: update.data.clone(),
slot: update.slot,
});

if let Some(on_account) = &on_account {
on_account(&update);
}
})
.await?;
Some(unsub)
Expand All @@ -225,6 +261,10 @@ impl AccountSub<Unsubscribed> {
raw: update.data.clone(),
slot: update.slot,
});

if let Some(on_account) = &on_account {
on_account(update);
}
});
Some(unsub)
}
Expand Down
13 changes: 10 additions & 3 deletions crates/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct GrpcSubscribeOpts {
/// callback for slot updates
pub on_slot: Option<Box<OnSlotFn>>,
/// custom callback for account updates
pub on_account: Option<(AccountFilter, Box<OnAccountFn>)>,
pub on_account: Option<Vec<(AccountFilter, Box<OnAccountFn>)>>,
/// Network level connection config
pub connection_opts: GrpcConnectionOpts,
/// Enable inter-slot update notifications
Expand Down Expand Up @@ -130,9 +130,16 @@ impl GrpcSubscribeOpts {
pub fn on_account(
mut self,
filter: AccountFilter,
on_account: impl Fn(&AccountUpdate) + Send + Sync + 'static,
callback: impl Fn(&AccountUpdate) + Send + Sync + 'static,
) -> Self {
self.on_account = Some((filter, Box::new(on_account)));
match &mut self.on_account {
Some(on_account) => {
on_account.push((filter, Box::new(callback)));
}
None => {
self.on_account = Some(vec![(filter, Box::new(callback))]);
}
}
self
}
/// Set network level connection opts
Expand Down
139 changes: 123 additions & 16 deletions crates/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::{
swift_order_subscriber::{SignedOrderInfo, SwiftOrderStream},
types::{
accounts::{PerpMarket, SpotMarket, State, User, UserStats},
DataAndSlot, MarketType, *,
DataAndSlot, MarketType, AccountUpdate, *,
},
utils::{get_http_url, get_ws_url},
};
Expand All @@ -71,6 +71,9 @@ pub mod types;
pub mod grpc;
pub mod polled_account_subscriber;
pub mod websocket_account_subscriber;

#[cfg(test)]
mod generic_callback_test;
pub mod websocket_program_account_subscriber;

// subscribers
Expand Down Expand Up @@ -157,31 +160,75 @@ impl DriftClient {
///
/// This is a no-op if already subscribed
pub async fn subscribe_markets(&self, markets: &[MarketId]) -> SdkResult<()> {
self.backend.subscribe_markets(markets).await
self.backend.subscribe_markets(markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_markets_with_callback<F>(
&self,
markets: &[MarketId],
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
self.backend.subscribe_markets(markets, on_account).await
}

/// Subscribe to all spot and perp markets
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_markets(&self) -> SdkResult<()> {
let markets = self.get_all_market_ids();
self.backend.subscribe_markets(&markets).await
self.backend.subscribe_markets(&markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_all_markets_with_callback<F>(
&self,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let markets = self.get_all_market_ids();
self.backend.subscribe_markets(&markets, on_account).await
}

/// Subscribe to all spot markets
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_spot_markets(&self) -> SdkResult<()> {
let markets = self.get_all_spot_market_ids();
self.backend.subscribe_markets(&markets).await
self.backend.subscribe_markets(&markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_all_spot_markets_with_callback<F>(
&self,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let markets = self.get_all_spot_market_ids();
self.backend.subscribe_markets(&markets, on_account).await
}

/// Subscribe to all perp markets
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_perp_markets(&self) -> SdkResult<()> {
let markets = self.get_all_perp_market_ids();
self.backend.subscribe_markets(&markets).await
self.backend.subscribe_markets(&markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_all_perp_markets_with_callback<F>(
&self,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let markets = self.get_all_perp_market_ids();
self.backend.subscribe_markets(&markets, on_account).await
}

/// Starts background subscriptions for live oracle account updates by market
Expand All @@ -190,31 +237,75 @@ impl DriftClient {
///
/// This is a no-op if already subscribed
pub async fn subscribe_oracles(&self, markets: &[MarketId]) -> SdkResult<()> {
self.backend.subscribe_oracles(markets).await
self.backend.subscribe_oracles(markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_oracles_with_callback<F>(
&self,
markets: &[MarketId],
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
self.backend.subscribe_oracles(markets, on_account).await
}

/// Subscribe to all oracles
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_oracles(&self) -> SdkResult<()> {
let markets = self.get_all_market_ids();
self.backend.subscribe_oracles(&markets).await
self.backend.subscribe_oracles(&markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_all_oracles_with_callback<F>(
&self,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let markets = self.get_all_market_ids();
self.backend.subscribe_oracles(&markets, on_account).await
}

/// Subscribe to all spot market oracles
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_spot_oracles(&self) -> SdkResult<()> {
let markets = self.get_all_spot_market_ids();
self.backend.subscribe_oracles(&markets).await
self.backend.subscribe_oracles(&markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_all_spot_oracles_with_callback<F>(
&self,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let markets = self.get_all_spot_market_ids();
self.backend.subscribe_oracles(&markets, on_account).await
}

/// Subscribe to all perp market oracles
///
/// This is a no-op if already subscribed
pub async fn subscribe_all_perp_oracles(&self) -> SdkResult<()> {
let markets = self.get_all_perp_market_ids();
self.backend.subscribe_oracles(&markets).await
self.backend.subscribe_oracles(&markets, None::<fn(&crate::AccountUpdate)>).await
}

pub async fn subscribe_all_perp_oracles_with_callback<F>(
&self,
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
let markets = self.get_all_perp_market_ids();
self.backend.subscribe_oracles(&markets, on_account).await
}

/// Subscribe to swift order feed(s) for given `markets`
Expand Down Expand Up @@ -1013,7 +1104,14 @@ impl DriftClientBackend {
}

/// Start subscriptions for market accounts
async fn subscribe_markets(&self, markets: &[MarketId]) -> SdkResult<()> {
async fn subscribe_markets<F>(
&self,
markets: &[MarketId],
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
if self.is_grpc_subscribed() {
log::info!("already subscribed markets via gRPC");
return Err(SdkError::AlreadySubscribed);
Expand All @@ -1023,21 +1121,28 @@ impl DriftClientBackend {
.iter()
.partition::<Vec<MarketId>, _>(|x| x.is_perp());
let _ = tokio::try_join!(
self.perp_market_map.subscribe(&perps),
self.spot_market_map.subscribe(&spot),
self.perp_market_map.subscribe(&perps, on_account.clone()),
self.spot_market_map.subscribe(&spot, on_account),
)?;

Ok(())
}

/// Start subscriptions for market oracle accounts
async fn subscribe_oracles(&self, markets: &[MarketId]) -> SdkResult<()> {
async fn subscribe_oracles<F>(
&self,
markets: &[MarketId],
on_account: Option<F>,
) -> SdkResult<()>
where
F: Fn(&crate::AccountUpdate) + Send + Sync + 'static + Clone,
{
if self.is_grpc_subscribed() {
log::info!("already subscribed oracles via gRPC");
return Err(SdkError::AlreadySubscribed);
}

self.oracle_map.subscribe(markets).await
self.oracle_map.subscribe(markets, on_account).await
}

/// Subscribe to all: markets, oracles, and slot updates over gRPC
Expand Down Expand Up @@ -1110,8 +1215,10 @@ impl DriftClientBackend {
}

// set custom callbacks
if let Some((filter, on_account)) = opts.on_account {
grpc.on_account(filter, on_account);
if let Some(callbacks) = opts.on_account {
for (filter, on_account) in callbacks {
grpc.on_account(filter, on_account)
}
}
if let Some(f) = opts.on_slot {
grpc.on_slot(f);
Expand Down
Loading
Loading