Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ pub enum Command {
},
/// Run through the migration scripts.
RunMigrations,
/// Backfill DB tables for checkpoint range [`first_checkpoint`, `last_checkpoint`]
/// by running a SQL query provided in `sql`.
/// The tool will automatically slice it into smaller checkpoint ranges and for each range [start, end],
/// it augments the \sql query with:
/// "WHERE {checkpoint_column_name} BETWEEN {start} AND {end}"
/// to avoid running out of memory.
/// Example:
/// ./sui-indexer --database-url <...> sql-back-fill
/// "INSERT INTO full_objects_history (object_id, object_version, serialized_object) SELECT object_id, object_version, serialized_object FROM objects_history"
/// "checkpoint_sequence_number" 0 100000
SqlBackFill {
sql: String,
checkpoint_column_name: String,
first_checkpoint: u64,
last_checkpoint: u64,
},
}

#[derive(Args, Default, Debug, Clone)]
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::indexer_reader::IndexerReader;
use errors::IndexerError;

pub mod apis;
pub mod config;
pub mod database;
pub mod db;
pub mod errors;
Expand All @@ -35,14 +36,13 @@ pub mod indexer_reader;
pub mod metrics;
pub mod models;
pub mod schema;
pub mod sql_backfill;
pub mod store;
pub mod system_package_task;
pub mod tempdb;
pub mod test_utils;
pub mod types;

pub mod config;

pub async fn build_json_rpc_server(
prometheus_registry: &Registry,
reader: IndexerReader,
Expand Down
23 changes: 19 additions & 4 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use sui_indexer::config::Command;
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::{check_db_migration_consistency, reset_database, run_migrations};
use sui_indexer::indexer::Indexer;
use sui_indexer::store::PgIndexerStore;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use sui_indexer::metrics::{
spawn_connection_pool_metric_collector, start_prometheus_server, IndexerMetrics,
};
use sui_indexer::sql_backfill::run_sql_backfill;
use sui_indexer::store::PgIndexerStore;
use tokio_util::sync::CancellationToken;
use tracing::warn;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -74,6 +74,21 @@ async fn main() -> anyhow::Result<()> {
Command::RunMigrations => {
run_migrations(pool.dedicated_connection().await?).await?;
}
Command::SqlBackFill {
sql,
checkpoint_column_name,
first_checkpoint,
last_checkpoint,
} => {
run_sql_backfill(
&sql,
&checkpoint_column_name,
first_checkpoint,
last_checkpoint,
pool,
)
.await;
}
}

Ok(())
Expand Down
61 changes: 61 additions & 0 deletions crates/sui-indexer/src/sql_backfill.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::database::ConnectionPool;
use diesel_async::RunQueryDsl;
use futures::{stream, StreamExt};
use std::time::Duration;
use std::time::Instant;

/// Maximum number of checkpoints covered by a single backfill query; the requested
/// checkpoint range is sliced into consecutive sub-ranges of this size.
const CHUNK_SIZE: u64 = 10000;
/// Upper bound on the number of backfill batches that are driven concurrently.
const MAX_CONCURRENCY: usize = 100;
/// Pause, in milliseconds, that each batch task takes after its query completes.
const SLEEP_BETWEEN_BATCHES_MS: u64 = 100;

/// Runs `sql` over the inclusive checkpoint range `[first_checkpoint, last_checkpoint]`
/// in bounded chunks.
///
/// The range is split into consecutive sub-ranges of at most `CHUNK_SIZE` checkpoints.
/// Each sub-range is passed to `backfill_data_batch`, with at most `MAX_CONCURRENCY`
/// batches in flight at any time. A line is printed to stdout after every finished
/// sub-range, and the total elapsed time is printed at the end.
///
/// If `first_checkpoint > last_checkpoint` the range is empty and no query is run.
///
/// # Panics
///
/// Panics if any batch fails to obtain a connection from `pool` or if its query fails,
/// because `backfill_data_batch` does not return errors.
pub async fn run_sql_backfill(
    sql: &str,
    checkpoint_column_name: &str,
    first_checkpoint: u64,
    last_checkpoint: u64,
    pool: ConnectionPool,
) {
    let cur_time = Instant::now();

    // The chunk boundaries are produced lazily: the stream pulls the next (start, end)
    // pair only when a concurrency slot frees up, so no intermediate Vec is needed.
    let chunks = (first_checkpoint..=last_checkpoint)
        .step_by(CHUNK_SIZE as usize)
        .map(move |chunk_start| {
            // `saturating_add` keeps the end bound correct when the range reaches close to
            // `u64::MAX`; a plain `+` would overflow there before `min` could clamp it.
            let chunk_end = chunk_start
                .saturating_add(CHUNK_SIZE - 1)
                .min(last_checkpoint);
            (chunk_start, chunk_end)
        });

    stream::iter(chunks)
        .for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| {
            // Each batch gets its own handle to the pool so the future can own it.
            let pool_clone = pool.clone();
            async move {
                backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone)
                    .await;
                println!("Finished checkpoint range: {} - {}", start_id, end_id);
                // The pause happens inside the batch future, so the concurrency slot is
                // held for the duration of the sleep before the next chunk is started.
                tokio::time::sleep(Duration::from_millis(SLEEP_BETWEEN_BATCHES_MS)).await;
            }
        })
        .await;
    println!("Finished backfilling in {:?}", cur_time.elapsed());
}

/// Executes `sql` restricted to the inclusive checkpoint range
/// `[first_checkpoint, last_checkpoint]`.
///
/// The statement that is run is `sql` with
/// `" WHERE {checkpoint_column_name} BETWEEN {first_checkpoint} AND {last_checkpoint}"`
/// appended verbatim. `sql` and `checkpoint_column_name` are interpolated as-is (no
/// escaping or parameter binding), so `sql` must be written such that a trailing
/// `WHERE` clause is valid.
///
/// # Panics
///
/// Panics if a connection cannot be obtained from `pool` or if the query fails. The
/// panic message names the checkpoint range so that the failing batch can be
/// identified when many batches run concurrently.
async fn backfill_data_batch(
    sql: &str,
    checkpoint_column_name: &str,
    first_checkpoint: u64,
    last_checkpoint: u64,
    pool: ConnectionPool,
) {
    let mut conn = pool.get().await.unwrap_or_else(|e| {
        panic!(
            "Failed to get a DB connection for checkpoint range {} - {}: {:?}",
            first_checkpoint, last_checkpoint, e
        )
    });

    let query = format!(
        "{} WHERE {} BETWEEN {} AND {}",
        sql, checkpoint_column_name, first_checkpoint, last_checkpoint
    );

    // Execute the SQL query using Diesel's async connection. The query text is passed
    // by reference so it is still available for the failure message below.
    diesel::sql_query(query.as_str())
        .execute(&mut conn)
        .await
        .unwrap_or_else(|e| {
            panic!(
                "Backfill query failed for checkpoint range {} - {} (query: {}): {:?}",
                first_checkpoint, last_checkpoint, query, e
            )
        });
}
Loading