diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index d558391c0c4e4..8556501733249 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -1,8 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; - +use crate::handlers::ev_emit_mod::EvEmitMod; +use crate::handlers::ev_struct_inst::EvStructInst; +use crate::handlers::kv_checkpoints::KvCheckpoints; +use crate::handlers::kv_objects::KvObjects; +use crate::handlers::kv_transactions::KvTransactions; +use crate::handlers::tx_affected_objects::TxAffectedObjects; +use crate::handlers::tx_balance_changes::TxBalanceChanges; +use crate::pipeline::PipelineName; use anyhow::{Context, Result}; use db::{Db, DbConfig}; use handlers::Handler; @@ -10,6 +16,7 @@ use ingestion::{IngestionConfig, IngestionService}; use metrics::{IndexerMetrics, MetricsService}; use models::watermarks::CommitterWatermark; use pipeline::{concurrent, PipelineConfig}; +use std::{net::SocketAddr, sync::Arc}; use task::graceful_shutdown; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -47,9 +54,6 @@ pub struct Indexer { /// Optional override of the checkpoint upperbound. last_checkpoint: Option, - /// Optional override of enabled pipelines. - enabled_pipelines: BTreeSet, - /// Cancellation token shared among all continuous tasks in the service. cancel: CancellationToken, @@ -84,11 +88,6 @@ pub struct IndexerConfig { #[arg(long)] last_checkpoint: Option, - /// Only run the following pipelines -- useful for backfills. If not provided, all pipelines - /// will be run. - #[arg(long, action = clap::ArgAction::Append)] - pipeline: Vec, - /// Address to serve Prometheus Metrics from. #[arg(long, default_value = "0.0.0.0:9184")] pub metrics_address: SocketAddr, @@ -102,7 +101,6 @@ impl Indexer { pipeline_config, first_checkpoint, last_checkpoint, - pipeline, metrics_address, } = config; @@ -115,7 +113,7 @@ impl Indexer { let ingestion_service = IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?; - Ok(Self { + let mut indexer = Self { db, metrics, metrics_service, @@ -123,21 +121,35 @@ impl Indexer { pipeline_config, first_checkpoint, last_checkpoint, - enabled_pipelines: pipeline.into_iter().collect(), cancel, first_checkpoint_from_watermark: u64::MAX, handles: vec![], - }) + }; + indexer.register_pipeline().await?; + Ok(indexer) + } + + async fn register_pipeline(&mut self) -> Result<()> { + match self.pipeline_config.pipeline { + PipelineName::EvEmitMod => self.concurrent_pipeline::().await?, + PipelineName::EvStructInst => self.concurrent_pipeline::().await?, + PipelineName::KvCheckpoints => self.concurrent_pipeline::().await?, + PipelineName::KvObjects => self.concurrent_pipeline::().await?, + PipelineName::KvTransactions => self.concurrent_pipeline::().await?, + PipelineName::TxAffectedObjects => { + self.concurrent_pipeline::().await? + } + PipelineName::TxBalanceChanges => { + self.concurrent_pipeline::().await? + } + } + + Ok(()) } /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started, /// they will be idle until the ingestion service starts, and serves it checkpoint data. pub async fn concurrent_pipeline(&mut self) -> Result<()> { - if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(H::NAME) { - info!("Skipping pipeline {}", H::NAME); - return Ok(()); - } - let mut conn = self.db.connect().await.context("Failed DB connection")?; let watermark = CommitterWatermark::get(&mut conn, H::NAME) diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 77d90d3fe0cb3..cc8026c3a3548 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -3,15 +3,7 @@ use anyhow::{Context, Result}; use clap::Parser; -use sui_indexer_alt::{ - args::Args, - handlers::{ - ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, - kv_objects::KvObjects, kv_transactions::KvTransactions, - tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges, - }, - Indexer, -}; +use sui_indexer_alt::{args::Args, Indexer}; use tokio_util::sync::CancellationToken; #[tokio::main] @@ -25,15 +17,7 @@ async fn main() -> Result<()> { let cancel = CancellationToken::new(); - let mut indexer = Indexer::new(args.indexer_config, cancel.clone()).await?; - - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; + let indexer = Indexer::new(args.indexer_config, cancel.clone()).await?; let h_indexer = indexer.run().await.context("Failed to start indexer")?; diff --git a/crates/sui-indexer-alt/src/pipeline/mod.rs b/crates/sui-indexer-alt/src/pipeline/mod.rs index 12893179676da..174b55eef6447 100644 --- a/crates/sui-indexer-alt/src/pipeline/mod.rs +++ b/crates/sui-indexer-alt/src/pipeline/mod.rs @@ -1,9 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::time::Duration; - use crate::{handlers::Handler, models::watermarks::CommitterWatermark}; +use std::time::Duration; pub mod concurrent; mod processor; @@ -14,6 +13,8 @@ const COMMITTER_BUFFER: usize = 5; #[derive(clap::Args, Debug, Clone)] pub struct PipelineConfig { + #[command(subcommand)] + pub pipeline: PipelineName, /// Committer will check for pending data at least this often #[arg( long, @@ -37,6 +38,18 @@ pub struct PipelineConfig { skip_watermark: bool, } +/// Each enum variant can also take pipeline-specific configuration. +#[derive(Debug, Clone, clap::Subcommand)] +pub enum PipelineName { + EvEmitMod, + EvStructInst, + KvCheckpoints, + KvObjects, + KvTransactions, + TxAffectedObjects, + TxBalanceChanges, +} + /// A batch of processed values associated with a single checkpoint. This is an internal type used /// to communicate between the handler and the committer parts of the pipeline. struct Indexed {