diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index d22ff0f9e2205..d96c719ea10b8 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -162,6 +162,22 @@ pub enum Command { }, /// Run through the migration scripts. RunMigrations, + /// Backfill DB tables for checkpoint range [\first_checkpoint, \last_checkpoint]. + /// by running a SQL query provided in \sql. + /// The tool will automatically slice it into smaller checkpoint ranges and for each range [start, end], + /// it augments the \sql query with: + /// "WHERE {checkpoint_column_name} BETWEEN {start} AND {end}" + /// to avoid running out of memory. + /// Example: + /// ./sui-indexer --database-url <...> sql-back-fill + /// "INSERT INTO full_objects_history (object_id, object_version, serialized_object) SELECT object_id, object_version, serialized_object FROM objects_history" + /// "checkpoint_sequence_number" 0 100000 + SqlBackFill { + sql: String, + checkpoint_column_name: String, + first_checkpoint: u64, + last_checkpoint: u64, + }, } #[derive(Args, Default, Debug, Clone)] diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs index 5f058595e4eb5..9a125f35e8c0e 100644 --- a/crates/sui-indexer/src/lib.rs +++ b/crates/sui-indexer/src/lib.rs @@ -26,6 +26,7 @@ use crate::indexer_reader::IndexerReader; use errors::IndexerError; pub mod apis; +pub mod config; pub mod database; pub mod db; pub mod errors; @@ -35,14 +36,13 @@ pub mod indexer_reader; pub mod metrics; pub mod models; pub mod schema; +pub mod sql_backfill; pub mod store; pub mod system_package_task; pub mod tempdb; pub mod test_utils; pub mod types; -pub mod config; - pub async fn build_json_rpc_server( prometheus_registry: &Registry, reader: IndexerReader, diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 523e91c28e8a1..1c1523efbbb31 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -6,13 +6,13 @@ use sui_indexer::config::Command; use sui_indexer::database::ConnectionPool; use sui_indexer::db::{check_db_migration_consistency, reset_database, run_migrations}; use sui_indexer::indexer::Indexer; -use sui_indexer::store::PgIndexerStore; -use tokio_util::sync::CancellationToken; -use tracing::warn; - use sui_indexer::metrics::{ spawn_connection_pool_metric_collector, start_prometheus_server, IndexerMetrics, }; +use sui_indexer::sql_backfill::run_sql_backfill; +use sui_indexer::store::PgIndexerStore; +use tokio_util::sync::CancellationToken; +use tracing::warn; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -74,6 +74,21 @@ async fn main() -> anyhow::Result<()> { Command::RunMigrations => { run_migrations(pool.dedicated_connection().await?).await?; } + Command::SqlBackFill { + sql, + checkpoint_column_name, + first_checkpoint, + last_checkpoint, + } => { + run_sql_backfill( + &sql, + &checkpoint_column_name, + first_checkpoint, + last_checkpoint, + pool, + ) + .await; + } } Ok(()) diff --git a/crates/sui-indexer/src/sql_backfill.rs b/crates/sui-indexer/src/sql_backfill.rs new file mode 100644 index 0000000000000..a594e19d2be91 --- /dev/null +++ b/crates/sui-indexer/src/sql_backfill.rs @@ -0,0 +1,59 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::database::ConnectionPool; +use diesel_async::RunQueryDsl; +use futures::{stream, StreamExt}; +use std::time::Instant; + +const CHUNK_SIZE: u64 = 10000; +const MAX_CONCURRENCY: usize = 100; + +pub async fn run_sql_backfill( + sql: &str, + checkpoint_column_name: &str, + first_checkpoint: u64, + last_checkpoint: u64, + pool: ConnectionPool, +) { + let cur_time = Instant::now(); + let chunks: Vec<(u64, u64)> = (first_checkpoint..=last_checkpoint) + .step_by(CHUNK_SIZE as usize) + .map(|chunk_start| { + let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE - 1, last_checkpoint); + (chunk_start, chunk_end) + }) + .collect(); + + stream::iter(chunks) + .for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| { + let pool_clone = pool.clone(); // Clone the pool for async operation + async move { + // Run the copy in a batch and add a delay + backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone) + .await; + println!("Finished checkpoint range: {} - {}", start_id, end_id); + } + }) + .await; + println!("Finished backfilling in {:?}", cur_time.elapsed()); +} + +async fn backfill_data_batch( + sql: &str, + checkpoint_column_name: &str, + first_checkpoint: u64, + last_checkpoint: u64, + pool: ConnectionPool, +) { + let mut conn = pool.get().await.unwrap(); + + let query = format!( + "{} WHERE {} BETWEEN {} AND {}", + sql, checkpoint_column_name, first_checkpoint, last_checkpoint + ); + + // Execute the SQL query using Diesel's async connection + // TODO: Add retry support. + diesel::sql_query(query).execute(&mut conn).await.unwrap(); +}