Skip to content

Commit 84dce0e

Browse files
authored
Backport: Granular slot updates v2.0 (#52) (#58)
Add implementation for on_entry to get more granular slot updates
1 parent 88ebc51 commit 84dce0e

File tree

12 files changed

+581
-17
lines changed

12 files changed

+581
-17
lines changed

Cargo.lock

Lines changed: 94 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ rand = "0.8"
3737
serde = "1.0.160"
3838
serde_derive = "1.0.160"
3939
serde_json = "1.0.96"
40+
serde_with = "=3.9.0"
4041
solana-account-decoder = "=2.0.15"
4142
solana-logger = "=2.0.15"
4243
solana-metrics = "=2.0.15"

client/src/geyser_consumer.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use std::{
1919
use jito_geyser_protos::solana::geyser::{
2020
geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest,
2121
MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest,
22-
SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
22+
SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest,
23+
SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
2324
};
2425
use log::*;
2526
use lru::LruCache;
@@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status};
3233

3334
use crate::{
3435
geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed},
35-
types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate},
36+
types::{
37+
AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate,
38+
},
3639
GrpcInterceptor, Pubkey, Slot,
3740
};
3841

@@ -242,6 +245,35 @@ impl GeyserConsumer {
242245
Ok(())
243246
}
244247

248+
pub async fn consume_slot_entry_updates(
249+
&self,
250+
slot_updates_tx: UnboundedSender<SlotEntryUpdate>,
251+
) -> Result<()> {
252+
let mut c = self.client.clone();
253+
254+
let resp = c
255+
.subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {})
256+
.await?;
257+
let mut stream = resp.into_inner();
258+
259+
while !self.exit.load(Ordering::Relaxed) {
260+
match stream.message().await {
261+
Ok(Some(slot_update)) => {
262+
if slot_updates_tx
263+
.send(SlotEntryUpdate::from(slot_update.entry_update.unwrap()))
264+
.is_err()
265+
{
266+
return Err(GeyserConsumerError::ConsumerChannelDisconnected);
267+
};
268+
}
269+
Ok(None) => return Err(StreamClosed),
270+
Err(e) => return Err(GeyserConsumerError::from(e)),
271+
}
272+
}
273+
274+
Ok(())
275+
}
276+
245277
#[allow(clippy::too_many_arguments)]
246278
fn process_account_update(
247279
maybe_message: std::result::Result<Option<TimestampedAccountUpdate>, Status>,

client/src/types.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,17 @@ impl From<geyser::SlotUpdate> for SlotUpdate {
136136
}
137137
}
138138
}
139+
140+
pub struct SlotEntryUpdate {
141+
pub slot: Slot,
142+
pub index: u64,
143+
}
144+
145+
impl From<geyser::SlotEntryUpdate> for SlotEntryUpdate {
146+
fn from(proto: geyser::SlotEntryUpdate) -> Self {
147+
Self {
148+
slot: proto.slot,
149+
index: proto.index,
150+
}
151+
}
152+
}

proto/proto/geyser.proto

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,29 @@ message GetHeartbeatIntervalResponse {
157157
uint64 heartbeat_interval_ms = 1;
158158
}
159159

160+
/// Modelled based off of https://github.com/solana-labs/solana/blob/v2.0/geyser-plugin-interface/src/geyser_plugin_interface.rs#L210
161+
/// If more details are needed can extend this structure.
162+
message SlotEntryUpdate {
163+
// The slot number of the block containing this Entry
164+
uint64 slot = 1;
165+
// The Entry's index in the block
166+
uint64 index = 2;
167+
// The number of executed transactions in the Entry
168+
// If this number is zero, we can assume its a tick entry
169+
uint64 executed_transaction_count = 3;
170+
}
171+
172+
message TimestampedSlotEntryUpdate {
173+
// Time at which the message was generated
174+
// Send relative timestamp in micros using u32 to reduce overhead. Provides ~71 mins of accuracy between sender and receiver
175+
// See [compact_timestamp::to_system_time]
176+
uint32 ts = 1;
177+
// SlotEntryUpdate update
178+
SlotEntryUpdate entry_update = 2;
179+
}
180+
181+
message SubscribeSlotEntryUpdateRequest {}
182+
160183
// The following __must__ be assumed:
161184
// - Clients may receive data for slots out of order.
162185
// - Clients may receive account updates for a given slot out of order.
@@ -186,4 +209,8 @@ service Geyser {
186209

187210
// Subscribes to block updates.
188211
rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {}
212+
213+
// Subscribes to entry updates.
214+
// Returns the highest slot seen thus far and the entry index corresponding to the tick
215+
rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {}
189216
}

proto/src/convert.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ use solana_transaction_status::{
2323
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta,
2424
};
2525

26-
use crate::solana::{entries, tx_by_addr};
27-
use crate::{solana::storage::confirmed_block, StoredExtendedRewards, StoredTransactionStatusMeta};
26+
use crate::{
27+
solana::{entries, storage::confirmed_block, tx_by_addr},
28+
StoredExtendedRewards, StoredTransactionStatusMeta,
29+
};
2830

2931
impl From<Vec<Reward>> for confirmed_block::Rewards {
3032
fn from(rewards: Vec<Reward>) -> Self {

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ prost-types = { workspace = true }
2121
serde = { workspace = true }
2222
serde_derive = { workspace = true }
2323
serde_json = { workspace = true }
24+
serde_with = { workspace = true }
2425
solana-logger = { workspace = true }
2526
solana-metrics = { workspace = true }
2627
solana-program = { workspace = true }

server/example-config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
"bind_address": "0.0.0.0:10000",
44
"account_update_buffer_size": 100000,
55
"slot_update_buffer_size": 100000,
6+
"slot_entry_update_buffer_size": 1000000,
67
"block_update_buffer_size": 100000,
78
"transaction_update_buffer_size": 100000,
89
"geyser_service_config": {
910
"heartbeat_interval_ms": 1000,
1011
"subscriber_buffer_size": 1000000
1112
}
12-
}
13+
}

0 commit comments

Comments
 (0)