Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeSet, sync::Arc};

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::models::cp_sequence_numbers::tx_interval;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod};
use sui_pg_db as db;
Expand Down Expand Up @@ -57,4 +60,21 @@ impl Handler for EvEmitMod {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;

let filter = ev_emit_mod::table
.filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why - 1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The to_tx is the first tx of the to_exclusive checkpoint and should not be pruned


Ok(diesel::delete(filter).execute(conn).await?)
}
}
25 changes: 23 additions & 2 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, sync::Arc};
use std::{collections::BTreeSet, ops::Range, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -60,4 +64,21 @@ impl Handler for EvStructInst {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;

let filter = ev_struct_inst::table
.filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
13 changes: 13 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
Expand Down Expand Up @@ -38,4 +39,16 @@ impl Handler for KvCheckpoints {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let filter = kv_checkpoints::table
.filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
26 changes: 25 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -125,4 +130,23 @@ impl Handler for KvEpochEnds {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to_exclusive).await?;
if from_epoch < to_epoch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check strictly necessary? If they were equal, the between would contain conflicting constraints and nothing would be captured by it, right?

The reason I'm asking is that it piqued my interest that this impl had this check, but the previous ones didn't. If there was a correctness reason to have this check (i.e. that we will get some sort of error by having the bounds passed to BETWEEN inverted), then we are relying on that situation not coming up in practice because we produce checkpoints (and therefore transactions) more often than we prune -- that makes me nervous because we could definitely end up falling foul of this in test scenarios.

If there is not a correctness reason to have this test, then the only other reason I could think of was performance, but this is one of the cheapest tables we have.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think Postgres has an issue with BETWEEN being inverted, but this was just something explicit to handle a possibility unique to epoch. Given [from, to) checkpoints, they may both fall under the same epoch. Conceptually we don't want to prune that epoch

pruning from chkpt 0 to 1
from_epoch: 0, to_epoch: 0
skipping because from_epoch >= to_epoch
pruning from chkpt 1 to 2
from_epoch: 0, to_epoch: 1
pruning from 0 to 0
pruning from chkpt 2 to 3
from_epoch: 1, to_epoch: 1
skipping because from_epoch >= to_epoch
pruning from chkpt 3 to 4
from_epoch: 1, to_epoch: 1
skipping because from_epoch >= to_epoch

If you do run a query ... delete between 0 and 0 - 1 Postgres returns no results from execution

let filter = kv_epoch_ends::table
.filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1));
Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
27 changes: 26 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -72,4 +77,24 @@ impl Handler for KvEpochStarts {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to_exclusive).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_starts::table
.filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
16 changes: 16 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
Expand Down Expand Up @@ -66,4 +67,19 @@ impl Handler for KvTransactions {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
// TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index
// created as its primary key is on `tx_digest`.
Comment on lines +77 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, let's keep this table as it is. Its schema needs to match the schema and indices need to match what is offered in the KV store as much as possible.

let filter = kv_transactions::table.filter(
kv_transactions::cp_sequence_number.between(from as i64, to_exclusive as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use itertools::Itertools;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_affected_addresses, transactions::StoredTxAffectedAddress,
};
Expand Down Expand Up @@ -69,4 +74,21 @@ impl Handler for TxAffectedAddresses {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_affected_addresses::table.filter(
tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject};
use sui_pg_db as db;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};
Expand Down Expand Up @@ -59,4 +64,21 @@ impl Handler for TxAffectedObjects {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_affected_objects::table.filter(
tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_balance_changes,
transactions::{BalanceChange, StoredTxBalanceChange},
Expand Down Expand Up @@ -65,6 +70,23 @@ impl Handler for TxBalanceChanges {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_balance_changes::table.filter(
tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}

/// Calculate balance changes based on the object's input and output objects.
Expand Down
23 changes: 22 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{Ok, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,4 +67,20 @@ impl Handler for TxCalls {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_calls::table
.filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
Loading
Loading