diff --git a/crates/src/lib.rs b/crates/src/lib.rs index f5684f2..c6ae73d 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -886,8 +886,11 @@ impl DriftClient { endpoint: String, x_token: String, opts: GrpcSubscribeOpts, + sync: bool, ) -> SdkResult<()> { - self.backend.grpc_subscribe(endpoint, x_token, opts).await + self.backend + .grpc_subscribe(endpoint, x_token, opts, sync) + .await } /// Unsubscribe the gRPC connection @@ -1043,11 +1046,35 @@ impl DriftClientBackend { endpoint: String, x_token: String, opts: GrpcSubscribeOpts, + sync: bool, ) -> SdkResult<()> { log::debug!(target: "grpc", "subscribing to grpc with config: commitment: {:?}, interslot updates: {:?}", opts.commitment, opts.interslot_updates); let mut grpc = DriftGrpcClient::new(endpoint.clone(), x_token.clone()) .grpc_connection_opts(opts.connection_opts.clone()); + if sync { + tokio::try_join!( + self.spot_market_map.sync(&self.rpc_client), + self.perp_market_map.sync(&self.rpc_client), + )?; + + let spot_markets = self + .spot_market_map + .marketmap + .iter() + .map(|i| MarketId::spot(*i.key())); + let perp_markets = self + .perp_market_map + .marketmap + .iter() + .map(|i| MarketId::perp(*i.key())); + let all_markets: Vec = spot_markets.chain(perp_markets).collect(); + + self.oracle_map + .sync(all_markets.as_ref(), &self.rpc_client) + .await?; + } + grpc.on_account( AccountFilter::partial().with_discriminator(SpotMarket::DISCRIMINATOR), self.spot_market_map.on_account_fn(), diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 476caa1..4adaa3a 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -295,6 +295,22 @@ impl OracleMap { o.raw.clone_from(&oracle_account.data); o.data = price_data; o.slot = latest_slot; + }) + .or_insert({ + let price_data = get_oracle_price( + oracle_source, + &mut (*oracle_pubkey, oracle_account.clone()), + latest_slot, + ) + .expect("valid oracle data"); + + Oracle { + pubkey: *oracle_pubkey, + data: price_data, + slot: latest_slot, + source: oracle_source, + raw: oracle_account.data.clone(), + } }); } diff --git a/tests/integration.rs b/tests/integration.rs index f2c3708..2db9f9d 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -135,24 +135,12 @@ async fn client_sync_subscribe_mainnet_grpc() { println!("account: {}", account.pubkey); let _ = user_update_tx.try_send(account.pubkey); } - ) + ), + true, ) .await .is_ok()); - // wait for updates - tokio::time::sleep(Duration::from_secs(120)).await; - - // markets available - assert!(client.try_get_perp_market_account(0).is_ok()); - assert!(client.try_get_spot_market_account(1).is_ok()); - - // slot update received - assert!(slot_update_rx.try_recv().is_ok_and(|s| s > 0)); - - // user update received - assert!(user_update_rx.try_recv().is_ok_and(|u| u != DEFAULT_PUBKEY)); - // oracle map subscribed for market in client.get_all_spot_market_ids() { let rpc_fetched_price = client.oracle_price(market).await.unwrap(); @@ -168,6 +156,33 @@ async fn client_sync_subscribe_mainnet_grpc() { } } + for market in client.get_all_perp_market_ids() { + let rpc_fetched_price = client.oracle_price(market).await.unwrap(); + log::info!("fetching market: {market:?}"); + match client.try_get_oracle_price_data_and_slot(market) { + Some(x) => assert!(x.data.price == rpc_fetched_price), + // there was no gRPC synced data since the test started + // this is acceptable for switchboard on demand oracle (seems to update slowly) + None => assert!(client + .program_data() + .spot_market_config_by_index(market.index()) + .is_some_and(|m| m.oracle_source == OracleSource::SwitchboardOnDemand)), + } + } + + // wait for updates + tokio::time::sleep(Duration::from_secs(120)).await; + + // markets available + assert!(client.try_get_perp_market_account(0).is_ok()); + assert!(client.try_get_spot_market_account(1).is_ok()); + + // slot update received + assert!(slot_update_rx.try_recv().is_ok_and(|s| s > 0)); + + // user update received + assert!(user_update_rx.try_recv().is_ok_and(|u| u != DEFAULT_PUBKEY)); + client.grpc_unsubscribe(); }