Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

use crate::handlers::ev_emit_mod::EvEmitMod;
use crate::handlers::ev_struct_inst::EvStructInst;
use crate::handlers::kv_checkpoints::KvCheckpoints;
use crate::handlers::kv_objects::KvObjects;
use crate::handlers::kv_transactions::KvTransactions;
use crate::handlers::tx_affected_objects::TxAffectedObjects;
use crate::handlers::tx_balance_changes::TxBalanceChanges;
use crate::pipeline::PipelineName;
use anyhow::{Context, Result};
use db::{Db, DbConfig};
use handlers::Handler;
use ingestion::{IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use models::watermarks::CommitterWatermark;
use pipeline::{concurrent, PipelineConfig};
use std::{net::SocketAddr, sync::Arc};
use task::graceful_shutdown;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -47,9 +54,6 @@ pub struct Indexer {
/// Optional override of the checkpoint upperbound.
last_checkpoint: Option<u64>,

/// Optional override of enabled pipelines.
enabled_pipelines: BTreeSet<String>,

/// Cancellation token shared among all continuous tasks in the service.
cancel: CancellationToken,

Expand Down Expand Up @@ -84,11 +88,6 @@ pub struct IndexerConfig {
#[arg(long)]
last_checkpoint: Option<u64>,

/// Only run the following pipelines -- useful for backfills. If not provided, all pipelines
/// will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,

/// Address to serve Prometheus Metrics from.
#[arg(long, default_value = "0.0.0.0:9184")]
pub metrics_address: SocketAddr,
Expand All @@ -102,7 +101,6 @@ impl Indexer {
pipeline_config,
first_checkpoint,
last_checkpoint,
pipeline,
metrics_address,
} = config;

Expand All @@ -115,29 +113,43 @@ impl Indexer {
let ingestion_service =
IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?;

Ok(Self {
let mut indexer = Self {
db,
metrics,
metrics_service,
ingestion_service,
pipeline_config,
first_checkpoint,
last_checkpoint,
enabled_pipelines: pipeline.into_iter().collect(),
cancel,
first_checkpoint_from_watermark: u64::MAX,
handles: vec![],
})
};
indexer.register_pipeline().await?;
Ok(indexer)
}

async fn register_pipeline(&mut self) -> Result<()> {
match self.pipeline_config.pipeline {
PipelineName::EvEmitMod => self.concurrent_pipeline::<EvEmitMod>().await?,
PipelineName::EvStructInst => self.concurrent_pipeline::<EvStructInst>().await?,
PipelineName::KvCheckpoints => self.concurrent_pipeline::<KvCheckpoints>().await?,
PipelineName::KvObjects => self.concurrent_pipeline::<KvObjects>().await?,
PipelineName::KvTransactions => self.concurrent_pipeline::<KvTransactions>().await?,
PipelineName::TxAffectedObjects => {
self.concurrent_pipeline::<TxAffectedObjects>().await?
}
PipelineName::TxBalanceChanges => {
self.concurrent_pipeline::<TxBalanceChanges>().await?
}
}

Ok(())
}

/// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
/// they will be idle until the ingestion service starts, and serves it checkpoint data.
pub async fn concurrent_pipeline<H: Handler + 'static>(&mut self) -> Result<()> {
if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(H::NAME) {
info!("Skipping pipeline {}", H::NAME);
return Ok(());
}

let mut conn = self.db.connect().await.context("Failed DB connection")?;

let watermark = CommitterWatermark::get(&mut conn, H::NAME)
Expand Down
20 changes: 2 additions & 18 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,7 @@

use anyhow::{Context, Result};
use clap::Parser;
use sui_indexer_alt::{
args::Args,
handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_objects::KvObjects, kv_transactions::KvTransactions,
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
},
Indexer,
};
use sui_indexer_alt::{args::Args, Indexer};
use tokio_util::sync::CancellationToken;

#[tokio::main]
Expand All @@ -25,15 +17,7 @@ async fn main() -> Result<()> {

let cancel = CancellationToken::new();

let mut indexer = Indexer::new(args.indexer_config, cancel.clone()).await?;

indexer.concurrent_pipeline::<EvEmitMod>().await?;
indexer.concurrent_pipeline::<EvStructInst>().await?;
indexer.concurrent_pipeline::<KvCheckpoints>().await?;
indexer.concurrent_pipeline::<KvObjects>().await?;
indexer.concurrent_pipeline::<KvTransactions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
let indexer = Indexer::new(args.indexer_config, cancel.clone()).await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

Expand Down
17 changes: 15 additions & 2 deletions crates/sui-indexer-alt/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use crate::{handlers::Handler, models::watermarks::CommitterWatermark};
use std::time::Duration;

pub mod concurrent;
mod processor;
Expand All @@ -14,6 +13,8 @@ const COMMITTER_BUFFER: usize = 5;

#[derive(clap::Args, Debug, Clone)]
pub struct PipelineConfig {
#[command(subcommand)]
pub pipeline: PipelineName,
/// Committer will check for pending data at least this often
#[arg(
long,
Expand All @@ -37,6 +38,18 @@ pub struct PipelineConfig {
skip_watermark: bool,
}

/// Each enum variant can also take pipeline-specific configuration.
#[derive(Debug, Clone, clap::Subcommand)]
pub enum PipelineName {
EvEmitMod,
EvStructInst,
KvCheckpoints,
KvObjects,
KvTransactions,
TxAffectedObjects,
TxBalanceChanges,
}

/// A batch of processed values associated with a single checkpoint. This is an internal type used
/// to communicate between the handler and the committer parts of the pipeline.
struct Indexed<H: Handler> {
Expand Down
Loading