Skip to content

Commit 484f88e

Browse files
authored
[indexer-v2] add event queries (#14204)
## Description As title. This is a followup PR of #14203 ## Test Plan How did you test the new or updated feature? --- If your changes are not user-facing and not a breaking change, you can skip the following section. Otherwise, please indicate what changed, and then add to the Release Notes section as highlighted during the release process. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for a client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes Add event queries
1 parent 6f15337 commit 484f88e

File tree

2 files changed

+212
-4
lines changed

2 files changed

+212
-4
lines changed

crates/sui-indexer/src/apis/indexer_api_v2.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,24 @@ impl IndexerApiServer for IndexerApiV2 {
144144
limit: Option<usize>,
145145
descending_order: Option<bool>,
146146
) -> RpcResult<EventPage> {
147-
unimplemented!()
147+
let limit = cap_page_limit(limit);
148+
if limit == 0 {
149+
return Ok(EventPage::empty());
150+
}
151+
let descending_order = descending_order.unwrap_or(false);
152+
let mut results = self
153+
.inner
154+
.query_events_in_blocking_task(query, cursor, limit + 1, descending_order)
155+
.await?;
156+
157+
let has_next_page = results.len() > limit;
158+
results.truncate(limit);
159+
let next_cursor = results.last().map(|o| o.id.clone());
160+
Ok(Page {
161+
data: results,
162+
next_cursor,
163+
has_next_page,
164+
})
148165
}
149166

150167
async fn get_dynamic_fields(

crates/sui-indexer/src/indexer_reader.rs

Lines changed: 194 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ use crate::{
77
checkpoints::StoredCheckpoint,
88
display::StoredDisplay,
99
epoch::StoredEpochInfo,
10+
events::StoredEvent,
1011
objects::{ObjectRefColumn, StoredObject},
1112
packages::StoredPackage,
1213
transactions::StoredTransaction,
1314
tx_indices::TxSequenceNumber,
1415
},
15-
schema_v2::{checkpoints, display, epochs, objects, packages, transactions},
16+
schema_v2::{checkpoints, display, epochs, events, objects, packages, transactions},
1617
types_v2::{IndexerResult, OwnerType},
1718
PgConnectionConfig, PgConnectionPoolConfig, PgPoolConnection,
1819
};
@@ -28,7 +29,10 @@ use std::{
2829
collections::{BTreeMap, HashMap},
2930
sync::{Arc, RwLock},
3031
};
31-
use sui_json_rpc_types::{CheckpointId, EpochInfo, SuiTransactionBlockResponse, TransactionFilter};
32+
use sui_json_rpc_types::{
33+
CheckpointId, EpochInfo, EventFilter, SuiEvent, SuiTransactionBlockResponse, TransactionFilter,
34+
};
35+
use sui_types::event::EventID;
3236
use sui_types::{
3337
base_types::{ObjectID, ObjectRef, SequenceNumber, SuiAddress, VersionNumber},
3438
committee::EpochId,
@@ -542,6 +546,19 @@ impl IndexerReader {
542546
})
543547
}
544548

549+
pub async fn query_events_in_blocking_task(
550+
&self,
551+
filter: EventFilter,
552+
cursor: Option<EventID>,
553+
limit: usize,
554+
descending_order: bool,
555+
) -> IndexerResult<Vec<SuiEvent>> {
556+
self.spawn_blocking(move |this| {
557+
this.query_events_impl(filter, cursor, limit, descending_order)
558+
})
559+
.await
560+
}
561+
545562
pub async fn multi_get_objects_in_blocking_task(
546563
&self,
547564
object_ids: Vec<ObjectID>,
@@ -712,7 +729,7 @@ impl IndexerReader {
712729
Some(TransactionFilter::FromAndToAddress { from, to }) => {
713730
let from_address = Hex::encode(from.to_vec());
714731
let to_address = Hex::encode(to.to_vec());
715-
// Need to de-amibguize the tx_sequence_number column
732+
// Need to remove ambiguities for tx_sequence_number column
716733
let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
717734
if is_descending {
718735
format!(
@@ -878,6 +895,180 @@ impl IndexerReader {
878895
.map_err(Into::into)
879896
}
880897

898+
fn query_events_by_tx_digest_query(
899+
&self,
900+
tx_digest: TransactionDigest,
901+
cursor: Option<EventID>,
902+
limit: usize,
903+
descending_order: bool,
904+
) -> IndexerResult<String> {
905+
let cursor = if let Some(cursor) = cursor {
906+
if cursor.tx_digest != tx_digest {
907+
return Err(IndexerError::InvalidArgumentError(
908+
"Cursor tx_digest does not match the tx_digest in the query.".into(),
909+
));
910+
}
911+
if descending_order {
912+
format!("e.{EVENT_SEQUENCE_NUMBER_STR} < {}", cursor.event_seq)
913+
} else {
914+
format!("e.{EVENT_SEQUENCE_NUMBER_STR} > {}", cursor.event_seq)
915+
}
916+
} else if descending_order {
917+
format!("e.{EVENT_SEQUENCE_NUMBER_STR} <= {}", i64::MAX)
918+
} else {
919+
format!("e.{EVENT_SEQUENCE_NUMBER_STR} >= {}", 0)
920+
};
921+
922+
let order_clause = if descending_order { "DESC" } else { "ASC" };
923+
Ok(format!(
924+
"SELECT * \
925+
FROM EVENTS e \
926+
JOIN TRANSACTIONS t \
927+
ON t.tx_sequence_number = e.tx_sequence_number \
928+
AND t.transaction_digest = '\\x{}'::bytea \
929+
WHERE {cursor} \
930+
ORDER BY e.{EVENT_SEQUENCE_NUMBER_STR} {order_clause} \
931+
LIMIT {limit}
932+
",
933+
Hex::encode(tx_digest.into_inner()),
934+
))
935+
}
936+
937+
fn query_events_impl(
938+
&self,
939+
filter: EventFilter,
940+
cursor: Option<EventID>,
941+
limit: usize,
942+
descending_order: bool,
943+
) -> IndexerResult<Vec<SuiEvent>> {
944+
let (tx_seq, event_seq) = if let Some(cursor) = cursor.clone() {
945+
let EventID {
946+
tx_digest,
947+
event_seq,
948+
} = cursor;
949+
(
950+
self.run_query(|conn| {
951+
transactions::dsl::transactions
952+
.select(transactions::tx_sequence_number)
953+
.filter(
954+
transactions::dsl::transaction_digest
955+
.eq(tx_digest.into_inner().to_vec()),
956+
)
957+
.first::<i64>(conn)
958+
})?,
959+
event_seq,
960+
)
961+
} else if descending_order {
962+
let max_tx_seq: i64 = self.run_query(|conn| {
963+
events::dsl::events
964+
.select(events::tx_sequence_number)
965+
.order(events::dsl::tx_sequence_number.desc())
966+
.first::<i64>(conn)
967+
})?;
968+
(max_tx_seq + 1, 0)
969+
} else {
970+
(-1, 0)
971+
};
972+
973+
let query = if let EventFilter::Sender(sender) = &filter {
974+
// Need to remove ambiguities for tx_sequence_number column
975+
let cursor_clause = if descending_order {
976+
format!("(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))", tx_seq, tx_seq, event_seq)
977+
} else {
978+
format!("(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))", tx_seq, tx_seq, event_seq)
979+
};
980+
let order_clause = if descending_order {
981+
format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
982+
} else {
983+
format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
984+
};
985+
format!(
986+
"( \
987+
SELECT *
988+
FROM tx_senders s
989+
JOIN events e
990+
ON e.tx_sequence_number = s.tx_sequence_number
991+
AND s.sender = '\\x{}'::bytea
992+
WHERE {} \
993+
ORDER BY {} \
994+
LIMIT {}
995+
)",
996+
Hex::encode(sender.to_vec()),
997+
cursor_clause,
998+
order_clause,
999+
limit,
1000+
)
1001+
} else if let EventFilter::Transaction(tx_digest) = filter {
1002+
self.query_events_by_tx_digest_query(tx_digest, cursor, limit, descending_order)?
1003+
} else {
1004+
let main_where_clause = match filter {
1005+
EventFilter::Package(package_id) => {
1006+
format!("package = '\\x{}'::bytea", package_id.to_hex())
1007+
}
1008+
EventFilter::MoveModule { package, module } => {
1009+
format!(
1010+
"package = '\\x{}'::bytea AND module = '{}'",
1011+
package.to_hex(),
1012+
module,
1013+
)
1014+
}
1015+
EventFilter::MoveEventType(struct_tag) => {
1016+
format!("event_type = '{}'", struct_tag)
1017+
}
1018+
EventFilter::MoveEventModule { package, module } => {
1019+
let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
1020+
format!("event_type LIKE '{package_module_prefix}::%'")
1021+
}
1022+
EventFilter::Sender(_) => {
1023+
// Processed above
1024+
unreachable!()
1025+
}
1026+
EventFilter::Transaction(_) => {
1027+
// Processed above
1028+
unreachable!()
1029+
}
1030+
EventFilter::MoveEventField { .. }
1031+
| EventFilter::All(_)
1032+
| EventFilter::Any(_)
1033+
| EventFilter::And(_, _)
1034+
| EventFilter::Or(_, _)
1035+
| EventFilter::TimeRange { .. } => {
1036+
return Err(IndexerError::NotSupportedError(
1037+
"This type of EventFilter is not supported.".into(),
1038+
));
1039+
}
1040+
};
1041+
1042+
let cursor_clause = if descending_order {
1043+
format!("AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))", tx_seq, tx_seq, event_seq)
1044+
} else {
1045+
format!("AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))", tx_seq, tx_seq, event_seq)
1046+
};
1047+
let order_clause = if descending_order {
1048+
format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
1049+
} else {
1050+
format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
1051+
};
1052+
1053+
format!(
1054+
"
1055+
SELECT * FROM events \
1056+
WHERE {} {} \
1057+
ORDER BY {} \
1058+
LIMIT {}
1059+
",
1060+
main_where_clause, cursor_clause, order_clause, limit,
1061+
)
1062+
};
1063+
tracing::debug!("query events: {}", query);
1064+
let stored_events =
1065+
self.run_query(|conn| diesel::sql_query(query).load::<StoredEvent>(conn))?;
1066+
stored_events
1067+
.into_iter()
1068+
.map(|se| se.try_into_sui_event(self))
1069+
.collect()
1070+
}
1071+
8811072
pub async fn get_transaction_events_in_blocking_task(
8821073
&self,
8831074
digest: TransactionDigest,

0 commit comments

Comments
 (0)