Skip to content

Fix/reduce sync requests #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ rpc_tests = []

[dependencies]
abi_stable = "0.11"
ahash = "0.8.11"
anchor-lang = { version = "0.30", features = ["derive"] }
base64 = "0.22"
bytemuck = "1.17"
dashmap = "6"
env_logger = "0.11"
fnv = "1"
futures-util = "0.3"
log = "0.4"
rayon = { version = "1.9.0", optional = true }
Expand All @@ -49,4 +49,4 @@ hex = "0.4"
hex-literal = "0.4"

[build-dependencies]
drift-idl-gen = { version = "0.1.1", path = "crates/drift-idl-gen"}
drift-idl-gen = { version = "0.1.1", path = "crates/drift-idl-gen"}
31 changes: 16 additions & 15 deletions crates/src/account_map.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::sync::{Arc, Mutex, RwLock};

use anchor_lang::AccountDeserialize;
use fnv::FnvHashMap;
use dashmap::DashMap;
use log::debug;
use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey};

use crate::{
utils::get_ws_url, websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult,
UnsubHandle,
};

const LOG_TARGET: &str = "accountmap";

#[derive(Clone, Default)]
pub struct AccountSlot {
raw: Vec<u8>,
Expand All @@ -24,7 +27,7 @@ pub struct DataAndSlot<T> {
pub struct AccountMap {
endpoint: String,
commitment: CommitmentConfig,
inner: RwLock<FnvHashMap<Pubkey, AccountSub<Subscribed>>>,
inner: DashMap<Pubkey, AccountSub<Subscribed>, ahash::RandomState>,
}

impl AccountMap {
Expand All @@ -37,26 +40,23 @@ impl AccountMap {
}
/// Subscribe user account
pub async fn subscribe_account(&self, account: &Pubkey) -> SdkResult<()> {
{
let map = self.inner.read().expect("acquired");
if map.contains_key(account) {
return Ok(());
}
if self.inner.contains_key(account) {
return Ok(());
}
debug!(target: LOG_TARGET, "subscribing: {account:?}");

let user = AccountSub::new(&self.endpoint, self.commitment, *account);
let user = user.subscribe().await?;

let mut map = self.inner.write().expect("acquired");
map.insert(*account, user);
self.inner.insert(*account, user);

Ok(())
}
/// Unsubscribe user account
pub fn unsubscribe_account(&self, account: &Pubkey) {
let mut map = self.inner.write().expect("acquired");
if let Some(u) = map.remove(account) {
let _ = u.unsubscribe();
if let Some((acc, unsub)) = self.inner.remove(account) {
debug!(target: LOG_TARGET, "unsubscribing: {acc:?}");
let _ = unsub.unsubscribe();
}
}
/// Return data of the given `account` as T, if it exists
Expand All @@ -68,8 +68,9 @@ impl AccountMap {
&self,
account: &Pubkey,
) -> Option<DataAndSlot<T>> {
let accounts = self.inner.read().expect("read");
accounts.get(account).map(|u| u.get_account_data_and_slot())
self.inner
.get(account)
.map(|u| u.get_account_data_and_slot())
}
}

Expand Down Expand Up @@ -111,7 +112,7 @@ impl AccountSub<Unsubscribed> {
let data_and_slot = Arc::new(RwLock::new(AccountSlot::default()));
let unsub = self
.subscription
.subscribe(Self::SUBSCRIPTION_ID, {
.subscribe(Self::SUBSCRIPTION_ID, true, {
let data_and_slot = Arc::clone(&data_and_slot);
move |update| {
let mut guard = data_and_slot.write().expect("acquired");
Expand Down
12 changes: 6 additions & 6 deletions crates/src/event_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
time::Duration,
};

use ahash::HashSet;
use anchor_lang::{AnchorDeserialize, Discriminator};
use base64::Engine;
use fnv::FnvHashSet;
use futures_util::{future::BoxFuture, stream::FuturesOrdered, FutureExt, Stream, StreamExt};
use log::{debug, info, warn};
use regex::Regex;
Expand Down Expand Up @@ -656,15 +656,15 @@ impl DriftEvent {
/// fixed capacity cache of tx signatures
struct TxSignatureCache {
capacity: usize,
entries: FnvHashSet<String>,
entries: HashSet<String>,
age: VecDeque<String>,
}

impl TxSignatureCache {
fn new(capacity: usize) -> Self {
Self {
capacity,
entries: FnvHashSet::<String>::with_capacity_and_hasher(capacity, Default::default()),
entries: HashSet::<String>::with_capacity_and_hasher(capacity, Default::default()),
age: VecDeque::with_capacity(capacity),
}
}
Expand All @@ -689,9 +689,9 @@ impl TxSignatureCache {

#[cfg(test)]
mod test {
use ahash::HashMap;
use anchor_lang::prelude::*;
use base64::Engine;
use fnv::FnvHashMap;
use futures_util::future::ready;
use solana_sdk::{
hash::Hash,
Expand Down Expand Up @@ -852,7 +852,7 @@ mod test {
async fn polled_event_stream_caching() {
let _ = env_logger::try_init();
struct MockRpcProvider {
tx_responses: FnvHashMap<String, EncodedTransactionWithStatusMeta>,
tx_responses: HashMap<String, EncodedTransactionWithStatusMeta>,
signatures: tokio::sync::Mutex<Vec<String>>,
}

Expand Down Expand Up @@ -952,7 +952,7 @@ mod test {
let signatures: Vec<String> = (0..order_events.len())
.map(|_| Signature::new_unique().to_string())
.collect();
let mut tx_responses = FnvHashMap::<String, EncodedTransactionWithStatusMeta>::default();
let mut tx_responses = HashMap::<String, EncodedTransactionWithStatusMeta>::default();
for s in signatures.iter() {
let (oar, or) = order_events.pop().unwrap();
tx_responses.insert(
Expand Down
14 changes: 5 additions & 9 deletions crates/src/jit_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
accounts::User,
build_accounts,
constants::{self, state_account, JIT_PROXY_ID},
DriftClient, MarketId, MarketType, PostOnlyParam, ReferrerInfo, SdkError, SdkResult,
drift_idl, DriftClient, MarketId, MarketType, PostOnlyParam, ReferrerInfo, SdkError, SdkResult,
TransactionBuilder, Wallet,
};

Expand Down Expand Up @@ -131,7 +131,7 @@ impl JitProxyClient {
let program_data = tx_builder.program_data();
let account_data = tx_builder.account_data();

let writable_markets = match order.market_type {
let writable_markets = match order.market_type.into() {
MarketType::Perp => {
vec![MarketId::perp(order.market_index)]
}
Expand Down Expand Up @@ -161,18 +161,14 @@ impl JitProxyClient {
accounts.push(AccountMeta::new(referrer_info.referrer_stats(), false));
}

if order.market_type == MarketType::Spot {
if order.market_type == drift_idl::types::MarketType::Spot {
let spot_market_vault = self
.drift_client
.get_spot_market_account_and_slot(order.market_index)
.expect("spot market exists")
.data
.try_get_spot_market_account(order.market_index)?
.vault;
let quote_spot_market_vault = self
.drift_client
.get_spot_market_account_and_slot(MarketId::QUOTE_SPOT.index())
.expect("quote market exists")
.data
.try_get_spot_market_account(MarketId::QUOTE_SPOT.index())?
.vault;
accounts.push(AccountMeta::new_readonly(spot_market_vault, false));
accounts.push(AccountMeta::new_readonly(quote_spot_market_vault, false));
Expand Down
Loading