From c5a0b6c7a0d3582a1838ccd2dc2f1f854d915039 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Thu, 5 Sep 2024 09:44:32 -0700 Subject: [PATCH 1/8] this impl should be correct but txbounds expects a right-open interval, while we have a closed interval due to shift away from using network_total_transactions --- .../src/types/transaction_block/tx_lookups.rs | 120 ++++++++++++------ 1 file changed, 82 insertions(+), 38 deletions(-) diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index 9577be7a19340..3b073b9947052 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -63,9 +63,9 @@ use crate::{ type_filter::{FqNameFilter, ModuleFilter}, }, }; -use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel::{CombineDsl, ExpressionMethods, QueryDsl}; use std::fmt::Write; -use sui_indexer::schema::checkpoints; +use sui_indexer::schema::pruner_cp_watermark; /// Bounds on transaction sequence number, imposed by filters, cursors, and the scan limit. The /// outermost bounds are determined by the checkpoint filters. These get translated into bounds in @@ -103,7 +103,8 @@ use sui_indexer::schema::checkpoints; #[derive(Clone, Debug, Copy)] pub(crate) struct TxBounds { /// The inclusive lower bound tx_sequence_number derived from checkpoint bounds. If checkpoint - /// bounds are not provided, this will default to `0`. + /// bounds are not provided, this will default to the smallest transaction sequence number of + /// the earliest checkpoint that has not been pruned. tx_lo: u64, /// The exclusive upper bound tx_sequence_number derived from checkpoint bounds. If checkpoint @@ -140,9 +141,9 @@ impl TxBounds { scan_limit: Option, page: &Page, ) -> Result, diesel::result::Error> { - // Lowerbound in terms of checkpoint sequence number. We want to get the total transaction - // count of the checkpoint before this one, or 0 if there is no previous checkpoint. - let cp_lo = max_option([cp_after.map(|x| x.saturating_add(1)), cp_at]).unwrap_or(0); + // Inclusive lowerbound in terms of checkpoint sequence number. If a lower bound is not set, + // we will default to the smallest checkpoint available from the database. + let cp_lo = max_option([cp_after.map(|x| x.saturating_add(1)), cp_at]); let cp_before_inclusive = match cp_before { // There are no results strictly before checkpoint 0. @@ -151,56 +152,99 @@ impl TxBounds { None => None, }; - // Upperbound in terms of checkpoint sequence number. We want to get the total transaction - // count at the end of this checkpoint. If no upperbound is given, use - // `checkpoint_viewed_at`. + // Inclusive upper bound in terms of checkpoint sequence number. If no upperbound is given, + // use `checkpoint_viewed_at`. // // SAFETY: we can unwrap because of the `Some(checkpoint_viewed_at) let cp_hi = min_option([cp_before_inclusive, cp_at, Some(checkpoint_viewed_at)]).unwrap(); - use checkpoints::dsl; - let (tx_lo, tx_hi) = if let Some(cp_prev) = cp_lo.checked_sub(1) { - let res: Vec = conn + use pruner_cp_watermark::dsl; + // Inclusive lower and upper bounds of the transaction sequence number range. `tx_hi` will + // need to be adjusted to form the expected right-open interval. + let (tx_lo, mut tx_hi) = { + let res: Vec<(i64, i64, i64)> = conn .results(move || { - dsl::checkpoints - .select(dsl::network_total_transactions) - .filter(dsl::sequence_number.eq_any([cp_prev as i64, cp_hi as i64])) - .order_by(dsl::network_total_transactions.asc()) + let mut min_cp_range = dsl::pruner_cp_watermark + .select(( + dsl::checkpoint_sequence_number, + dsl::min_tx_sequence_number, + dsl::max_tx_sequence_number, + )) + .into_boxed(); + + if let Some(cp_lo) = cp_lo { + min_cp_range = min_cp_range + .filter(dsl::checkpoint_sequence_number.eq(cp_lo as i64)) + .limit(1); + } else { + min_cp_range = min_cp_range + .order_by(dsl::checkpoint_sequence_number.asc()) + .limit(1); + }; + + let max_cp_range = dsl::pruner_cp_watermark + .select(( + dsl::checkpoint_sequence_number, + dsl::min_tx_sequence_number, + dsl::max_tx_sequence_number, + )) + .filter(dsl::checkpoint_sequence_number.eq(cp_hi as i64)) + .limit(1); + + min_cp_range.union_all(max_cp_range) }) .await?; - // If there are not two distinct results, it means that the transaction bounds are - // empty (lo and hi are the same), or it means that the one or other of the checkpoints - // doesn't exist, so we can return early. - let &[lo, hi] = res.as_slice() else { - return Ok(None); + // Unless `cp_hi` is given by the client, we expect there to be a record returned for + // `cp_hi` corresponding to the latest `checkpoint_viewed_at`. + let Some(hi_record) = res + .iter() + .find(|&(checkpoint, _, _)| *checkpoint == cp_hi as i64) + else { + if cp_hi == checkpoint_viewed_at { + return Err(diesel::result::Error::NotFound); + } else { + return Ok(None); + } }; - (lo as u64, hi as u64) - } else { - let res: Option = conn - .first(move || { - dsl::checkpoints - .select(dsl::network_total_transactions) - .filter(dsl::sequence_number.eq(cp_hi as i64)) - }) - .await - .optional()?; - - // If there is no result, it means that the checkpoint doesn't exist, so we can return - // early. - let Some(hi) = res else { - return Ok(None); + let lo_record = if let Some(cp_lo) = cp_lo { + let Some(lo_record) = res + .iter() + .find(|&(checkpoint, _, _)| *checkpoint == cp_lo as i64) + else { + return Ok(None); + }; + lo_record + } else { + // If `cp_lo` is not given by the client, we expect there to be a record returned + // for the smallest checkpoint from `pruner_cp_watermark`. + let Some(min_record) = res.iter().min_by_key(|(checkpoint, _, _)| *checkpoint) + else { + return Err(diesel::result::Error::NotFound); + }; + + min_record }; - (0, hi as u64) + (lo_record.1 as u64, hi_record.2 as u64) + }; + + // This is done to make tx_hi an exclusive upper bound. + tx_hi = match tx_hi.checked_add(1) { + Some(new_x) => new_x, + None => return Ok(None), }; - // If the cursors point outside checkpoint bounds, we can return early. + // If the cursors point outside checkpoint bounds, we can return early. Increment `after` by + // one because it is an exclusive bound. If `after` + 1 == `tx_hi`, then there cannot be any + // txs between the cursor and the upper bound. if matches!(page.after(), Some(a) if tx_hi <= a.tx_sequence_number.saturating_add(1)) { return Ok(None); } + // There are no transactions between `tx_lo` + // If `before` cursor is equal to `tx_lo`, there cannot be any txs between if matches!(page.before(), Some(b) if b.tx_sequence_number <= tx_lo) { return Ok(None); } From 1808bb5d3adcb809add9535e939140844039c16c Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 6 Sep 2024 09:21:35 -0700 Subject: [PATCH 2/8] tests around how transactions will work with pruning --- .../tests/pruning/transactions.exp | 230 ++++++++++++++++++ .../tests/pruning/transactions.move | 176 ++++++++++++++ .../tests/stable/prune.exp | 9 + .../tests/stable/prune.move | 3 + .../src/types/transaction_block/tx_lookups.rs | 3 +- 5 files changed, 419 insertions(+), 2 deletions(-) create mode 100644 crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp create mode 100644 crates/sui-graphql-e2e-tests/tests/pruning/transactions.move diff --git a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp new file mode 100644 index 0000000000000..60ca509e13b65 --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp @@ -0,0 +1,230 @@ +processed 18 tasks + +init: +A: object(0,0) + +task 1, lines 6-25: +//# publish +created: object(1,0) +mutated: object(0,1) +gas summary: computation_cost: 1000000, storage_cost: 5570800, storage_rebate: 0, non_refundable_storage_fee: 0 + +task 2, line 27: +//# run Test::M1::create --sender A --args 0 @A +created: object(2,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 0, non_refundable_storage_fee: 0 + +task 3, line 29: +//# create-checkpoint +Checkpoint created: 1 + +task 4, line 31: +//# advance-epoch +Epoch advanced: 0 + +task 5, line 33: +//# run Test::M1::create --sender A --args 1 @A +created: object(5,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 6, line 35: +//# create-checkpoint +Checkpoint created: 3 + +task 7, line 37: +//# advance-epoch +Epoch advanced: 1 + +task 8, line 39: +//# run Test::M1::create --sender A --args 2 @A +created: object(8,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 9, line 41: +//# create-checkpoint +Checkpoint created: 5 + +task 10, line 43: +//# advance-epoch +Epoch advanced: 2 + +task 11, line 45: +//# run Test::M1::create --sender A --args 3 @A +created: object(11,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 12, line 47: +//# create-checkpoint +Checkpoint created: 7 + +task 13, lines 49-90: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "epoch": { + "epochId": 3 + }, + "checkpoints": { + "nodes": [ + { + "epoch": { + "epochId": 2 + }, + "sequenceNumber": 5 + }, + { + "epoch": { + "epochId": 2 + }, + "sequenceNumber": 6 + }, + { + "epoch": { + "epochId": 3 + }, + "sequenceNumber": 7 + } + ] + }, + "unfiltered": { + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "3JTu6zCWqr6ntrcXes17pVGaDsi5TohbZG3KmJMB69wd", + "effects": { + "checkpoint": { + "sequenceNumber": 6 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + }, + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 14, lines 92-111: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 15, lines 113-132: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + } + ] + } + } +} + +task 16, lines 136-155: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": null, + "endCursor": null, + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [] + } + } +} + +task 17, lines 157-176: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": null, + "endCursor": null, + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [] + } + } +} diff --git a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move new file mode 100644 index 0000000000000..eeff46c70d7be --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move @@ -0,0 +1,176 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//# init --protocol-version 51 --addresses Test=0x0 --accounts A --simulator --epochs-to-keep 2 + +//# publish +module Test::M1 { + use sui::coin::Coin; + + public struct Object has key, store { + id: UID, + value: u64, + } + + fun foo(_p1: u64, value1: T, _value2: &Coin, _p2: u64): T { + value1 + } + + public entry fun create(value: u64, recipient: address, ctx: &mut TxContext) { + transfer::public_transfer( + Object { id: object::new(ctx), value }, + recipient + ) + } +} + +//# run Test::M1::create --sender A --args 0 @A + +//# create-checkpoint + +//# advance-epoch + +//# run Test::M1::create --sender A --args 1 @A + +//# create-checkpoint + +//# advance-epoch + +//# run Test::M1::create --sender A --args 2 @A + +//# create-checkpoint + +//# advance-epoch + +//# run Test::M1::create --sender A --args 3 @A + +//# create-checkpoint + +//# run-graphql --wait-for-checkpoint-pruned 4 +# The smallest unpruned epoch is 2, starting with checkpoint sequence number 5 +# When a range is not specified, transactions queries should return results starting from the smallest unpruned tx +{ + epoch { + epochId + } + checkpoints { + nodes { + epoch { + epochId + } + sequenceNumber + } + } + unfiltered: transactionBlocks { + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } + transactionBlocks(filter: { signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --wait-for-checkpoint-pruned 4 +# In the absence of an upper bound, graphql sets the upper bound to `checkpoint_viewed_at`'s max tx + 1 (right-open interval) +{ + transactionBlocks(filter: { afterCheckpoint: 5 signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --wait-for-checkpoint-pruned 4 +# In the absence of a lower bound, graphql sets the lower bound to the smallest unpruned checkpoint's min tx +{ + transactionBlocks(filter: { beforeCheckpoint: 7 signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + + + +//# run-graphql --wait-for-checkpoint-pruned 4 +# If the caller tries to fetch data outside of the unpruned range, they should receive an empty response +{ + transactionBlocks(filter: { atCheckpoint: 0 signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --wait-for-checkpoint-pruned 4 +# Empty response if caller tries to fetch data beyond the available upper bound +{ + transactionBlocks(filter: { atCheckpoint: 10 signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} diff --git a/crates/sui-graphql-e2e-tests/tests/stable/prune.exp b/crates/sui-graphql-e2e-tests/tests/stable/prune.exp index 89c6e5a02529c..e5ce9d00573af 100644 --- a/crates/sui-graphql-e2e-tests/tests/stable/prune.exp +++ b/crates/sui-graphql-e2e-tests/tests/stable/prune.exp @@ -52,6 +52,9 @@ task 11, lines 45-55: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { "data": { + "epoch": { + "epochId": 3 + }, "checkpoints": { "nodes": [ { @@ -65,6 +68,12 @@ Response: { "epochId": 2 }, "sequenceNumber": 6 + }, + { + "epoch": { + "epochId": 3 + }, + "sequenceNumber": 7 } ] } diff --git a/crates/sui-graphql-e2e-tests/tests/stable/prune.move b/crates/sui-graphql-e2e-tests/tests/stable/prune.move index ae72931658a39..ea6057f7d138a 100644 --- a/crates/sui-graphql-e2e-tests/tests/stable/prune.move +++ b/crates/sui-graphql-e2e-tests/tests/stable/prune.move @@ -44,6 +44,9 @@ module Test::M1 { //# run-graphql --wait-for-checkpoint-pruned 4 { + epoch { + epochId + } checkpoints { nodes { epoch { diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index 3b073b9947052..cb5ef16738f20 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -243,8 +243,7 @@ impl TxBounds { return Ok(None); } - // There are no transactions between `tx_lo` - // If `before` cursor is equal to `tx_lo`, there cannot be any txs between + // If `before` cursor is equal to `tx_lo`, there cannot be any txs between the two points. if matches!(page.before(), Some(b) if b.tx_sequence_number <= tx_lo) { return Ok(None); } From a07d37754aa0e20c6ed11bf0b18af2729abe73e4 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 6 Sep 2024 12:01:34 -0700 Subject: [PATCH 3/8] track min unpruned cp in watermark task, simplify txbounds::query implementation --- .../tests/pruning/transactions.exp | 85 +++++++++++++- .../tests/pruning/transactions.move | 64 ++++++++++ crates/sui-graphql-rpc/src/server/builder.rs | 2 +- .../src/server/watermark_task.rs | 63 +++++++--- crates/sui-graphql-rpc/src/types/epoch.rs | 4 +- crates/sui-graphql-rpc/src/types/query.rs | 109 ++++++++---------- .../src/types/transaction_block/mod.rs | 8 +- .../src/types/transaction_block/tx_lookups.rs | 105 +++++++---------- .../src/types/zklogin_verify_signature.rs | 6 +- 9 files changed, 296 insertions(+), 150 deletions(-) diff --git a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp index 60ca509e13b65..adaa5fae7b6d5 100644 --- a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp +++ b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp @@ -1,4 +1,4 @@ -processed 18 tasks +processed 21 tasks init: A: object(0,0) @@ -228,3 +228,86 @@ Response: { } } } + +task 18, lines 179-198: +//# run-graphql +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 19, lines 200-219: +//# run-graphql +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 20, lines 221-240: +//# run-graphql +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + } + ] + } + } +} diff --git a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move index eeff46c70d7be..12ec1a8aa851f 100644 --- a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move +++ b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move @@ -174,3 +174,67 @@ module Test::M1 { } } } + + +//# run-graphql +# Mirror from the back +{ + transactionBlocks(last: 10 filter: { signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql +# Mirror from the back +{ + transactionBlocks(last: 10 filter: { afterCheckpoint: 5 signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql +# Mirror from the back +{ + transactionBlocks(last: 10 filter: { beforeCheckpoint: 7 signAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 4d2b35ac28f9b..32f8c6feeef34 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -652,7 +652,7 @@ async fn health_check( .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG); let checkpoint_timestamp = - Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms); + Duration::from_millis(watermark_lock.read().await.hi_cp_timestamp_ms); let now_millis = Utc::now().timestamp_millis(); diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index ca7bb81e8ea70..a5eb29f240377 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -6,7 +6,10 @@ use crate::error::Error; use crate::metrics::Metrics; use crate::types::chain_identifier::ChainIdentifier; use async_graphql::ServerError; -use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel::{ + query_dsl::positional_order_dsl::PositionalOrderDsl, CombineDsl, ExpressionMethods, + OptionalExtension, QueryDsl, +}; use diesel_async::scoped_futures::ScopedFutureExt; use std::mem; use std::sync::Arc; @@ -40,15 +43,17 @@ pub(crate) type WatermarkLock = Arc>; /// changes. #[derive(Clone, Copy, Default)] pub(crate) struct Watermark { - /// The checkpoint upper-bound for the query. - pub checkpoint: u64, - /// The checkpoint upper-bound timestamp for the query. - pub checkpoint_timestamp_ms: u64, + /// The inclusive checkpoint upper-bound for the query. + pub hi_cp: u64, + /// The timestamp of the inclusive upper-bound checkpoint for the query. + pub hi_cp_timestamp_ms: u64, /// The current epoch. pub epoch: u64, + /// Smallest queryable checkpoint - checkpoints below this value are pruned. + pub lo_cp: u64, } -/// Starts an infinite loop that periodically updates the `checkpoint_viewed_at` high watermark. +/// Starts an infinite loop that periodically updates the watermark. impl WatermarkTask { pub(crate) fn new( db: Db, @@ -83,7 +88,7 @@ impl WatermarkTask { return; }, _ = interval.tick() => { - let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await { + let Watermark {lo_cp, hi_cp, hi_cp_timestamp_ms, epoch } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { @@ -96,11 +101,13 @@ impl WatermarkTask { // Write the watermark as follows to limit how long we hold the lock let prev_epoch = { let mut w = self.watermark.write().await; - w.checkpoint = checkpoint; - w.checkpoint_timestamp_ms = checkpoint_timestamp_ms; + w.hi_cp = hi_cp; + w.hi_cp_timestamp_ms = hi_cp_timestamp_ms; + w.lo_cp = lo_cp; mem::replace(&mut w.epoch, epoch) }; + // On epoch boundary, notify subscribers if epoch > prev_epoch { self.sender.send(epoch).unwrap(); } @@ -155,21 +162,31 @@ impl Watermark { pub(crate) async fn new(lock: WatermarkLock) -> Self { let w = lock.read().await; Self { - checkpoint: w.checkpoint, - checkpoint_timestamp_ms: w.checkpoint_timestamp_ms, + hi_cp: w.hi_cp, + hi_cp_timestamp_ms: w.hi_cp_timestamp_ms, epoch: w.epoch, + lo_cp: w.lo_cp, } } pub(crate) async fn query(db: &Db) -> Result, Error> { use checkpoints::dsl; - let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db + let Some(result): Option> = db .execute(move |conn| { async { - conn.first(move || { - dsl::checkpoints + conn.results(move || { + let min_cp = dsl::checkpoints + .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) + .order_by(dsl::sequence_number.asc()) + .limit(1); + + let max_cp = dsl::checkpoints .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) .order_by(dsl::sequence_number.desc()) + .limit(1); + + // Order by sequence_number, which is in the 1st position + min_cp.union_all(max_cp).positional_order_by(1) }) .await .optional() @@ -177,14 +194,24 @@ impl Watermark { .scope_boxed() }) .await - .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))? + .map_err(|e| Error::Internal(format!("Failed to fetch watermark data: {e}")))? else { + // An empty response from the db is valid when indexer has not written any checkpoints + // to the db yet. return Ok(None); }; + + if result.len() != 2 { + return Err(Error::Internal( + "Expected 2 rows from watermark query".to_string(), + )); + } + Ok(Some(Watermark { - checkpoint: checkpoint as u64, - checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64, - epoch: epoch as u64, + hi_cp: result[1].0 as u64, + hi_cp_timestamp_ms: result[1].1 as u64, + epoch: result[1].2 as u64, + lo_cp: result[0].0 as u64, })) } } diff --git a/crates/sui-graphql-rpc/src/types/epoch.rs b/crates/sui-graphql-rpc/src/types/epoch.rs index 36839af4001ae..a2c63ad37f636 100644 --- a/crates/sui-graphql-rpc/src/types/epoch.rs +++ b/crates/sui-graphql-rpc/src/types/epoch.rs @@ -108,8 +108,8 @@ impl Epoch { let last = match self.stored.last_checkpoint_id { Some(last) => last as u64, None => { - let Watermark { checkpoint, .. } = *ctx.data_unchecked(); - checkpoint + let Watermark { hi_cp, .. } = *ctx.data_unchecked(); + hi_cp } }; diff --git a/crates/sui-graphql-rpc/src/types/query.rs b/crates/sui-graphql-rpc/src/types/query.rs index ca8d9afc5a0f0..b582fba74a0e2 100644 --- a/crates/sui-graphql-rpc/src/types/query.rs +++ b/crates/sui-graphql-rpc/src/types/query.rs @@ -74,8 +74,8 @@ impl Query { /// Range of checkpoints that the RPC has data available for (for data /// that can be tied to a particular checkpoint). async fn available_range(&self, ctx: &Context<'_>) -> Result { - let Watermark { checkpoint, .. } = *ctx.data()?; - AvailableRange::query(ctx.data_unchecked(), checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + AvailableRange::query(ctx.data_unchecked(), hi_cp) .await .extend() } @@ -211,10 +211,10 @@ impl Query { address: SuiAddress, root_version: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; Ok(Some(Owner { address, - checkpoint_viewed_at: checkpoint, + checkpoint_viewed_at: hi_cp, root_version: root_version.map(|v| v.into()), })) } @@ -227,10 +227,10 @@ impl Query { address: SuiAddress, version: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let key = match version { - Some(version) => Object::at_version(version.into(), checkpoint), - None => Object::latest_at(checkpoint), + Some(version) => Object::at_version(version.into(), hi_cp), + None => Object::latest_at(hi_cp), }; Object::query(ctx, address, key).await.extend() @@ -252,10 +252,10 @@ impl Query { address: SuiAddress, version: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let key = match version { - Some(version) => MovePackage::by_version(version.into(), checkpoint), - None => MovePackage::by_id_at(checkpoint), + Some(version) => MovePackage::by_version(version.into(), hi_cp), + None => MovePackage::by_id_at(hi_cp), }; MovePackage::query(ctx, address, key).await.extend() @@ -270,19 +270,19 @@ impl Query { ctx: &Context<'_>, address: SuiAddress, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - MovePackage::query(ctx, address, MovePackage::latest_at(checkpoint)) + let Watermark { hi_cp, .. } = *ctx.data()?; + MovePackage::query(ctx, address, MovePackage::latest_at(hi_cp)) .await .extend() } /// Look-up an Account by its SuiAddress. async fn address(&self, ctx: &Context<'_>, address: SuiAddress) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; Ok(Some(Address { address, - checkpoint_viewed_at: checkpoint, + checkpoint_viewed_at: hi_cp, })) } @@ -297,8 +297,8 @@ impl Query { /// Fetch epoch information by ID (defaults to the latest epoch). async fn epoch(&self, ctx: &Context<'_>, id: Option) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - Epoch::query(ctx, id.map(|id| id.into()), checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + Epoch::query(ctx, id.map(|id| id.into()), hi_cp) .await .extend() } @@ -310,8 +310,8 @@ impl Query { ctx: &Context<'_>, id: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - Checkpoint::query(ctx, id.unwrap_or_default(), checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + Checkpoint::query(ctx, id.unwrap_or_default(), hi_cp) .await .extend() } @@ -322,10 +322,8 @@ impl Query { ctx: &Context<'_>, digest: Digest, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - TransactionBlock::query(ctx, digest, checkpoint) - .await - .extend() + let Watermark { hi_cp, .. } = *ctx.data()?; + TransactionBlock::query(ctx, digest, hi_cp).await.extend() } /// The coin objects that exist in the network. @@ -341,7 +339,7 @@ impl Query { before: Option, type_: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; let coin = type_.map_or_else(GAS::type_tag, |t| t.0); @@ -350,7 +348,7 @@ impl Query { page, coin, /* owner */ None, - checkpoint, + hi_cp, ) .await .extend() @@ -365,17 +363,12 @@ impl Query { last: Option, before: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - Checkpoint::paginate( - ctx.data_unchecked(), - page, - /* epoch */ None, - checkpoint, - ) - .await - .extend() + Checkpoint::paginate(ctx.data_unchecked(), page, /* epoch */ None, hi_cp) + .await + .extend() } /// The transaction blocks that exist in the network. @@ -408,19 +401,13 @@ impl Query { filter: Option, scan_limit: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - TransactionBlock::paginate( - ctx, - page, - filter.unwrap_or_default(), - checkpoint, - scan_limit, - ) - .await - .extend() + TransactionBlock::paginate(ctx, page, filter.unwrap_or_default(), hi_cp, scan_limit) + .await + .extend() } /// Query events that are emitted in the network. @@ -435,14 +422,14 @@ impl Query { before: Option, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; Event::paginate( ctx.data_unchecked(), page, filter.unwrap_or_default(), - checkpoint, + hi_cp, ) .await .extend() @@ -458,14 +445,14 @@ impl Query { before: Option, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; Object::paginate( ctx.data_unchecked(), page, filter.unwrap_or_default(), - checkpoint, + hi_cp, ) .await .extend() @@ -485,10 +472,10 @@ impl Query { before: Option, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - MovePackage::paginate_by_checkpoint(ctx.data_unchecked(), page, filter, checkpoint) + MovePackage::paginate_by_checkpoint(ctx.data_unchecked(), page, filter, hi_cp) .await .extend() } @@ -506,10 +493,10 @@ impl Query { address: SuiAddress, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - MovePackage::paginate_by_version(ctx.data_unchecked(), page, address, filter, checkpoint) + MovePackage::paginate_by_version(ctx.data_unchecked(), page, address, filter, hi_cp) .await .extend() } @@ -532,14 +519,14 @@ impl Query { ctx: &Context<'_>, domain: Domain, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - Ok(NameService::resolve_to_record(ctx, &domain, checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + Ok(NameService::resolve_to_record(ctx, &domain, hi_cp) .await .extend()? .and_then(|r| r.target_address) .map(|a| Address { address: a.into(), - checkpoint_viewed_at: checkpoint, + checkpoint_viewed_at: hi_cp, })) } @@ -549,17 +536,15 @@ impl Query { ctx: &Context<'_>, name: String, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; - NamedMovePackage::query(ctx, &name, checkpoint) - .await - .extend() + NamedMovePackage::query(ctx, &name, hi_cp).await.extend() } /// Fetch a type that includes dot move service names in it. async fn type_by_name(&self, ctx: &Context<'_>, name: String) -> Result { - let Watermark { checkpoint, .. } = *ctx.data()?; - let type_tag = NamedType::query(ctx, &name, checkpoint).await?; + let Watermark { hi_cp, .. } = *ctx.data()?; + let type_tag = NamedType::query(ctx, &name, hi_cp).await?; Ok(type_tag.into()) } @@ -570,8 +555,8 @@ impl Query { ctx: &Context<'_>, coin_type: ExactTypeFilter, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - CoinMetadata::query(ctx.data_unchecked(), coin_type.0, checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + CoinMetadata::query(ctx.data_unchecked(), coin_type.0, hi_cp) .await .extend() } diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs index b03a9e3cd7cd9..1e9731037312b 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs @@ -150,8 +150,8 @@ impl TransactionBlock { // Non-stored transactions have a sentinel checkpoint_viewed_at value that generally // prevents access to further queries, but inputs should generally be available so try // to access them at the high watermark. - let Watermark { checkpoint, .. } = *ctx.data_unchecked(); - checkpoint + let Watermark { hi_cp, .. } = *ctx.data_unchecked(); + hi_cp }; Some(GasInput::from( @@ -325,6 +325,9 @@ impl TransactionBlock { let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); let db: &Db = ctx.data_unchecked(); let is_from_front = page.is_from_front(); + // If we've entered this function, we already fetched `checkpoint_viewed_at` from the + // `Watermark`, and so we must be able to retrieve `lo_cp` as well. + let Watermark { lo_cp, .. } = *ctx.data_unchecked(); use transactions::dsl as tx; let (prev, next, transactions, tx_bounds): ( @@ -340,6 +343,7 @@ impl TransactionBlock { filter.after_checkpoint.map(u64::from), filter.at_checkpoint.map(u64::from), filter.before_checkpoint.map(u64::from), + lo_cp, checkpoint_viewed_at, scan_limit, &page, diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index cb5ef16738f20..a55e3704b7308 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -65,7 +65,7 @@ use crate::{ }; use diesel::{CombineDsl, ExpressionMethods, QueryDsl}; use std::fmt::Write; -use sui_indexer::schema::pruner_cp_watermark; +use sui_indexer::schema::checkpoints; /// Bounds on transaction sequence number, imposed by filters, cursors, and the scan limit. The /// outermost bounds are determined by the checkpoint filters. These get translated into bounds in @@ -137,13 +137,22 @@ impl TxBounds { cp_after: Option, cp_at: Option, cp_before: Option, + min_unpruned_checkpoint: u64, checkpoint_viewed_at: u64, scan_limit: Option, page: &Page, ) -> Result, diesel::result::Error> { // Inclusive lowerbound in terms of checkpoint sequence number. If a lower bound is not set, - // we will default to the smallest checkpoint available from the database. - let cp_lo = max_option([cp_after.map(|x| x.saturating_add(1)), cp_at]); + // we will default to the smallest checkpoint available from the database, retrieved from + // the watermark. + // + // SAFETY: we can unwrap because of the `Some(min_unpruned_checkpoint)` + let cp_lo = max_option([ + cp_after.map(|x| x.saturating_add(1)), + cp_at, + Some(min_unpruned_checkpoint), + ]) + .unwrap(); let cp_before_inclusive = match cp_before { // There are no results strictly before checkpoint 0. @@ -158,88 +167,62 @@ impl TxBounds { // SAFETY: we can unwrap because of the `Some(checkpoint_viewed_at) let cp_hi = min_option([cp_before_inclusive, cp_at, Some(checkpoint_viewed_at)]).unwrap(); - use pruner_cp_watermark::dsl; - // Inclusive lower and upper bounds of the transaction sequence number range. `tx_hi` will - // need to be adjusted to form the expected right-open interval. - let (tx_lo, mut tx_hi) = { - let res: Vec<(i64, i64, i64)> = conn + // Read from the `checkpoints` table rather than the `pruner_cp_watermark` table, because + // the `checkpoints` table is pruned first. + use checkpoints::dsl; + // Inclusive lower bound and exclusive upper bound of the transaction sequence number range. + let (tx_lo, tx_hi) = { + let res: Vec<(i64, Option, i64)> = conn .results(move || { - let mut min_cp_range = dsl::pruner_cp_watermark + let min_cp_range = dsl::checkpoints .select(( - dsl::checkpoint_sequence_number, + dsl::sequence_number, dsl::min_tx_sequence_number, - dsl::max_tx_sequence_number, + dsl::network_total_transactions, )) - .into_boxed(); - - if let Some(cp_lo) = cp_lo { - min_cp_range = min_cp_range - .filter(dsl::checkpoint_sequence_number.eq(cp_lo as i64)) - .limit(1); - } else { - min_cp_range = min_cp_range - .order_by(dsl::checkpoint_sequence_number.asc()) - .limit(1); - }; - - let max_cp_range = dsl::pruner_cp_watermark + .filter(dsl::sequence_number.eq(cp_lo as i64)) + .limit(1); + + let max_cp_range = dsl::checkpoints .select(( - dsl::checkpoint_sequence_number, + dsl::sequence_number, dsl::min_tx_sequence_number, - dsl::max_tx_sequence_number, + dsl::network_total_transactions, )) - .filter(dsl::checkpoint_sequence_number.eq(cp_hi as i64)) + .filter(dsl::sequence_number.eq(cp_hi as i64)) .limit(1); min_cp_range.union_all(max_cp_range) }) .await?; - // Unless `cp_hi` is given by the client, we expect there to be a record returned for - // `cp_hi` corresponding to the latest `checkpoint_viewed_at`. let Some(hi_record) = res .iter() .find(|&(checkpoint, _, _)| *checkpoint == cp_hi as i64) else { - if cp_hi == checkpoint_viewed_at { - return Err(diesel::result::Error::NotFound); - } else { - return Ok(None); - } + return Ok(None); }; - let lo_record = if let Some(cp_lo) = cp_lo { - let Some(lo_record) = res - .iter() - .find(|&(checkpoint, _, _)| *checkpoint == cp_lo as i64) - else { - return Ok(None); - }; - lo_record - } else { - // If `cp_lo` is not given by the client, we expect there to be a record returned - // for the smallest checkpoint from `pruner_cp_watermark`. - let Some(min_record) = res.iter().min_by_key(|(checkpoint, _, _)| *checkpoint) - else { - return Err(diesel::result::Error::NotFound); - }; - - min_record + let Some(lo_record) = res + .iter() + .find(|&(checkpoint, _, _)| *checkpoint == cp_lo as i64) + else { + return Ok(None); }; - (lo_record.1 as u64, hi_record.2 as u64) - }; + let tx_lo = match lo_record.1 { + Some(lo) => lo, + // Ostensibly this shouldn't happen in production, but should it occur, we can use + // `network_total_transactions` to exclude checkpoints < this one + None => lo_record.2, + } as u64; - // This is done to make tx_hi an exclusive upper bound. - tx_hi = match tx_hi.checked_add(1) { - Some(new_x) => new_x, - None => return Ok(None), + (tx_lo, hi_record.2 as u64) }; - // If the cursors point outside checkpoint bounds, we can return early. Increment `after` by - // one because it is an exclusive bound. If `after` + 1 == `tx_hi`, then there cannot be any - // txs between the cursor and the upper bound. - if matches!(page.after(), Some(a) if tx_hi <= a.tx_sequence_number.saturating_add(1)) { + // The `after` cursor is outside of the bounds if `after + 1`, the first element in the + // page, lies on or outside the exclusive upper bound. + if matches!(page.after(), Some(a) if a.tx_sequence_number.saturating_add(1) >= tx_hi) { return Ok(None); } diff --git a/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs b/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs index eb51fe116f263..80b5bcb2c71de 100644 --- a/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs +++ b/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs @@ -54,10 +54,10 @@ pub(crate) async fn verify_zklogin_signature( intent_scope: ZkLoginIntentScope, author: SuiAddress, ) -> Result { - let Watermark { checkpoint, .. } = *ctx.data_unchecked(); + let Watermark { hi_cp, .. } = *ctx.data_unchecked(); // get current epoch from db. - let Some(curr_epoch) = Epoch::query(ctx, None, checkpoint).await? else { + let Some(curr_epoch) = Epoch::query(ctx, None, hi_cp).await? else { return Err(Error::Internal( "Cannot get current epoch from db".to_string(), )); @@ -88,7 +88,7 @@ pub(crate) async fn verify_zklogin_signature( bcs: Base64(bcs::to_bytes(&1u64).unwrap()), }, DynamicFieldType::DynamicField, - checkpoint, + hi_cp, ) .await .map_err(|e| as_jwks_read_error(e.to_string()))?; From 0ad253cf89ce5c5d43a16a56287f0f3ae2b63d6a Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 6 Sep 2024 15:05:06 -0700 Subject: [PATCH 4/8] i think we should return an error when people query for data that is pruned? --- .../tests/pruning/transactions.exp | 153 +++++++++++++++--- .../tests/pruning/transactions.move | 93 ++++++++++- .../src/server/watermark_task.rs | 28 +++- .../src/types/transaction_block/mod.rs | 40 +++-- .../src/types/transaction_block/tx_lookups.rs | 36 +++-- 5 files changed, 307 insertions(+), 43 deletions(-) diff --git a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp index adaa5fae7b6d5..c40d8e6054358 100644 --- a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp +++ b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.exp @@ -1,4 +1,4 @@ -processed 21 tasks +processed 25 tasks init: A: object(0,0) @@ -61,7 +61,7 @@ task 12, line 47: //# create-checkpoint Checkpoint created: 7 -task 13, lines 49-90: +task 13, lines 49-96: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { "data": { @@ -91,6 +91,12 @@ Response: { ] }, "unfiltered": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, "nodes": [ { "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", @@ -147,7 +153,7 @@ Response: { } } -task 14, lines 92-111: +task 14, lines 98-117: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { "data": { @@ -172,7 +178,7 @@ Response: { } } -task 15, lines 113-132: +task 15, lines 119-138: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { "data": { @@ -197,23 +203,30 @@ Response: { } } -task 16, lines 136-155: +task 16, lines 142-161: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { - "data": { - "transactionBlocks": { - "pageInfo": { - "startCursor": null, - "endCursor": null, - "hasPreviousPage": false, - "hasNextPage": false - }, - "nodes": [] + "data": null, + "errors": [ + { + "message": "Requested data is pruned and no longer available", + "locations": [ + { + "line": 3, + "column": 3 + } + ], + "path": [ + "transactionBlocks" + ], + "extensions": { + "code": "BAD_USER_INPUT" + } } - } + ] } -task 17, lines 157-176: +task 17, lines 163-182: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { "data": { @@ -229,7 +242,7 @@ Response: { } } -task 18, lines 179-198: +task 18, lines 185-204: //# run-graphql Response: { "data": { @@ -262,7 +275,7 @@ Response: { } } -task 19, lines 200-219: +task 19, lines 206-225: //# run-graphql Response: { "data": { @@ -287,7 +300,7 @@ Response: { } } -task 20, lines 221-240: +task 20, lines 227-246: //# run-graphql Response: { "data": { @@ -311,3 +324,105 @@ Response: { } } } + +task 21, lines 248-268: +//# run-graphql --cursors {"c":7,"t":6,"i":false} +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo3LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": true, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "3JTu6zCWqr6ntrcXes17pVGaDsi5TohbZG3KmJMB69wd", + "effects": { + "checkpoint": { + "sequenceNumber": 6 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 22, lines 270-289: +//# run-graphql --cursors {"c":7,"t":0,"i":false} +Response: { + "data": null, + "errors": [ + { + "message": "Requested data is pruned and no longer available", + "locations": [ + { + "line": 3, + "column": 3 + } + ], + "path": [ + "transactionBlocks" + ], + "extensions": { + "code": "BAD_USER_INPUT" + } + } + ] +} + +task 23, lines 291-310: +//# run-graphql --cursors {"c":7,"t":0,"i":true} +Response: { + "data": null, + "errors": [ + { + "message": "Requested data is pruned and no longer available", + "locations": [ + { + "line": 3, + "column": 3 + } + ], + "path": [ + "transactionBlocks" + ], + "extensions": { + "code": "BAD_USER_INPUT" + } + } + ] +} + +task 24, lines 312-331: +//# run-graphql --cursors {"c":7,"t":0,"i":true} +Response: { + "data": null, + "errors": [ + { + "message": "Requested data is pruned and no longer available", + "locations": [ + { + "line": 3, + "column": 3 + } + ], + "path": [ + "transactionBlocks" + ], + "extensions": { + "code": "BAD_USER_INPUT" + } + } + ] +} diff --git a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move index 12ec1a8aa851f..9329b1eb00438 100644 --- a/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move +++ b/crates/sui-graphql-e2e-tests/tests/pruning/transactions.move @@ -62,6 +62,12 @@ module Test::M1 { } } unfiltered: transactionBlocks { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } nodes { digest effects { @@ -134,7 +140,7 @@ module Test::M1 { //# run-graphql --wait-for-checkpoint-pruned 4 -# If the caller tries to fetch data outside of the unpruned range, they should receive an empty response +# If the caller tries to fetch data outside of the unpruned range, they should receive an error { transactionBlocks(filter: { atCheckpoint: 0 signAddress: "@{A}" }) { pageInfo { @@ -238,3 +244,88 @@ module Test::M1 { } } } + +//# run-graphql --cursors {"c":7,"t":6,"i":false} +# The first tx after pruning has seq num 6. +# When viewed at checkpoint 7, there are two more txs that follow it. +{ + transactionBlocks(after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":0,"i":false} +# Data is pruned and no longer available +{ + transactionBlocks(after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":0,"i":true} +# Data is pruned and no longer available +{ + transactionBlocks(after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":0,"i":true} +# Data is pruned and no longer available +{ + transactionBlocks(scanLimit: 10000 after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index a5eb29f240377..665e48f2c6552 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -51,6 +51,7 @@ pub(crate) struct Watermark { pub epoch: u64, /// Smallest queryable checkpoint - checkpoints below this value are pruned. pub lo_cp: u64, + pub lo_tx: u64, } /// Starts an infinite loop that periodically updates the watermark. @@ -88,7 +89,7 @@ impl WatermarkTask { return; }, _ = interval.tick() => { - let Watermark {lo_cp, hi_cp, hi_cp_timestamp_ms, epoch } = match Watermark::query(&self.db).await { + let Watermark {lo_cp, lo_tx, hi_cp, hi_cp_timestamp_ms, epoch } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { @@ -104,6 +105,7 @@ impl WatermarkTask { w.hi_cp = hi_cp; w.hi_cp_timestamp_ms = hi_cp_timestamp_ms; w.lo_cp = lo_cp; + w.lo_tx = lo_tx; mem::replace(&mut w.epoch, epoch) }; @@ -166,22 +168,33 @@ impl Watermark { hi_cp_timestamp_ms: w.hi_cp_timestamp_ms, epoch: w.epoch, lo_cp: w.lo_cp, + lo_tx: w.lo_tx, } } pub(crate) async fn query(db: &Db) -> Result, Error> { use checkpoints::dsl; - let Some(result): Option> = db + let Some(result): Option)>> = db .execute(move |conn| { async { conn.results(move || { let min_cp = dsl::checkpoints - .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) + .select(( + dsl::sequence_number, + dsl::timestamp_ms, + dsl::epoch, + dsl::min_tx_sequence_number, + )) .order_by(dsl::sequence_number.asc()) .limit(1); let max_cp = dsl::checkpoints - .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) + .select(( + dsl::sequence_number, + dsl::timestamp_ms, + dsl::epoch, + dsl::min_tx_sequence_number, + )) .order_by(dsl::sequence_number.desc()) .limit(1); @@ -207,11 +220,18 @@ impl Watermark { )); } + let Some(lo_tx) = result[0].3 else { + return Err(Error::Internal( + "Expected min_tx_sequence_number to be non-null".to_string(), + )); + }; + Ok(Some(Watermark { hi_cp: result[1].0 as u64, hi_cp_timestamp_ms: result[1].1 as u64, epoch: result[1].2 as u64, lo_cp: result[0].0 as u64, + lo_tx: lo_tx as u64, })) } } diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs index 1e9731037312b..70f6b56c0cf6e 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs @@ -315,19 +315,39 @@ impl TransactionBlock { } } - // If page size or scan limit is 0, we want to standardize behavior by returning an empty - // connection - if filter.is_empty() || page.limit() == 0 || scan_limit.is_some_and(|v| v == 0) { - return Ok(ScanConnection::new(false, false)); - } - let cursor_viewed_at = page.validate_cursor_consistency()?; let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); let db: &Db = ctx.data_unchecked(); let is_from_front = page.is_from_front(); // If we've entered this function, we already fetched `checkpoint_viewed_at` from the // `Watermark`, and so we must be able to retrieve `lo_cp` as well. - let Watermark { lo_cp, .. } = *ctx.data_unchecked(); + let Watermark { lo_cp, lo_tx, .. } = *ctx.data_unchecked(); + let cp_after = filter.after_checkpoint.map(u64::from); + let cp_at = filter.at_checkpoint.map(u64::from); + let cp_before = filter.before_checkpoint.map(u64::from); + + // Check that requested data is not already pruned. This should catch most cases, except on + // epoch boundary where a request is made just before the epoch pruning process is kicked + // off. + if checkpoint_viewed_at < lo_cp + || cp_after.map_or(false, |cp| cp < lo_cp) + || cp_at.map_or(false, |cp| cp < lo_cp) + || cp_before.map_or(false, |cp| cp < lo_cp) + || page.after().map_or(false, |c| c.tx_sequence_number < lo_tx) + || page + .before() + .map_or(false, |c| c.tx_sequence_number < lo_tx) + { + return Err(Error::Client( + "Requested data is pruned and no longer available".to_string(), + )); + } + + // If page size or scan limit is 0, we want to standardize behavior by returning an empty + // connection + if filter.is_empty() || page.limit() == 0 || scan_limit.is_some_and(|v| v == 0) { + return Ok(ScanConnection::new(false, false)); + } use transactions::dsl as tx; let (prev, next, transactions, tx_bounds): ( @@ -340,9 +360,9 @@ impl TransactionBlock { async move { let Some(tx_bounds) = TxBounds::query( conn, - filter.after_checkpoint.map(u64::from), - filter.at_checkpoint.map(u64::from), - filter.before_checkpoint.map(u64::from), + cp_after, + cp_at, + cp_before, lo_cp, checkpoint_viewed_at, scan_limit, diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index a55e3704b7308..17f149298a992 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -130,8 +130,9 @@ impl TxBounds { /// Determines the `tx_sequence_number` range from the checkpoint bounds for a transaction block /// query. If no checkpoint range is specified, the default is between 0 and the /// `checkpoint_viewed_at`. The corresponding `tx_sequence_number` range is fetched from db, and - /// further adjusted by cursors and scan limit. If there are any inconsistencies or invalid - /// combinations, i.e. `after` cursor is greater than the upper bound, return None. + /// further adjusted by cursors and scan limit. If the checkpoints cannot be found, or if there + /// are any inconsistencies or invalid combinations, i.e. `after` cursor is greater than the + /// upper bound, return None. pub(crate) async fn query( conn: &mut Conn<'_>, cp_after: Option, @@ -220,16 +221,33 @@ impl TxBounds { (tx_lo, hi_record.2 as u64) }; - // The `after` cursor is outside of the bounds if `after + 1`, the first element in the - // page, lies on or outside the exclusive upper bound. - if matches!(page.after(), Some(a) if a.tx_sequence_number.saturating_add(1) >= tx_hi) { + // // The `after` cursor is outside of the bounds if `after + 1`, the first element in the + // // page, lies on or outside the exclusive upper bound. + // if matches!(page.after(), Some(a) if a.tx_sequence_number.saturating_add(1) >= tx_hi) { + // return Ok(None); + // } + + // // If `before` cursor is equal to `tx_lo`, there cannot be any txs between the two points. + // if matches!(page.before(), Some(b) if b.tx_sequence_number <= tx_lo) { + // return Ok(None); + // } + + let after_plus_one = page.after().map(|a| a.tx_sequence_number.saturating_add(1)); + let before_seq = page.before().map(|b| b.tx_sequence_number); + + if !match (after_plus_one, before_seq) { + (Some(a), Some(b)) => tx_lo < a && a < b && b <= tx_hi, + // a must be strictly less than tx_hi + (Some(a), None) => tx_lo < a && a < tx_hi, + // b can be equal to tx_hi because both are exclusive + (None, Some(b)) => tx_lo < b && b <= tx_hi, + // tx_lo must be strictly less than tx_hi for the right-open interval + (None, None) => tx_lo < tx_hi, + } { return Ok(None); } - // If `before` cursor is equal to `tx_lo`, there cannot be any txs between the two points. - if matches!(page.before(), Some(b) if b.tx_sequence_number <= tx_lo) { - return Ok(None); - } + println!("tx_lo: {}, tx_hi: {}", tx_lo, tx_hi); Ok(Some(Self { tx_lo, From 76daaee18200a9e75835dc7f418df35bc03a873f Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 6 Sep 2024 15:40:17 -0700 Subject: [PATCH 5/8] cleanup --- .../src/types/transaction_block/tx_lookups.rs | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index 17f149298a992..9edd653507723 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -221,21 +221,10 @@ impl TxBounds { (tx_lo, hi_record.2 as u64) }; - // // The `after` cursor is outside of the bounds if `after + 1`, the first element in the - // // page, lies on or outside the exclusive upper bound. - // if matches!(page.after(), Some(a) if a.tx_sequence_number.saturating_add(1) >= tx_hi) { - // return Ok(None); - // } + let cursor_lo_exclusive = page.after().map(|a| a.tx_sequence_number); + let cursor_hi = page.before().map(|b| b.tx_sequence_number); - // // If `before` cursor is equal to `tx_lo`, there cannot be any txs between the two points. - // if matches!(page.before(), Some(b) if b.tx_sequence_number <= tx_lo) { - // return Ok(None); - // } - - let after_plus_one = page.after().map(|a| a.tx_sequence_number.saturating_add(1)); - let before_seq = page.before().map(|b| b.tx_sequence_number); - - if !match (after_plus_one, before_seq) { + if !match (cursor_lo_exclusive.map(|a| a.saturating_add(1)), cursor_hi) { (Some(a), Some(b)) => tx_lo < a && a < b && b <= tx_hi, // a must be strictly less than tx_hi (Some(a), None) => tx_lo < a && a < tx_hi, @@ -247,13 +236,11 @@ impl TxBounds { return Ok(None); } - println!("tx_lo: {}, tx_hi: {}", tx_lo, tx_hi); - Ok(Some(Self { tx_lo, tx_hi, - cursor_lo_exclusive: page.after().map(|a| a.tx_sequence_number), - cursor_hi: page.before().map(|b| b.tx_sequence_number), + cursor_lo_exclusive, + cursor_hi, scan_limit, end: page.end(), })) From 8e94e48e26a2426d8f90c84e13b865ce993b52d2 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 6 Sep 2024 15:41:41 -0700 Subject: [PATCH 6/8] clippy --- crates/sui-graphql-rpc/src/server/builder.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 32f8c6feeef34..83dfec8f1a0b4 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -730,9 +730,11 @@ pub mod tests { let pg_conn_pool = PgManager::new(reader); let cancellation_token = CancellationToken::new(); let watermark = Watermark { - checkpoint: 1, - checkpoint_timestamp_ms: 1, + hi_cp: 1, + hi_cp_timestamp_ms: 1, epoch: 0, + lo_cp: 0, + lo_tx: 0, }; let state = AppState::new( connection_config.clone(), From 4ddfb4d3a6f3f9bcb950fde9e6242ccb21c83ffa Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 6 Sep 2024 15:58:27 -0700 Subject: [PATCH 7/8] please clippy --- crates/sui-graphql-rpc/src/server/watermark_task.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index 665e48f2c6552..f9225af78b5f5 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -172,6 +172,7 @@ impl Watermark { } } + #[allow(clippy::type_complexity)] pub(crate) async fn query(db: &Db) -> Result, Error> { use checkpoints::dsl; let Some(result): Option)>> = db From 7e72b73757962024e9ef1485c26cceb4039bcdaa Mon Sep 17 00:00:00 2001 From: Will Yang Date: Thu, 3 Oct 2024 10:07:51 -0700 Subject: [PATCH 8/8] address comments. makes me think ... i suppose even though the watermark tracks lo and hi, because lo is not static - and won't be until we introduce watermarks table, we actually have to keep fetching the min cp no? --- .../src/server/watermark_task.rs | 34 +++++++++++-------- .../src/types/transaction_block/mod.rs | 27 ++++----------- .../src/types/transaction_block/tx_lookups.rs | 20 +++++------ 3 files changed, 36 insertions(+), 45 deletions(-) diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index f9225af78b5f5..949fe0c94bb65 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -89,7 +89,7 @@ impl WatermarkTask { return; }, _ = interval.tick() => { - let Watermark {lo_cp, lo_tx, hi_cp, hi_cp_timestamp_ms, epoch } = match Watermark::query(&self.db).await { + let Watermark { lo_cp, lo_tx, hi_cp, hi_cp_timestamp_ms, epoch } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { @@ -215,24 +215,30 @@ impl Watermark { return Ok(None); }; - if result.len() != 2 { + let (lo_cp, lo_tx) = if let Some((cp, _, _, Some(tx))) = result.first() { + (cp, tx) + } else { return Err(Error::Internal( - "Expected 2 rows from watermark query".to_string(), - )); - } - - let Some(lo_tx) = result[0].3 else { - return Err(Error::Internal( - "Expected min_tx_sequence_number to be non-null".to_string(), + "Expected entry for tx lower bound and min_tx_sequence_number to be non-null" + .to_string(), )); }; + let (hi_cp, hi_cp_timestamp_ms, epoch) = + if let Some((cp, timestamp_ms, epoch, _)) = result.last() { + (cp, timestamp_ms, epoch) + } else { + return Err(Error::Internal( + "Expected entry for tx upper bound".to_string(), + )); + }; + Ok(Some(Watermark { - hi_cp: result[1].0 as u64, - hi_cp_timestamp_ms: result[1].1 as u64, - epoch: result[1].2 as u64, - lo_cp: result[0].0 as u64, - lo_tx: lo_tx as u64, + hi_cp: *hi_cp as u64, + hi_cp_timestamp_ms: *hi_cp_timestamp_ms as u64, + epoch: *epoch as u64, + lo_cp: *lo_cp as u64, + lo_tx: *lo_tx as u64, })) } } diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs index 70f6b56c0cf6e..b12236fe5e60c 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs @@ -282,6 +282,8 @@ impl TransactionBlock { /// `function` should provide a value for `scan_limit`. This modifies querying behavior by /// limiting how many transactions to scan through before applying filters, and also affects /// pagination behavior. + /// + /// Queries for data that have been pruned pub(crate) async fn paginate( ctx: &Context<'_>, page: Page, @@ -290,6 +292,10 @@ impl TransactionBlock { scan_limit: Option, ) -> Result, Error> { let limits = &ctx.data_unchecked::().limits; + let db: &Db = ctx.data_unchecked(); + // If we've entered this function, we already fetched `checkpoint_viewed_at` from the + // `Watermark`, and so we must be able to retrieve `lo_cp` as well. + let Watermark { lo_cp, .. } = *ctx.data_unchecked(); // If the caller has provided some arbitrary combination of `function`, `kind`, // `recvAddress`, `inputObject`, or `changedObject`, we require setting a `scanLimit`. @@ -317,32 +323,11 @@ impl TransactionBlock { let cursor_viewed_at = page.validate_cursor_consistency()?; let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); - let db: &Db = ctx.data_unchecked(); let is_from_front = page.is_from_front(); - // If we've entered this function, we already fetched `checkpoint_viewed_at` from the - // `Watermark`, and so we must be able to retrieve `lo_cp` as well. - let Watermark { lo_cp, lo_tx, .. } = *ctx.data_unchecked(); let cp_after = filter.after_checkpoint.map(u64::from); let cp_at = filter.at_checkpoint.map(u64::from); let cp_before = filter.before_checkpoint.map(u64::from); - // Check that requested data is not already pruned. This should catch most cases, except on - // epoch boundary where a request is made just before the epoch pruning process is kicked - // off. - if checkpoint_viewed_at < lo_cp - || cp_after.map_or(false, |cp| cp < lo_cp) - || cp_at.map_or(false, |cp| cp < lo_cp) - || cp_before.map_or(false, |cp| cp < lo_cp) - || page.after().map_or(false, |c| c.tx_sequence_number < lo_tx) - || page - .before() - .map_or(false, |c| c.tx_sequence_number < lo_tx) - { - return Err(Error::Client( - "Requested data is pruned and no longer available".to_string(), - )); - } - // If page size or scan limit is 0, we want to standardize behavior by returning an empty // connection if filter.is_empty() || page.limit() == 0 || scan_limit.is_some_and(|v| v == 0) { diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index 9edd653507723..9ceb3c436cff8 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -222,18 +222,18 @@ impl TxBounds { }; let cursor_lo_exclusive = page.after().map(|a| a.tx_sequence_number); + let cursor_lo = cursor_lo_exclusive.map(|a| a.saturating_add(1)); let cursor_hi = page.before().map(|b| b.tx_sequence_number); - if !match (cursor_lo_exclusive.map(|a| a.saturating_add(1)), cursor_hi) { - (Some(a), Some(b)) => tx_lo < a && a < b && b <= tx_hi, - // a must be strictly less than tx_hi - (Some(a), None) => tx_lo < a && a < tx_hi, - // b can be equal to tx_hi because both are exclusive - (None, Some(b)) => tx_lo < b && b <= tx_hi, - // tx_lo must be strictly less than tx_hi for the right-open interval - (None, None) => tx_lo < tx_hi, - } { - return Ok(None); + match (cursor_lo, cursor_hi) { + (Some(lo), _) if tx_hi <= lo => return Ok(None), + (_, Some(hi)) if hi <= tx_lo => return Ok(None), + (Some(lo), Some(hi)) if hi <= lo => return Ok(None), + _ => { + if tx_hi <= tx_lo { + return Ok(None); + } + } } Ok(Some(Self {