diff --git a/Cargo.lock b/Cargo.lock index 1849fc77f020f..bdec902bd473c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13642,6 +13642,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "futures", "mysten-metrics", "prometheus", "sui-data-ingestion-core", diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index b4c4254586885..7b00e3eeaa877 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use ethers::prelude::Transaction; use ethers::providers::{Http, Middleware, Provider, StreamExt, Ws}; use ethers::types::{Address as EthAddress, Block, Filter, H256}; +use prometheus::{IntCounterVec, IntGaugeVec}; use sui_bridge::error::BridgeError; use sui_bridge::eth_client::EthClient; use sui_bridge::metered_eth_provider::MeteredEthHttpProvier; @@ -145,6 +146,18 @@ impl Datasource for EthSubscriptionDatasource { fn get_genesis_height(&self) -> u64 { self.genesis_block } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } } pub struct EthSyncDatasource { @@ -267,6 +280,18 @@ impl Datasource for EthSyncDatasource { fn get_genesis_height(&self) -> u64 { self.genesis_block } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } } #[derive(Clone)] diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index afd4b461303f8..787bd3853f74f 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -19,6 +19,7 @@ pub mod types; pub mod eth_bridge_indexer; pub mod sui_bridge_indexer; +pub mod sui_datasource; #[derive(Clone)] pub enum ProcessedTxnData { diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index a58d5fa10d377..d0a2e94f4887f 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -25,12 +25,12 @@ use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper; use sui_bridge_indexer::metrics::BridgeIndexerMetrics; use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store}; use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper}; +use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource; use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop; use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task; use sui_config::Config; use sui_data_ingestion_core::DataIngestionMetrics; use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder}; -use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource; use sui_sdk::SuiClientBuilder; #[derive(Parser, Clone, Debug)] @@ -137,6 +137,7 @@ async fn main() -> Result<()> { config.checkpoints_path.clone().into(), config.sui_bridge_genesis_checkpoint, ingestion_metrics.clone(), + indexer_meterics.clone(), ); let indexer = IndexerBuilder::new( "SuiBridgeIndexer", diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs index b57a3121f4ae3..9ea85514e20d3 100644 --- a/crates/sui-bridge-indexer/src/metrics.rs +++ b/crates/sui-bridge-indexer/src/metrics.rs @@ -2,8 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use prometheus::{ - register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge, - Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntCounter, + IntCounterVec, IntGauge, IntGaugeVec, Registry, }; #[derive(Clone, Debug)] @@ -20,83 +21,107 @@ pub struct BridgeIndexerMetrics { pub(crate) last_committed_sui_checkpoint: IntGauge, pub(crate) latest_committed_eth_block: IntGauge, pub(crate) last_synced_eth_block: IntGauge, + pub(crate) tasks_remaining_checkpoints: IntGaugeVec, + pub(crate) tasks_processed_checkpoints: IntCounterVec, + pub(crate) live_task_current_checkpoint: IntGaugeVec, } impl BridgeIndexerMetrics { pub fn new(registry: &Registry) -> Self { Self { total_sui_bridge_transactions: register_int_counter_with_registry!( - "total_sui_bridge_transactions", + "bridge_indexer_total_sui_bridge_transactions", "Total number of sui bridge transactions", registry, ) .unwrap(), total_sui_token_deposited: register_int_counter_with_registry!( - "total_sui_token_deposited", + "bridge_indexer_total_sui_token_deposited", "Total number of sui token deposited transactions", registry, ) .unwrap(), total_sui_token_transfer_approved: register_int_counter_with_registry!( - "total_sui_token_transfer_approved", + "bridge_indexer_total_sui_token_transfer_approved", "Total number of sui token approved transactions", registry, ) .unwrap(), total_sui_token_transfer_claimed: register_int_counter_with_registry!( - "total_sui_token_transfer_claimed", + "bridge_indexer_total_sui_token_transfer_claimed", "Total number of sui token claimed transactions", registry, ) .unwrap(), total_sui_bridge_txn_other: register_int_counter_with_registry!( - "total_sui_bridge_txn_other", + "bridge_indexer_total_sui_bridge_txn_other", "Total number of other sui bridge transactions", registry, ) .unwrap(), total_eth_bridge_transactions: register_int_counter_with_registry!( - "total_eth_bridge_transactions", + "bridge_indexer_total_eth_bridge_transactions", "Total number of eth bridge transactions", registry, ) .unwrap(), total_eth_token_deposited: register_int_counter_with_registry!( - "total_eth_token_deposited", + "bridge_indexer_total_eth_token_deposited", "Total number of eth token deposited transactions", registry, ) .unwrap(), total_eth_token_transfer_claimed: register_int_counter_with_registry!( - "total_eth_token_transfer_claimed", + "bridge_indexer_total_eth_token_transfer_claimed", "Total number of eth token claimed transactions", registry, ) .unwrap(), total_eth_bridge_txn_other: register_int_counter_with_registry!( - "total_eth_bridge_txn_other", + "bridge_indexer_total_eth_bridge_txn_other", "Total number of other eth bridge transactions", registry, ) .unwrap(), last_committed_sui_checkpoint: register_int_gauge_with_registry!( - "last_committed_sui_checkpoint", + "bridge_indexer_last_committed_sui_checkpoint", "The latest sui checkpoint that indexer committed to DB", registry, ) .unwrap(), latest_committed_eth_block: register_int_gauge_with_registry!( - "last_committed_eth_block", + "bridge_indexer_last_committed_eth_block", "The latest eth block that indexer committed to DB", registry, ) .unwrap(), last_synced_eth_block: register_int_gauge_with_registry!( - "last_synced_eth_block", + "bridge_indexer_last_synced_eth_block", "The last eth block that indexer committed to DB", registry, ) .unwrap(), + tasks_remaining_checkpoints: register_int_gauge_vec_with_registry!( + "bridge_indexer_tasks_remaining_checkpoints", + "The remaining checkpoints for each task", + &["task_name"], + registry, + ) + .unwrap(), + tasks_processed_checkpoints: register_int_counter_vec_with_registry!( + "bridge_indexer_tasks_processed_checkpoints", + "Total processed checkpoints for each task", + &["task_name"], + registry, + ) + .unwrap(), + live_task_current_checkpoint: register_int_gauge_vec_with_registry!( + "bridge_indexer_live_task_current_checkpoint", + "Current checkpoint of live task", + &["task_name"], + registry, + ) + .unwrap(), } } diff --git a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs index f76baa6e8ae25..7caf595e1b9f6 100644 --- a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs @@ -15,7 +15,6 @@ use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent}; -use sui_indexer_builder::sui_datasource::CheckpointTxnData; use sui_indexer_builder::Task; use sui_types::effects::TransactionEffectsAPI; use sui_types::event::Event; @@ -27,6 +26,7 @@ use crate::metrics::BridgeIndexerMetrics; use crate::postgres_manager::PgPool; use crate::schema::progress_store::{columns, dsl}; use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data}; +use crate::sui_datasource::CheckpointTxnData; use crate::{ models, schema, BridgeDataSource, ProcessedTxnData, SuiTxnError, TokenTransfer, TokenTransferData, TokenTransferStatus, diff --git a/crates/sui-indexer-builder/src/sui_datasource.rs b/crates/sui-bridge-indexer/src/sui_datasource.rs similarity index 87% rename from crates/sui-indexer-builder/src/sui_datasource.rs rename to crates/sui-bridge-indexer/src/sui_datasource.rs index f1a95a264fdd0..a9d5de0fac774 100644 --- a/crates/sui-indexer-builder/src/sui_datasource.rs +++ b/crates/sui-bridge-indexer/src/sui_datasource.rs @@ -1,15 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::indexer_builder::{DataSender, Datasource}; use anyhow::Error; use async_trait::async_trait; use mysten_metrics::{metered_channel, spawn_monitored_task}; +use prometheus::IntCounterVec; +use prometheus::IntGaugeVec; use std::path::PathBuf; use std::sync::Arc; use sui_data_ingestion_core::{ DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool, }; +use sui_indexer_builder::indexer_builder::{DataSender, Datasource}; use sui_sdk::SuiClient; use sui_types::base_types::TransactionDigest; use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData; @@ -18,7 +20,8 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use tracing::info; + +use crate::metrics::BridgeIndexerMetrics; pub struct SuiCheckpointDatasource { remote_store_url: String, @@ -27,6 +30,7 @@ pub struct SuiCheckpointDatasource { checkpoint_path: PathBuf, genesis_checkpoint: u64, metrics: DataIngestionMetrics, + indexer_metrics: BridgeIndexerMetrics, } impl SuiCheckpointDatasource { pub fn new( @@ -36,6 +40,7 @@ impl SuiCheckpointDatasource { checkpoint_path: PathBuf, genesis_checkpoint: u64, metrics: DataIngestionMetrics, + indexer_metrics: BridgeIndexerMetrics, ) -> Self { SuiCheckpointDatasource { remote_store_url, @@ -43,6 +48,7 @@ impl SuiCheckpointDatasource { concurrency, checkpoint_path, metrics, + indexer_metrics, genesis_checkpoint, } } @@ -97,6 +103,18 @@ impl Datasource for SuiCheckpointDatasource { fn get_genesis_height(&self) -> u64 { self.genesis_checkpoint } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } } struct PerTaskInMemProgressStore { @@ -144,7 +162,7 @@ pub type CheckpointTxnData = (CheckpointTransaction, u64, u64); #[async_trait] impl Worker for IndexerWorker { async fn process_checkpoint(&self, checkpoint: SuiCheckpointData) -> anyhow::Result<()> { - info!( + tracing::trace!( "Received checkpoint [{}] {}: {}", checkpoint.checkpoint_summary.epoch, checkpoint.checkpoint_summary.sequence_number, diff --git a/crates/sui-indexer-builder/Cargo.toml b/crates/sui-indexer-builder/Cargo.toml index 306e6522ad0d6..bd863f8dc12f9 100644 --- a/crates/sui-indexer-builder/Cargo.toml +++ b/crates/sui-indexer-builder/Cargo.toml @@ -15,6 +15,7 @@ mysten-metrics.workspace = true sui-types.workspace = true sui-sdk.workspace = true sui-data-ingestion-core.workspace = true +futures.workspace = true tracing.workspace = true prometheus.workspace = true telemetry-subscribers.workspace = true diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index c9ed96073a028..f37cab7d58c3c 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -6,6 +6,8 @@ use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; +use futures::StreamExt; +use prometheus::{IntCounterVec, IntGaugeVec}; use tokio::task::JoinHandle; use crate::{Task, Tasks}; @@ -15,6 +17,8 @@ use tap::tap::TapFallible; type CheckpointData = (u64, Vec); pub type DataSender = metered_channel::Sender>; +const INGESTION_BATCH_SIZE: usize = 100; + pub struct IndexerBuilder { name: String, datasource: D, @@ -221,6 +225,7 @@ impl Indexer { .await .tap_ok(|_| { tracing::info!( + task_name = self.name.as_str(), "Created backfill tasks ({}-{})", from_checkpoint, live_task_from_checkpoint - 1 @@ -228,6 +233,7 @@ impl Indexer { }) .tap_err(|e| { tracing::error!( + task_name = self.name.as_str(), "Failed to create backfill tasks ({}-{}): {:?}", from_checkpoint, live_task_from_checkpoint - 1, @@ -336,14 +342,19 @@ pub trait Datasource: Sync + Send { M: DataMapper, P: Persistent, { + let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE") + .unwrap_or(INGESTION_BATCH_SIZE.to_string()) + .parse::() + .unwrap(); tracing::info!( task_name, + ingestion_batch_size, "Starting ingestion task ({}-{})", starting_checkpoint, - target_checkpoint + target_checkpoint, ); - // todo: add metrics for number of tasks - let (data_sender, mut data_channel) = metered_channel::channel( + let is_live_task = target_checkpoint == i64::MAX as u64; + let (data_sender, data_rx) = metered_channel::channel( 1000, &mysten_metrics::get_metrics() .unwrap() @@ -354,23 +365,100 @@ pub trait Datasource: Sync + Send { .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender) .await?; - while let Some((block_number, data)) = data_channel.recv().await { - if block_number > target_checkpoint { - break; + let processed_checkpoints_metrics = self + .get_tasks_processed_checkpoints_metric() + .with_label_values(&[&task_name]); + // track remaining checkpoints per task, except for live task + let remaining_checkpoints_metric = if !is_live_task { + let remaining = self + .get_tasks_remaining_checkpoints_metric() + .with_label_values(&[&task_name]); + remaining.set((target_checkpoint - starting_checkpoint + 1) as i64); + Some(remaining) + } else { + None + }; + // track current checkpoint for live task + let live_task_current_checkpoint_metrics = if is_live_task { + let m = self + .get_live_task_checkpoint_metric() + .with_label_values(&[&task_name]); + m.set((starting_checkpoint) as i64); + Some(m) + } else { + None + }; + + let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(data_rx) + .ready_chunks(ingestion_batch_size); + + while let Some(batch) = stream.next().await { + // unwrap safe: at least 1 element in the batch + let mut max_height = 0; + let mut data = vec![]; + let mut batch_size = 0; + for (height, d) in batch { + if height > target_checkpoint { + tracing::warn!( + task_name, + height, + "Received data with height > target_checkpoint, skipping." + ); + continue; + } + max_height = std::cmp::max(max_height, height); + batch_size += 1; + data.extend(d); } + tracing::debug!( + task_name, + max_height, + "Ingestion task received {} blocks.", + batch_size, + ); + let timer = tokio::time::Instant::now(); + if !data.is_empty() { let processed_data = data.into_iter().try_fold(vec![], |mut result, d| { result.append(&mut data_mapper.map(d)?); Ok::, Error>(result) })?; + // TODO: batch write data // TODO: we might be able to write data and progress in a single transaction. storage.write(processed_data).await?; } - storage - .save_progress(task_name.clone(), block_number) - .await?; + // TODO: batch progress + storage.save_progress(task_name.clone(), max_height).await?; + tracing::debug!( + task_name, + max_height, + "Ingestion task processed {} blocks in {}ms", + batch_size, + timer.elapsed().as_millis(), + ); + processed_checkpoints_metrics.inc_by(batch_size as u64); + if let Some(m) = &remaining_checkpoints_metric { + // Note this is only approximate as the data may come in out of order + m.set(std::cmp::max( + target_checkpoint as i64 - max_height as i64, + 0, + )); + } + if let Some(m) = &live_task_current_checkpoint_metrics { + m.set((max_height) as i64) + } + if max_height > target_checkpoint { + break; + } + } + if is_live_task { + // Live task should never exit, except in unit tests + tracing::error!(task_name, "Live task exiting"); } join_handle.abort(); + if let Some(m) = &remaining_checkpoints_metric { + m.set(0) + } join_handle.await? } @@ -384,6 +472,12 @@ pub trait Datasource: Sync + Send { async fn get_live_task_starting_checkpoint(&self) -> Result; fn get_genesis_height(&self) -> u64; + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec; + + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec; + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec; } pub enum BackfillStrategy { diff --git a/crates/sui-indexer-builder/src/lib.rs b/crates/sui-indexer-builder/src/lib.rs index f4cb0d4d32eda..af00dde516ac0 100644 --- a/crates/sui-indexer-builder/src/lib.rs +++ b/crates/sui-indexer-builder/src/lib.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub mod indexer_builder; -pub mod sui_datasource; #[derive(Clone, Debug)] pub struct Task { diff --git a/crates/sui-indexer-builder/tests/indexer_test_utils.rs b/crates/sui-indexer-builder/tests/indexer_test_utils.rs index 765f03b6a4a43..0f9040ca93b4c 100644 --- a/crates/sui-indexer-builder/tests/indexer_test_utils.rs +++ b/crates/sui-indexer-builder/tests/indexer_test_utils.rs @@ -7,6 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Error}; use async_trait::async_trait; +use prometheus::{IntCounterVec, IntGaugeVec}; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -21,6 +22,8 @@ pub struct TestDatasource { pub data: Vec, pub live_task_starting_checkpoint: u64, pub genesis_checkpoint: u64, + pub gauge_metric: IntGaugeVec, + pub counter_metric: IntCounterVec, } #[async_trait] @@ -55,6 +58,21 @@ where fn get_genesis_height(&self) -> u64 { self.genesis_checkpoint } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + // This is dummy + &self.gauge_metric + } + + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { + // This is dummy + &self.counter_metric + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + // This is dummy + &self.gauge_metric + } } #[derive(Clone, Debug, Default)] diff --git a/crates/sui-indexer-builder/tests/indexer_tests.rs b/crates/sui-indexer-builder/tests/indexer_tests.rs index 84dba2748b3a4..8beeddcb5b74b 100644 --- a/crates/sui-indexer-builder/tests/indexer_tests.rs +++ b/crates/sui-indexer-builder/tests/indexer_tests.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::indexer_test_utils::{InMemoryPersistent, NoopDataMapper, TestDatasource}; -use prometheus::Registry; +use prometheus::{ + register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec, + IntGaugeVec, Registry, +}; use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder}; use sui_indexer_builder::Task; @@ -19,6 +22,8 @@ async fn indexer_simple_backfill_task_test() { data: data.clone(), live_task_starting_checkpoint: 5, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); let mut indexer = IndexerBuilder::new( @@ -39,6 +44,7 @@ async fn indexer_simple_backfill_task_test() { // it should have 2 task created for the indexer - a live task and a backfill task let tasks = persistent.get_all_tasks("test_indexer").await.unwrap(); + println!("{:?}", tasks); assert_ranges(&tasks, vec![(10, i64::MAX as u64), (4, 4)]); // the data recorded in storage should be the same as the datasource let mut recorded_data = persistent.data.lock().await.clone(); @@ -57,6 +63,8 @@ async fn indexer_partitioned_backfill_task_test() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); let mut indexer = IndexerBuilder::new( @@ -102,6 +110,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test1() { data: data.clone(), live_task_starting_checkpoint: 31, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.data.lock().await.append(&mut (0..=30).collect()); @@ -151,6 +161,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test2() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.data.lock().await.append(&mut (0..=30).collect()); @@ -202,6 +214,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test3() { data: data.clone(), live_task_starting_checkpoint: 28, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -256,6 +270,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test4() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -314,6 +330,8 @@ async fn indexer_with_existing_live_task1() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 10, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -357,6 +375,8 @@ async fn indexer_with_existing_live_task2() { data: data.clone(), live_task_starting_checkpoint: 25, genesis_checkpoint: 10, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -410,6 +430,8 @@ async fn resume_test() { data: data.clone(), live_task_starting_checkpoint: 31, genesis_checkpoint: 0, + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -446,3 +468,13 @@ async fn resume_test() { recorded_data.sort(); assert_eq!((10..=50u64).collect::>(), recorded_data); } + +fn new_gauge_vec(registry: &Registry) -> IntGaugeVec { + register_int_gauge_vec_with_registry!("whatever_gauge", "whatever", &["whatever"], registry,) + .unwrap() +} + +fn new_counter_vec(registry: &Registry) -> IntCounterVec { + register_int_counter_vec_with_registry!("whatever_counter", "whatever", &["whatever"], registry,) + .unwrap() +}