From 99912955fd8840506f8452709eb65e33ac94d3ac Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Tue, 10 Sep 2024 22:45:19 -0700
Subject: [PATCH 1/2] [Indexer] SQL Backfill command

---
 crates/sui-indexer/src/config.rs       | 16 +++++++
 crates/sui-indexer/src/lib.rs          |  4 +-
 crates/sui-indexer/src/main.rs         | 23 ++++++++--
 crates/sui-indexer/src/sql_backfill.rs | 61 ++++++++++++++++++++++++++
 4 files changed, 98 insertions(+), 6 deletions(-)
 create mode 100644 crates/sui-indexer/src/sql_backfill.rs

diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs
index d22ff0f9e2205..d96c719ea10b8 100644
--- a/crates/sui-indexer/src/config.rs
+++ b/crates/sui-indexer/src/config.rs
@@ -162,6 +162,22 @@ pub enum Command {
     },
     /// Run through the migration scripts.
     RunMigrations,
+    /// Backfill DB tables for the checkpoint range [\first_checkpoint, \last_checkpoint]
+    /// by running a SQL query provided in \sql.
+    /// The tool automatically slices this range into smaller checkpoint ranges, and for each range [start, end],
+    /// it augments the \sql query with:
+    /// "WHERE {checkpoint_column_name} BETWEEN {start} AND {end}"
+    /// to avoid running out of memory.
+    /// Example:
+    /// ./sui-indexer --database-url <...> sql-back-fill
+    /// "INSERT INTO full_objects_history (object_id, object_version, serialized_object) SELECT object_id, object_version, serialized_object FROM objects_history"
+    /// "checkpoint_sequence_number" 0 100000
+    SqlBackFill {
+        sql: String,
+        checkpoint_column_name: String,
+        first_checkpoint: u64,
+        last_checkpoint: u64,
+    },
 }
 
 #[derive(Args, Default, Debug, Clone)]
diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs
index 5f058595e4eb5..9a125f35e8c0e 100644
--- a/crates/sui-indexer/src/lib.rs
+++ b/crates/sui-indexer/src/lib.rs
@@ -26,6 +26,7 @@ use crate::indexer_reader::IndexerReader;
 use errors::IndexerError;
 
 pub mod apis;
+pub mod config;
 pub mod database;
 pub mod db;
 pub mod errors;
@@ -35,14 +36,13 @@ pub mod indexer_reader;
 pub mod metrics;
 pub mod models;
 pub mod schema;
+pub mod sql_backfill;
 pub mod store;
 pub mod system_package_task;
 pub mod tempdb;
 pub mod test_utils;
 pub mod types;
 
-pub mod config;
-
 pub async fn build_json_rpc_server(
     prometheus_registry: &Registry,
     reader: IndexerReader,
diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs
index 523e91c28e8a1..1c1523efbbb31 100644
--- a/crates/sui-indexer/src/main.rs
+++ b/crates/sui-indexer/src/main.rs
@@ -6,13 +6,13 @@ use sui_indexer::config::Command;
 use sui_indexer::database::ConnectionPool;
 use sui_indexer::db::{check_db_migration_consistency, reset_database, run_migrations};
 use sui_indexer::indexer::Indexer;
-use sui_indexer::store::PgIndexerStore;
-use tokio_util::sync::CancellationToken;
-use tracing::warn;
-
 use sui_indexer::metrics::{
     spawn_connection_pool_metric_collector, start_prometheus_server, IndexerMetrics,
 };
+use sui_indexer::sql_backfill::run_sql_backfill;
+use sui_indexer::store::PgIndexerStore;
+use tokio_util::sync::CancellationToken;
+use tracing::warn;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -74,6 +74,21 @@ async fn main() -> anyhow::Result<()> {
         Command::RunMigrations => {
             run_migrations(pool.dedicated_connection().await?).await?;
         }
+        Command::SqlBackFill {
+            sql,
+            checkpoint_column_name,
+            first_checkpoint,
+            last_checkpoint,
+        } => {
+            run_sql_backfill(
+                &sql,
+                &checkpoint_column_name,
+                first_checkpoint,
+                last_checkpoint,
+                pool,
+            )
+            .await;
+        }
     }
     Ok(())
 }
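To make the per-chunk statement described in the SqlBackFill doc comment concrete, the sketch below (illustration only, not part of the patch; chunk_query is a hypothetical helper) mirrors the format! call that backfill_data_batch in sql_backfill.rs performs for each chunk.

// Hypothetical helper, for illustration only: builds a single chunk's
// statement the same way backfill_data_batch below does.
fn chunk_query(sql: &str, checkpoint_column_name: &str, start: u64, end: u64) -> String {
    format!(
        "{} WHERE {} BETWEEN {} AND {}",
        sql, checkpoint_column_name, start, end
    )
}

// With CHUNK_SIZE = 10000 (see sql_backfill.rs below), the first chunk of
// the doc comment's example invocation produces:
//   INSERT INTO full_objects_history (object_id, object_version, serialized_object)
//   SELECT object_id, object_version, serialized_object FROM objects_history
//   WHERE checkpoint_sequence_number BETWEEN 0 AND 9999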
diff --git a/crates/sui-indexer/src/sql_backfill.rs b/crates/sui-indexer/src/sql_backfill.rs
new file mode 100644
index 0000000000000..fa57b56f9574a
--- /dev/null
+++ b/crates/sui-indexer/src/sql_backfill.rs
@@ -0,0 +1,61 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::database::ConnectionPool;
+use diesel_async::RunQueryDsl;
+use futures::{stream, StreamExt};
+use std::time::Duration;
+use std::time::Instant;
+
+const CHUNK_SIZE: u64 = 10000;
+const MAX_CONCURRENCY: usize = 100;
+const SLEEP_BETWEEN_BATCHES_MS: u64 = 100;
+
+pub async fn run_sql_backfill(
+    sql: &str,
+    checkpoint_column_name: &str,
+    first_checkpoint: u64,
+    last_checkpoint: u64,
+    pool: ConnectionPool,
+) {
+    let cur_time = Instant::now();
+    let chunks: Vec<(u64, u64)> = (first_checkpoint..=last_checkpoint)
+        .step_by(CHUNK_SIZE as usize)
+        .map(|chunk_start| {
+            let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE - 1, last_checkpoint);
+            (chunk_start, chunk_end)
+        })
+        .collect();
+
+    stream::iter(chunks)
+        .for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| {
+            let pool_clone = pool.clone(); // Clone the pool for async operation
+            async move {
+                // Run the copy in a batch and add a delay
+                backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone)
+                    .await;
+                println!("Finished checkpoint range: {} - {}", start_id, end_id);
+                tokio::time::sleep(Duration::from_millis(SLEEP_BETWEEN_BATCHES_MS)).await;
+            }
+        })
+        .await;
+    println!("Finished backfilling in {:?}", cur_time.elapsed());
+}
+
+async fn backfill_data_batch(
+    sql: &str,
+    checkpoint_column_name: &str,
+    first_checkpoint: u64,
+    last_checkpoint: u64,
+    pool: ConnectionPool,
+) {
+    let mut conn = pool.get().await.unwrap();
+
+    let query = format!(
+        "{} WHERE {} BETWEEN {} AND {}",
+        sql, checkpoint_column_name, first_checkpoint, last_checkpoint
+    );
+
+    // Execute the SQL query using Diesel's async connection
+    diesel::sql_query(query).execute(&mut conn).await.unwrap();
+}

From 422f352fb24433f6f252be1bc1b343e2beceb468 Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Mon, 16 Sep 2024 12:10:15 -0700
Subject: [PATCH 2/2] feedback

---
 crates/sui-indexer/src/sql_backfill.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/crates/sui-indexer/src/sql_backfill.rs b/crates/sui-indexer/src/sql_backfill.rs
index fa57b56f9574a..a594e19d2be91 100644
--- a/crates/sui-indexer/src/sql_backfill.rs
+++ b/crates/sui-indexer/src/sql_backfill.rs
@@ -4,12 +4,10 @@
 use crate::database::ConnectionPool;
 use diesel_async::RunQueryDsl;
 use futures::{stream, StreamExt};
-use std::time::Duration;
 use std::time::Instant;
 
 const CHUNK_SIZE: u64 = 10000;
 const MAX_CONCURRENCY: usize = 100;
-const SLEEP_BETWEEN_BATCHES_MS: u64 = 100;
 
 pub async fn run_sql_backfill(
     sql: &str,
@@ -35,7 +33,6 @@ pub async fn run_sql_backfill(
                 backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone)
                     .await;
                 println!("Finished checkpoint range: {} - {}", start_id, end_id);
-                tokio::time::sleep(Duration::from_millis(SLEEP_BETWEEN_BATCHES_MS)).await;
             }
         })
         .await;
@@ -57,5 +54,6 @@ async fn backfill_data_batch(
     );
 
     // Execute the SQL query using Diesel's async connection
+    // TODO: Add retry support.
     diesel::sql_query(query).execute(&mut conn).await.unwrap();
 }
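The "TODO: Add retry support." in the final hunk could take roughly the following shape. This is a minimal sketch under stated assumptions: MAX_RETRIES, RETRY_BACKOFF_MS, the exponential backoff, and the panic-on-exhaustion policy are all invented here, not part of the patch; only ConnectionPool::get, diesel::sql_query, and RunQueryDsl::execute come from the code above.

use std::time::Duration;

use crate::database::ConnectionPool;
use diesel_async::RunQueryDsl;

// Hypothetical tuning knobs, not part of the patch.
const MAX_RETRIES: u32 = 3;
const RETRY_BACKOFF_MS: u64 = 500;

async fn backfill_data_batch_with_retry(
    sql: &str,
    checkpoint_column_name: &str,
    first_checkpoint: u64,
    last_checkpoint: u64,
    pool: ConnectionPool,
) {
    let query = format!(
        "{} WHERE {} BETWEEN {} AND {}",
        sql, checkpoint_column_name, first_checkpoint, last_checkpoint
    );
    for attempt in 0..=MAX_RETRIES {
        // Take a fresh connection on every attempt, in case the previous
        // failure broke the old one.
        let mut conn = pool.get().await.expect("failed to get a connection");
        match diesel::sql_query(query.as_str()).execute(&mut conn).await {
            Ok(_) => return,
            Err(e) if attempt < MAX_RETRIES => {
                eprintln!(
                    "Chunk {} - {} failed on attempt {}: {}; retrying",
                    first_checkpoint, last_checkpoint, attempt + 1, e
                );
                // Exponential backoff between attempts.
                tokio::time::sleep(Duration::from_millis(RETRY_BACKOFF_MS << attempt)).await;
            }
            Err(e) => panic!(
                "Chunk {} - {} failed after {} attempts: {}",
                first_checkpoint, last_checkpoint, MAX_RETRIES + 1, e
            ),
        }
    }
}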