Skip to content

add grpc backed EventSubscriber and example #165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 128 additions & 8 deletions crates/src/event_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ use crate::{
events::{FundingPaymentRecord, OrderActionRecord, OrderRecord},
types::{MarketType, Order, OrderAction, OrderActionExplanation, PositionDirection},
},
grpc::{
grpc_subscriber::{DriftGrpcClient, GeyserSubscribeOpts, GrpcConnectionOpts},
TransactionUpdate,
},
types::{events::SwapRecord, SdkResult},
};

Expand Down Expand Up @@ -113,7 +117,7 @@ pub struct EventSubscriber;
impl EventSubscriber {
/// Subscribe to drift events of `sub_account`, backed by Ws APIs
///
/// * `sub_account` - pubkey of the user's sub-account to subscribe to
/// * `sub_account` - pubkey of the user's sub-account to subscribe to (use Drift Program ID to get all program events)
///
/// passing the driftV2 address `dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH`
/// will yield events from all sub-accounts.
Expand All @@ -129,6 +133,14 @@ impl EventSubscriber {
pub fn subscribe_polled(provider: impl EventRpcProvider, account: Pubkey) -> DriftEventStream {
polled_stream(provider, account)
}

pub async fn subscribe_grpc(
endpoint: String,
x_token: String,
sub_account: Pubkey,
) -> SdkResult<DriftEventStream> {
grpc_log_stream(endpoint, x_token, sub_account).await
}
}

struct LogEventStream {
Expand Down Expand Up @@ -164,20 +176,21 @@ impl LogEventStream {
debug!(target: LOG_TARGET, "start log subscription: {sub_account:?}");

while let Some(response) = log_stream.next().await {
self.process_log(response.value).await;
self.process_log(response.context.slot, response.value)
.await;
}
warn!(target: LOG_TARGET, "log stream ended: {sub_account:?}");
}

/// Process a log response from RPC, emitting any relevant events
async fn process_log(&self, response: RpcLogsResponse) {
async fn process_log(&self, slot: u64, response: RpcLogsResponse) {
let signature = response.signature;
if response.err.is_some() {
debug!(target: LOG_TARGET, "skipping failed tx: {signature:?}");
return;
}
if signature == EMPTY_SIGNATURE {
debug!(target: LOG_TARGET, "skipping empty signature");
debug!(target: LOG_TARGET, "skipping empty signature, logs");
return;
}
{
Expand All @@ -189,7 +202,7 @@ impl LogEventStream {
cache.insert(signature.clone());
}

debug!(target: LOG_TARGET, "log extracting events, tx: {signature:?}");
debug!(target: LOG_TARGET, "log extracting events, slot: {slot}, tx: {signature:?}");
for (tx_idx, log) in response.logs.iter().enumerate() {
// a drift sub-account should not interact with any other program by definition
if let Some(event) = try_parse_log(log.as_str(), &signature, tx_idx) {
Expand All @@ -205,6 +218,82 @@ impl LogEventStream {
}
}

struct GrpcLogEventStream {
grpc_endpoint: String,
grpc_x_token: String,
sub_account: Pubkey,
event_tx: Sender<DriftEvent>,
commitment: CommitmentConfig,
}

impl GrpcLogEventStream {
/// Returns a future for running the configured log event stream
async fn stream_fn(self) {
let sub_account = self.sub_account;
info!(target: LOG_TARGET, "grpc log stream connecting: {sub_account:?}");

let mut grpc = DriftGrpcClient::new(self.grpc_endpoint.clone(), self.grpc_x_token.clone())
.grpc_connection_opts(GrpcConnectionOpts::default());

let (raw_event_tx, mut raw_event_rx): (
Sender<TransactionUpdate>,
Receiver<TransactionUpdate>,
) = channel(256);

let raw_event_tx_clone = raw_event_tx.clone();
grpc.on_transaction(Box::new(move |tx_update: &TransactionUpdate| {
raw_event_tx_clone.try_send(tx_update.clone()).unwrap();
}));

// prevent dropping unsub_fn and unsubscribing from grpc
let _unsub_fn = grpc
.subscribe(
self.commitment.commitment,
GeyserSubscribeOpts {
transactions_accounts_include: vec![sub_account.to_string()],
..Default::default()
},
)
.await
.unwrap();
info!(target: LOG_TARGET, "grpc log stream connected: {sub_account:?}");

while let Some(event) = raw_event_rx.recv().await {
let start = std::time::Instant::now();
let slot = event.slot;
self.process_log(&event).await;
let elapsed = start.elapsed();
debug!(target: "grpc", "transaction slot: {}, len: {} callbacks took {:?}", slot, raw_event_rx.len(), elapsed);
}
info!(target: LOG_TARGET, "grpc log stream ended: {sub_account:?}");
}

/// Process a log response from RPC, emitting any relevant events
async fn process_log(&self, event: &TransactionUpdate) {
let signature = event.transaction.signatures.first();
if signature.is_none() {
debug!(target: LOG_TARGET, "skipping tx with no signatures");
return;
}
let signature =
Signature::from(<[u8; 64]>::try_from(signature.unwrap().as_slice()).unwrap());

debug!(target: LOG_TARGET, "log extracting events, slot: {}, tx: {}", event.slot, signature);
let logs = &event.meta.log_messages;
for (tx_idx, log) in logs.iter().enumerate() {
if let Some(event) = try_parse_log(log.as_str(), &signature.to_string(), tx_idx) {
// unrelated events from same tx should not be emitted e.g. a filler tx which produces other fill events
if event.pertains_to(self.sub_account) {
if self.event_tx.send(event).await.is_err() {
warn!("event receiver closed");
return;
}
}
}
}
}
}

/// Creates a poll-ed stream using JSON-RPC interfaces
fn polled_stream(provider: impl EventRpcProvider, sub_account: Pubkey) -> DriftEventStream {
let (event_tx, event_rx) = channel(256);
Expand Down Expand Up @@ -250,6 +339,34 @@ async fn log_stream(ws: Arc<PubsubClient>, sub_account: Pubkey) -> SdkResult<Dri
})
}

/// Creates a grpc-backed event stream
async fn grpc_log_stream(
endpoint: String,
x_token: String,
sub_account: Pubkey,
) -> SdkResult<DriftEventStream> {
debug!(target: LOG_TARGET, "grpc stream events for {sub_account:?}");
let (event_tx, event_rx) = channel(256);

// spawn the event subscription task
let join_handle = tokio::spawn(async move {
GrpcLogEventStream {
grpc_endpoint: endpoint.clone(),
grpc_x_token: x_token.clone(),
sub_account,
event_tx: event_tx.clone(),
commitment: CommitmentConfig::confirmed(),
}
.stream_fn()
.await;
});

Ok(DriftEventStream {
rx: event_rx,
task: join_handle,
})
}

pub struct PolledEventStream<T: EventRpcProvider> {
cache: Arc<RwLock<TxSignatureCache>>,
event_tx: Sender<DriftEvent>,
Expand Down Expand Up @@ -281,7 +398,7 @@ impl<T: EventRpcProvider> PolledEventStream<T> {
self.sub_account,
last_seen_tx
.clone()
.map(|s| Signature::from_str(&s).unwrap()),
.map(|s| Signature::from_str(s.as_str()).unwrap()),
None,
)
.await;
Expand Down Expand Up @@ -475,6 +592,7 @@ pub enum DriftEvent {
signature: String,
tx_idx: usize,
ts: u64,
bit_flags: u8,
},
OrderCancel {
taker: Option<Pubkey>,
Expand Down Expand Up @@ -662,6 +780,7 @@ impl DriftEvent {
ts: value.ts.unsigned_abs(),
signature: signature.to_string(),
tx_idx,
bit_flags: value.bit_flags,
}),
// Place - parsed from `OrderRecord` event, ignored here due to lack of useful info
// Expire - never emitted
Expand Down Expand Up @@ -787,7 +906,7 @@ mod test {
"Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP success",
].into_iter().map(Into::into).collect();

log_stream.process_log(RpcLogsResponse {
log_stream.process_log(338797360, RpcLogsResponse {
signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
err: None,
logs: logs.clone(),
Expand Down Expand Up @@ -821,6 +940,7 @@ mod test {
signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
tx_idx: 9,
ts: 1710893646,
bit_flags: 0,
}
);
assert!(event_rx.try_recv().is_err()); // no more events
Expand All @@ -832,7 +952,7 @@ mod test {
.unwrap();
log_stream.cache.write().await.reset();

log_stream.process_log(RpcLogsResponse {
log_stream.process_log(338797360, RpcLogsResponse {
signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
err: None,
logs: logs.clone(),
Expand Down
Loading
Loading