From 38cbcf99823f022893a5ca36b6b83eb7b044720b Mon Sep 17 00:00:00 2001 From: longbowlu Date: Sat, 31 Aug 2024 00:12:32 -0700 Subject: [PATCH 1/5] progress metrics --- .../src/eth_bridge_indexer.rs | 17 +++++++ crates/sui-bridge-indexer/src/lib.rs | 1 + crates/sui-bridge-indexer/src/main.rs | 3 +- crates/sui-bridge-indexer/src/metrics.rs | 44 +++++++++++++------ .../src/sui_bridge_indexer.rs | 2 +- .../src/sui_datasource.rs | 16 ++++++- .../src/indexer_builder.rs | 43 +++++++++++++++++- crates/sui-indexer-builder/src/lib.rs | 1 - .../tests/indexer_test_utils.rs | 12 +++++ .../tests/indexer_tests.rs | 15 ++++++- 10 files changed, 134 insertions(+), 20 deletions(-) rename crates/{sui-indexer-builder => sui-bridge-indexer}/src/sui_datasource.rs (90%) diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index b4c4254586885..c0e0499714f27 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use ethers::prelude::Transaction; use ethers::providers::{Http, Middleware, Provider, StreamExt, Ws}; use ethers::types::{Address as EthAddress, Block, Filter, H256}; +use prometheus::IntGaugeVec; use sui_bridge::error::BridgeError; use sui_bridge::eth_client::EthClient; use sui_bridge::metered_eth_provider::MeteredEthHttpProvier; @@ -145,6 +146,14 @@ impl Datasource for EthSubscriptionDatasource { fn get_genesis_height(&self) -> u64 { self.genesis_block } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } } pub struct EthSyncDatasource { @@ -267,6 +276,14 @@ impl Datasource for EthSyncDatasource { fn get_genesis_height(&self) -> u64 { self.genesis_block } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } } #[derive(Clone)] diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index afd4b461303f8..787bd3853f74f 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -19,6 +19,7 @@ pub mod types; pub mod eth_bridge_indexer; pub mod sui_bridge_indexer; +pub mod sui_datasource; #[derive(Clone)] pub enum ProcessedTxnData { diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index a58d5fa10d377..d0a2e94f4887f 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -25,12 +25,12 @@ use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper; use sui_bridge_indexer::metrics::BridgeIndexerMetrics; use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store}; use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper}; +use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource; use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop; use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task; use sui_config::Config; use sui_data_ingestion_core::DataIngestionMetrics; use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder}; -use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource; use sui_sdk::SuiClientBuilder; #[derive(Parser, Clone, Debug)] @@ -137,6 +137,7 @@ async fn main() -> Result<()> { config.checkpoints_path.clone().into(), config.sui_bridge_genesis_checkpoint, ingestion_metrics.clone(), + indexer_meterics.clone(), ); let indexer = IndexerBuilder::new( "SuiBridgeIndexer", diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs index b57a3121f4ae3..507453d85ec28 100644 --- a/crates/sui-bridge-indexer/src/metrics.rs +++ b/crates/sui-bridge-indexer/src/metrics.rs @@ -2,8 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use prometheus::{ - register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge, - Registry, + register_int_counter_with_registry, register_int_gauge_vec_with_registry, + register_int_gauge_with_registry, IntCounter, IntGauge, IntGaugeVec, Registry, }; #[derive(Clone, Debug)] @@ -20,83 +20,99 @@ pub struct BridgeIndexerMetrics { pub(crate) last_committed_sui_checkpoint: IntGauge, pub(crate) latest_committed_eth_block: IntGauge, pub(crate) last_synced_eth_block: IntGauge, + pub(crate) tasks_remaining_checkpoints: IntGaugeVec, + pub(crate) live_task_current_checkpoint: IntGaugeVec, } impl BridgeIndexerMetrics { pub fn new(registry: &Registry) -> Self { Self { total_sui_bridge_transactions: register_int_counter_with_registry!( - "total_sui_bridge_transactions", + "bridge_indexer_total_sui_bridge_transactions", "Total number of sui bridge transactions", registry, ) .unwrap(), total_sui_token_deposited: register_int_counter_with_registry!( - "total_sui_token_deposited", + "bridge_indexer_total_sui_token_deposited", "Total number of sui token deposited transactions", registry, ) .unwrap(), total_sui_token_transfer_approved: register_int_counter_with_registry!( - "total_sui_token_transfer_approved", + "bridge_indexer_total_sui_token_transfer_approved", "Total number of sui token approved transactions", registry, ) .unwrap(), total_sui_token_transfer_claimed: register_int_counter_with_registry!( - "total_sui_token_transfer_claimed", + "bridge_indexer_total_sui_token_transfer_claimed", "Total number of sui token claimed transactions", registry, ) .unwrap(), total_sui_bridge_txn_other: register_int_counter_with_registry!( - "total_sui_bridge_txn_other", + "bridge_indexer_total_sui_bridge_txn_other", "Total number of other sui bridge transactions", registry, ) .unwrap(), total_eth_bridge_transactions: register_int_counter_with_registry!( - "total_eth_bridge_transactions", + "bridge_indexer_total_eth_bridge_transactions", "Total number of eth bridge transactions", registry, ) .unwrap(), total_eth_token_deposited: register_int_counter_with_registry!( - "total_eth_token_deposited", + "bridge_indexer_total_eth_token_deposited", "Total number of eth token deposited transactions", registry, ) .unwrap(), total_eth_token_transfer_claimed: register_int_counter_with_registry!( - "total_eth_token_transfer_claimed", + "bridge_indexer_total_eth_token_transfer_claimed", "Total number of eth token claimed transactions", registry, ) .unwrap(), total_eth_bridge_txn_other: register_int_counter_with_registry!( - "total_eth_bridge_txn_other", + "bridge_indexer_total_eth_bridge_txn_other", "Total number of other eth bridge transactions", registry, ) .unwrap(), last_committed_sui_checkpoint: register_int_gauge_with_registry!( - "last_committed_sui_checkpoint", + "bridge_indexer_last_committed_sui_checkpoint", "The latest sui checkpoint that indexer committed to DB", registry, ) .unwrap(), latest_committed_eth_block: register_int_gauge_with_registry!( - "last_committed_eth_block", + "bridge_indexer_last_committed_eth_block", "The latest eth block that indexer committed to DB", registry, ) .unwrap(), last_synced_eth_block: register_int_gauge_with_registry!( - "last_synced_eth_block", + "bridge_indexer_last_synced_eth_block", "The last eth block that indexer committed to DB", registry, ) .unwrap(), + tasks_remaining_checkpoints: register_int_gauge_vec_with_registry!( + "bridge_indexer_tasks_remaining_checkpoints", + "The remaining checkpoint for each task", + &["task_name"], + registry, + ) + .unwrap(), + live_task_current_checkpoint: register_int_gauge_vec_with_registry!( + "bridge_indexer_live_task_current_checkpoint", + "Current checkpoint of live task", + &["task_name"], + registry, + ) + .unwrap(), } } diff --git a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs index f76baa6e8ae25..7caf595e1b9f6 100644 --- a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs @@ -15,7 +15,6 @@ use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent}; -use sui_indexer_builder::sui_datasource::CheckpointTxnData; use sui_indexer_builder::Task; use sui_types::effects::TransactionEffectsAPI; use sui_types::event::Event; @@ -27,6 +26,7 @@ use crate::metrics::BridgeIndexerMetrics; use crate::postgres_manager::PgPool; use crate::schema::progress_store::{columns, dsl}; use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data}; +use crate::sui_datasource::CheckpointTxnData; use crate::{ models, schema, BridgeDataSource, ProcessedTxnData, SuiTxnError, TokenTransfer, TokenTransferData, TokenTransferStatus, diff --git a/crates/sui-indexer-builder/src/sui_datasource.rs b/crates/sui-bridge-indexer/src/sui_datasource.rs similarity index 90% rename from crates/sui-indexer-builder/src/sui_datasource.rs rename to crates/sui-bridge-indexer/src/sui_datasource.rs index f1a95a264fdd0..4c4e4ab102e42 100644 --- a/crates/sui-indexer-builder/src/sui_datasource.rs +++ b/crates/sui-bridge-indexer/src/sui_datasource.rs @@ -1,15 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::indexer_builder::{DataSender, Datasource}; use anyhow::Error; use async_trait::async_trait; use mysten_metrics::{metered_channel, spawn_monitored_task}; +use prometheus::IntGaugeVec; use std::path::PathBuf; use std::sync::Arc; use sui_data_ingestion_core::{ DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool, }; +use sui_indexer_builder::indexer_builder::{DataSender, Datasource}; use sui_sdk::SuiClient; use sui_types::base_types::TransactionDigest; use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData; @@ -20,6 +21,8 @@ use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; use tracing::info; +use crate::metrics::BridgeIndexerMetrics; + pub struct SuiCheckpointDatasource { remote_store_url: String, sui_client: Arc, @@ -27,6 +30,7 @@ pub struct SuiCheckpointDatasource { checkpoint_path: PathBuf, genesis_checkpoint: u64, metrics: DataIngestionMetrics, + indexer_metrics: BridgeIndexerMetrics, } impl SuiCheckpointDatasource { pub fn new( @@ -36,6 +40,7 @@ impl SuiCheckpointDatasource { checkpoint_path: PathBuf, genesis_checkpoint: u64, metrics: DataIngestionMetrics, + indexer_metrics: BridgeIndexerMetrics, ) -> Self { SuiCheckpointDatasource { remote_store_url, @@ -43,6 +48,7 @@ impl SuiCheckpointDatasource { concurrency, checkpoint_path, metrics, + indexer_metrics, genesis_checkpoint, } } @@ -97,6 +103,14 @@ impl Datasource for SuiCheckpointDatasource { fn get_genesis_height(&self) -> u64 { self.genesis_checkpoint } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } } struct PerTaskInMemProgressStore { diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index c9ed96073a028..b19ec770e6da4 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; +use prometheus::IntGaugeVec; use tokio::task::JoinHandle; use crate::{Task, Tasks}; @@ -342,7 +343,7 @@ pub trait Datasource: Sync + Send { starting_checkpoint, target_checkpoint ); - // todo: add metrics for number of tasks + let is_live_task = target_checkpoint == i64::MAX as u64; let (data_sender, mut data_channel) = metered_channel::channel( 1000, &mysten_metrics::get_metrics() @@ -354,6 +355,27 @@ pub trait Datasource: Sync + Send { .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender) .await?; + // track remaining checkpoints per task, except for live task + let remaining_checkpoints_metric = if !is_live_task { + let m = self + .get_tasks_remaining_checkpoints_metric() + .with_label_values(&[&task_name]); + m.set((target_checkpoint - starting_checkpoint + 1) as i64); + Some(m) + } else { + None + }; + // tracking current checkpoint for live task + let live_task_current_checkpoint_metrics = if is_live_task { + let m = self + .get_live_task_checkpoint_metric() + .with_label_values(&[&task_name]); + m.set((starting_checkpoint) as i64); + Some(m) + } else { + None + }; + while let Some((block_number, data)) = data_channel.recv().await { if block_number > target_checkpoint { break; @@ -363,14 +385,29 @@ pub trait Datasource: Sync + Send { result.append(&mut data_mapper.map(d)?); Ok::, Error>(result) })?; + // TODO: batch write data // TODO: we might be able to write data and progress in a single transaction. storage.write(processed_data).await?; } + // TODO: batch progress storage .save_progress(task_name.clone(), block_number) .await?; + if let Some(m) = &remaining_checkpoints_metric { + m.set((target_checkpoint - block_number + 1) as i64) + } + if let Some(m) = &live_task_current_checkpoint_metrics { + m.set((block_number) as i64) + } + } + if is_live_task { + // Live task should never exit, except in unit tests + tracing::error!(task_name, "Live task exiting"); } join_handle.abort(); + if let Some(m) = &remaining_checkpoints_metric { + m.set(0) + } join_handle.await? } @@ -384,6 +421,10 @@ pub trait Datasource: Sync + Send { async fn get_live_task_starting_checkpoint(&self) -> Result; fn get_genesis_height(&self) -> u64; + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec; + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec; } pub enum BackfillStrategy { diff --git a/crates/sui-indexer-builder/src/lib.rs b/crates/sui-indexer-builder/src/lib.rs index f4cb0d4d32eda..af00dde516ac0 100644 --- a/crates/sui-indexer-builder/src/lib.rs +++ b/crates/sui-indexer-builder/src/lib.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub mod indexer_builder; -pub mod sui_datasource; #[derive(Clone, Debug)] pub struct Task { diff --git a/crates/sui-indexer-builder/tests/indexer_test_utils.rs b/crates/sui-indexer-builder/tests/indexer_test_utils.rs index 765f03b6a4a43..34bc06c66936a 100644 --- a/crates/sui-indexer-builder/tests/indexer_test_utils.rs +++ b/crates/sui-indexer-builder/tests/indexer_test_utils.rs @@ -7,6 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Error}; use async_trait::async_trait; +use prometheus::IntGaugeVec; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -21,6 +22,7 @@ pub struct TestDatasource { pub data: Vec, pub live_task_starting_checkpoint: u64, pub genesis_checkpoint: u64, + pub metric: IntGaugeVec, } #[async_trait] @@ -55,6 +57,16 @@ where fn get_genesis_height(&self) -> u64 { self.genesis_checkpoint } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + // This is dummy + &self.metric + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + // This is dummy + &self.metric + } } #[derive(Clone, Debug, Default)] diff --git a/crates/sui-indexer-builder/tests/indexer_tests.rs b/crates/sui-indexer-builder/tests/indexer_tests.rs index 84dba2748b3a4..9f6f9a697cb3e 100644 --- a/crates/sui-indexer-builder/tests/indexer_tests.rs +++ b/crates/sui-indexer-builder/tests/indexer_tests.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::indexer_test_utils::{InMemoryPersistent, NoopDataMapper, TestDatasource}; -use prometheus::Registry; +use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry}; use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder}; use sui_indexer_builder::Task; @@ -19,6 +19,7 @@ async fn indexer_simple_backfill_task_test() { data: data.clone(), live_task_starting_checkpoint: 5, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); let mut indexer = IndexerBuilder::new( @@ -57,6 +58,7 @@ async fn indexer_partitioned_backfill_task_test() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); let mut indexer = IndexerBuilder::new( @@ -102,6 +104,7 @@ async fn indexer_partitioned_task_with_data_already_in_db_test1() { data: data.clone(), live_task_starting_checkpoint: 31, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.data.lock().await.append(&mut (0..=30).collect()); @@ -151,6 +154,7 @@ async fn indexer_partitioned_task_with_data_already_in_db_test2() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.data.lock().await.append(&mut (0..=30).collect()); @@ -202,6 +206,7 @@ async fn indexer_partitioned_task_with_data_already_in_db_test3() { data: data.clone(), live_task_starting_checkpoint: 28, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -256,6 +261,7 @@ async fn indexer_partitioned_task_with_data_already_in_db_test4() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -314,6 +320,7 @@ async fn indexer_with_existing_live_task1() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 10, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -357,6 +364,7 @@ async fn indexer_with_existing_live_task2() { data: data.clone(), live_task_starting_checkpoint: 25, genesis_checkpoint: 10, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -410,6 +418,7 @@ async fn resume_test() { data: data.clone(), live_task_starting_checkpoint: 31, genesis_checkpoint: 0, + metric: new_metric(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -446,3 +455,7 @@ async fn resume_test() { recorded_data.sort(); assert_eq!((10..=50u64).collect::>(), recorded_data); } + +fn new_metric(registry: &Registry) -> IntGaugeVec { + register_int_gauge_vec_with_registry!("whatever", "whatever", &["whatever"], registry,).unwrap() +} From f831144311f1609ee51e27be8d47e124ae485380 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Sun, 1 Sep 2024 15:39:56 -0700 Subject: [PATCH 2/5] more --- .../src/eth_bridge_indexer.rs | 8 ++++++ crates/sui-bridge-indexer/src/metrics.rs | 10 ++++++- .../sui-bridge-indexer/src/sui_datasource.rs | 7 +++-- .../src/indexer_builder.rs | 27 ++++++++++++++++--- .../tests/indexer_test_utils.rs | 5 ++++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index c0e0499714f27..e836f6775d1ff 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -151,6 +151,10 @@ impl Datasource for EthSubscriptionDatasource { &self.indexer_metrics.tasks_remaining_checkpoints } + fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { &self.indexer_metrics.live_task_current_checkpoint } @@ -281,6 +285,10 @@ impl Datasource for EthSyncDatasource { &self.indexer_metrics.tasks_remaining_checkpoints } + fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { &self.indexer_metrics.live_task_current_checkpoint } diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs index 507453d85ec28..8bdedeac79c98 100644 --- a/crates/sui-bridge-indexer/src/metrics.rs +++ b/crates/sui-bridge-indexer/src/metrics.rs @@ -21,6 +21,7 @@ pub struct BridgeIndexerMetrics { pub(crate) latest_committed_eth_block: IntGauge, pub(crate) last_synced_eth_block: IntGauge, pub(crate) tasks_remaining_checkpoints: IntGaugeVec, + pub(crate) tasks_processed_checkpoints: IntGaugeVec, pub(crate) live_task_current_checkpoint: IntGaugeVec, } @@ -101,7 +102,14 @@ impl BridgeIndexerMetrics { .unwrap(), tasks_remaining_checkpoints: register_int_gauge_vec_with_registry!( "bridge_indexer_tasks_remaining_checkpoints", - "The remaining checkpoint for each task", + "The remaining checkpoints for each task", + &["task_name"], + registry, + ) + .unwrap(), + tasks_processed_checkpoints: register_int_gauge_vec_with_registry!( + "bridge_indexer_tasks_processed_checkpoints", + "Total processed checkpoints for each task", &["task_name"], registry, ) diff --git a/crates/sui-bridge-indexer/src/sui_datasource.rs b/crates/sui-bridge-indexer/src/sui_datasource.rs index 4c4e4ab102e42..f8fbe3b1b4097 100644 --- a/crates/sui-bridge-indexer/src/sui_datasource.rs +++ b/crates/sui-bridge-indexer/src/sui_datasource.rs @@ -19,7 +19,6 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use tracing::info; use crate::metrics::BridgeIndexerMetrics; @@ -108,6 +107,10 @@ impl Datasource for SuiCheckpointDatasource { &self.indexer_metrics.tasks_remaining_checkpoints } + fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { &self.indexer_metrics.live_task_current_checkpoint } @@ -158,7 +161,7 @@ pub type CheckpointTxnData = (CheckpointTransaction, u64, u64); #[async_trait] impl Worker for IndexerWorker { async fn process_checkpoint(&self, checkpoint: SuiCheckpointData) -> anyhow::Result<()> { - info!( + tracing::trace!( "Received checkpoint [{}] {}: {}", checkpoint.checkpoint_summary.epoch, checkpoint.checkpoint_summary.sequence_number, diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index b19ec770e6da4..22987e0c39e07 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -355,13 +355,16 @@ pub trait Datasource: Sync + Send { .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender) .await?; + let processed_metrics_metrics = self + .get_tasks_processed_checkpoints_metric() + .with_label_values(&[&task_name]); // track remaining checkpoints per task, except for live task let remaining_checkpoints_metric = if !is_live_task { - let m = self + let remaining = self .get_tasks_remaining_checkpoints_metric() .with_label_values(&[&task_name]); - m.set((target_checkpoint - starting_checkpoint + 1) as i64); - Some(m) + remaining.set((target_checkpoint - starting_checkpoint + 1) as i64); + Some(remaining) } else { None }; @@ -377,6 +380,14 @@ pub trait Datasource: Sync + Send { }; while let Some((block_number, data)) = data_channel.recv().await { + let data_len = data.len(); + tracing::debug!( + task_name, + height = block_number, + "Ingestion task received {} data", + data_len, + ); + let timer = tokio::time::Instant::now(); if block_number > target_checkpoint { break; } @@ -393,6 +404,14 @@ pub trait Datasource: Sync + Send { storage .save_progress(task_name.clone(), block_number) .await?; + tracing::debug!( + task_name, + height = block_number, + "Ingestion task processed {} data in {}ms", + data_len, + timer.elapsed().as_millis(), + ); + processed_metrics_metrics.inc(); if let Some(m) = &remaining_checkpoints_metric { m.set((target_checkpoint - block_number + 1) as i64) } @@ -424,6 +443,8 @@ pub trait Datasource: Sync + Send { fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec; + fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec; + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec; } diff --git a/crates/sui-indexer-builder/tests/indexer_test_utils.rs b/crates/sui-indexer-builder/tests/indexer_test_utils.rs index 34bc06c66936a..ca7a778a9dadf 100644 --- a/crates/sui-indexer-builder/tests/indexer_test_utils.rs +++ b/crates/sui-indexer-builder/tests/indexer_test_utils.rs @@ -63,6 +63,11 @@ where &self.metric } + fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + // This is dummy + &self.metric + } + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { // This is dummy &self.metric From 9c16744e0301b341d4c54700a6afa82f98b92f72 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Sun, 1 Sep 2024 16:15:41 -0700 Subject: [PATCH 3/5] batch ingestion tasks --- Cargo.lock | 1 + .../src/eth_bridge_indexer.rs | 1 + crates/sui-indexer-builder/Cargo.toml | 1 + .../src/indexer_builder.rs | 59 +++++++++++++------ 4 files changed, 45 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1849fc77f020f..bdec902bd473c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13642,6 +13642,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "futures", "mysten-metrics", "prometheus", "sui-data-ingestion-core", diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index e836f6775d1ff..dedfbfe2e4f6c 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -108,6 +108,7 @@ impl Datasource for EthSubscriptionDatasource { eth_ws_client.get_block(block_number), Duration::from_secs(30000) ) else { + // FIXME: why panic!("Unable to get block from provider"); }; diff --git a/crates/sui-indexer-builder/Cargo.toml b/crates/sui-indexer-builder/Cargo.toml index 306e6522ad0d6..bd863f8dc12f9 100644 --- a/crates/sui-indexer-builder/Cargo.toml +++ b/crates/sui-indexer-builder/Cargo.toml @@ -15,6 +15,7 @@ mysten-metrics.workspace = true sui-types.workspace = true sui-sdk.workspace = true sui-data-ingestion-core.workspace = true +futures.workspace = true tracing.workspace = true prometheus.workspace = true telemetry-subscribers.workspace = true diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index 22987e0c39e07..8bffb090ba3b5 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; +use futures::StreamExt; use prometheus::IntGaugeVec; use tokio::task::JoinHandle; @@ -16,6 +17,8 @@ use tap::tap::TapFallible; type CheckpointData = (u64, Vec); pub type DataSender = metered_channel::Sender>; +const INGESTION_BATCH_SIZE: usize = 100; + pub struct IndexerBuilder { name: String, datasource: D, @@ -337,14 +340,19 @@ pub trait Datasource: Sync + Send { M: DataMapper, P: Persistent, { + let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE") + .unwrap_or(INGESTION_BATCH_SIZE.to_string()) + .parse::() + .unwrap(); tracing::info!( task_name, + ingestion_batch_size, "Starting ingestion task ({}-{})", starting_checkpoint, - target_checkpoint + target_checkpoint, ); let is_live_task = target_checkpoint == i64::MAX as u64; - let (data_sender, mut data_channel) = metered_channel::channel( + let (data_sender, data_rx) = metered_channel::channel( 1000, &mysten_metrics::get_metrics() .unwrap() @@ -368,7 +376,7 @@ pub trait Datasource: Sync + Send { } else { None }; - // tracking current checkpoint for live task + // track current checkpoint for live task let live_task_current_checkpoint_metrics = if is_live_task { let m = self .get_live_task_checkpoint_metric() @@ -379,18 +387,30 @@ pub trait Datasource: Sync + Send { None }; - while let Some((block_number, data)) = data_channel.recv().await { - let data_len = data.len(); + let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(data_rx) + .ready_chunks(ingestion_batch_size); + + while let Some(batch) = stream.next().await { + // unwrap safe: at least 1 element in the batch + let first_height = batch.first().map(|(h, _)| *h).unwrap(); + let last_height = batch.last().map(|(h, _)| *h).unwrap(); + let batch_size = batch.len(); tracing::debug!( task_name, - height = block_number, - "Ingestion task received {} data", - data_len, + first_height, + last_height, + "Ingestion task received {} blocks.", + batch_size, ); let timer = tokio::time::Instant::now(); - if block_number > target_checkpoint { - break; - } + + let data = batch + .into_iter() + .take_while(|(height, _)| *height <= target_checkpoint) + .flat_map(|(_, data)| data) + .collect::>(); + let new_len = data.len(); + let last_height = min(last_height, target_checkpoint); if !data.is_empty() { let processed_data = data.into_iter().try_fold(vec![], |mut result, d| { result.append(&mut data_mapper.map(d)?); @@ -402,21 +422,26 @@ pub trait Datasource: Sync + Send { } // TODO: batch progress storage - .save_progress(task_name.clone(), block_number) + .save_progress(task_name.clone(), last_height) .await?; tracing::debug!( task_name, - height = block_number, - "Ingestion task processed {} data in {}ms", - data_len, + first_height, + last_height, + "Ingestion task processed {} blocks in {}ms", + new_len, timer.elapsed().as_millis(), ); processed_metrics_metrics.inc(); if let Some(m) = &remaining_checkpoints_metric { - m.set((target_checkpoint - block_number + 1) as i64) + // Note this is +-1 accurate if one checkpoint is split into multiple batches + m.set((target_checkpoint - last_height) as i64) } if let Some(m) = &live_task_current_checkpoint_metrics { - m.set((block_number) as i64) + m.set((last_height) as i64) + } + if last_height > target_checkpoint { + break; } } if is_live_task { From 577b2368b381c3b6c227d806668961fa361d1d3a Mon Sep 17 00:00:00 2001 From: longbowlu Date: Sun, 1 Sep 2024 17:27:57 -0700 Subject: [PATCH 4/5] max length --- .../src/indexer_builder.rs | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index 8bffb090ba3b5..ea9be15a45c26 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -225,6 +225,7 @@ impl Indexer { .await .tap_ok(|_| { tracing::info!( + task_name = self.name.as_str(), "Created backfill tasks ({}-{})", from_checkpoint, live_task_from_checkpoint - 1 @@ -232,6 +233,7 @@ impl Indexer { }) .tap_err(|e| { tracing::error!( + task_name = self.name.as_str(), "Failed to create backfill tasks ({}-{}): {:?}", from_checkpoint, live_task_from_checkpoint - 1, @@ -392,25 +394,22 @@ pub trait Datasource: Sync + Send { while let Some(batch) = stream.next().await { // unwrap safe: at least 1 element in the batch - let first_height = batch.first().map(|(h, _)| *h).unwrap(); - let last_height = batch.last().map(|(h, _)| *h).unwrap(); + let max_height = batch.iter().max_by(|(a, _), (b, _)| a.cmp(b)).unwrap().0; let batch_size = batch.len(); tracing::debug!( task_name, - first_height, - last_height, + max_height, "Ingestion task received {} blocks.", batch_size, ); let timer = tokio::time::Instant::now(); + // Note: for simplicity here we assume all data's height <= target_checkpoint let data = batch .into_iter() - .take_while(|(height, _)| *height <= target_checkpoint) + .filter(|(height, _)| *height <= target_checkpoint) .flat_map(|(_, data)| data) .collect::>(); - let new_len = data.len(); - let last_height = min(last_height, target_checkpoint); if !data.is_empty() { let processed_data = data.into_iter().try_fold(vec![], |mut result, d| { result.append(&mut data_mapper.map(d)?); @@ -421,26 +420,23 @@ pub trait Datasource: Sync + Send { storage.write(processed_data).await?; } // TODO: batch progress - storage - .save_progress(task_name.clone(), last_height) - .await?; + storage.save_progress(task_name.clone(), max_height).await?; tracing::debug!( task_name, - first_height, - last_height, + max_height, "Ingestion task processed {} blocks in {}ms", - new_len, + batch_size, timer.elapsed().as_millis(), ); - processed_metrics_metrics.inc(); + processed_metrics_metrics.add(batch_size as i64); if let Some(m) = &remaining_checkpoints_metric { - // Note this is +-1 accurate if one checkpoint is split into multiple batches - m.set((target_checkpoint - last_height) as i64) + // Note this is only approximate as the data may come in out of order + m.set((target_checkpoint - max_height) as i64) } if let Some(m) = &live_task_current_checkpoint_metrics { - m.set((last_height) as i64) + m.set((max_height) as i64) } - if last_height > target_checkpoint { + if max_height > target_checkpoint { break; } } From 0856195f263abc971b99ca53c4265e49b751dfb7 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Thu, 5 Sep 2024 16:17:22 -0700 Subject: [PATCH 5/5] metrics + batch process blocks --- .../src/eth_bridge_indexer.rs | 7 ++- crates/sui-bridge-indexer/src/metrics.rs | 9 ++-- .../sui-bridge-indexer/src/sui_datasource.rs | 3 +- .../src/indexer_builder.rs | 37 ++++++++++------ .../tests/indexer_test_utils.rs | 13 +++--- .../tests/indexer_tests.rs | 43 +++++++++++++------ 6 files changed, 72 insertions(+), 40 deletions(-) diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index dedfbfe2e4f6c..7b00e3eeaa877 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use ethers::prelude::Transaction; use ethers::providers::{Http, Middleware, Provider, StreamExt, Ws}; use ethers::types::{Address as EthAddress, Block, Filter, H256}; -use prometheus::IntGaugeVec; +use prometheus::{IntCounterVec, IntGaugeVec}; use sui_bridge::error::BridgeError; use sui_bridge::eth_client::EthClient; use sui_bridge::metered_eth_provider::MeteredEthHttpProvier; @@ -108,7 +108,6 @@ impl Datasource for EthSubscriptionDatasource { eth_ws_client.get_block(block_number), Duration::from_secs(30000) ) else { - // FIXME: why panic!("Unable to get block from provider"); }; @@ -152,7 +151,7 @@ impl Datasource for EthSubscriptionDatasource { &self.indexer_metrics.tasks_remaining_checkpoints } - fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { &self.indexer_metrics.tasks_processed_checkpoints } @@ -286,7 +285,7 @@ impl Datasource for EthSyncDatasource { &self.indexer_metrics.tasks_remaining_checkpoints } - fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { &self.indexer_metrics.tasks_processed_checkpoints } diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs index 8bdedeac79c98..9ea85514e20d3 100644 --- a/crates/sui-bridge-indexer/src/metrics.rs +++ b/crates/sui-bridge-indexer/src/metrics.rs @@ -2,8 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use prometheus::{ - register_int_counter_with_registry, register_int_gauge_vec_with_registry, - register_int_gauge_with_registry, IntCounter, IntGauge, IntGaugeVec, Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntCounter, + IntCounterVec, IntGauge, IntGaugeVec, Registry, }; #[derive(Clone, Debug)] @@ -21,7 +22,7 @@ pub struct BridgeIndexerMetrics { pub(crate) latest_committed_eth_block: IntGauge, pub(crate) last_synced_eth_block: IntGauge, pub(crate) tasks_remaining_checkpoints: IntGaugeVec, - pub(crate) tasks_processed_checkpoints: IntGaugeVec, + pub(crate) tasks_processed_checkpoints: IntCounterVec, pub(crate) live_task_current_checkpoint: IntGaugeVec, } @@ -107,7 +108,7 @@ impl BridgeIndexerMetrics { registry, ) .unwrap(), - tasks_processed_checkpoints: register_int_gauge_vec_with_registry!( + tasks_processed_checkpoints: register_int_counter_vec_with_registry!( "bridge_indexer_tasks_processed_checkpoints", "Total processed checkpoints for each task", &["task_name"], diff --git a/crates/sui-bridge-indexer/src/sui_datasource.rs b/crates/sui-bridge-indexer/src/sui_datasource.rs index f8fbe3b1b4097..a9d5de0fac774 100644 --- a/crates/sui-bridge-indexer/src/sui_datasource.rs +++ b/crates/sui-bridge-indexer/src/sui_datasource.rs @@ -4,6 +4,7 @@ use anyhow::Error; use async_trait::async_trait; use mysten_metrics::{metered_channel, spawn_monitored_task}; +use prometheus::IntCounterVec; use prometheus::IntGaugeVec; use std::path::PathBuf; use std::sync::Arc; @@ -107,7 +108,7 @@ impl Datasource for SuiCheckpointDatasource { &self.indexer_metrics.tasks_remaining_checkpoints } - fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { &self.indexer_metrics.tasks_processed_checkpoints } diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index ea9be15a45c26..f37cab7d58c3c 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; use futures::StreamExt; -use prometheus::IntGaugeVec; +use prometheus::{IntCounterVec, IntGaugeVec}; use tokio::task::JoinHandle; use crate::{Task, Tasks}; @@ -365,7 +365,7 @@ pub trait Datasource: Sync + Send { .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender) .await?; - let processed_metrics_metrics = self + let processed_checkpoints_metrics = self .get_tasks_processed_checkpoints_metric() .with_label_values(&[&task_name]); // track remaining checkpoints per task, except for live task @@ -394,8 +394,22 @@ pub trait Datasource: Sync + Send { while let Some(batch) = stream.next().await { // unwrap safe: at least 1 element in the batch - let max_height = batch.iter().max_by(|(a, _), (b, _)| a.cmp(b)).unwrap().0; - let batch_size = batch.len(); + let mut max_height = 0; + let mut data = vec![]; + let mut batch_size = 0; + for (height, d) in batch { + if height > target_checkpoint { + tracing::warn!( + task_name, + height, + "Received data with height > target_checkpoint, skipping." + ); + continue; + } + max_height = std::cmp::max(max_height, height); + batch_size += 1; + data.extend(d); + } tracing::debug!( task_name, max_height, @@ -404,12 +418,6 @@ pub trait Datasource: Sync + Send { ); let timer = tokio::time::Instant::now(); - // Note: for simplicity here we assume all data's height <= target_checkpoint - let data = batch - .into_iter() - .filter(|(height, _)| *height <= target_checkpoint) - .flat_map(|(_, data)| data) - .collect::>(); if !data.is_empty() { let processed_data = data.into_iter().try_fold(vec![], |mut result, d| { result.append(&mut data_mapper.map(d)?); @@ -428,10 +436,13 @@ pub trait Datasource: Sync + Send { batch_size, timer.elapsed().as_millis(), ); - processed_metrics_metrics.add(batch_size as i64); + processed_checkpoints_metrics.inc_by(batch_size as u64); if let Some(m) = &remaining_checkpoints_metric { // Note this is only approximate as the data may come in out of order - m.set((target_checkpoint - max_height) as i64) + m.set(std::cmp::max( + target_checkpoint as i64 - max_height as i64, + 0, + )); } if let Some(m) = &live_task_current_checkpoint_metrics { m.set((max_height) as i64) @@ -464,7 +475,7 @@ pub trait Datasource: Sync + Send { fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec; - fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec; + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec; fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec; } diff --git a/crates/sui-indexer-builder/tests/indexer_test_utils.rs b/crates/sui-indexer-builder/tests/indexer_test_utils.rs index ca7a778a9dadf..0f9040ca93b4c 100644 --- a/crates/sui-indexer-builder/tests/indexer_test_utils.rs +++ b/crates/sui-indexer-builder/tests/indexer_test_utils.rs @@ -7,7 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Error}; use async_trait::async_trait; -use prometheus::IntGaugeVec; +use prometheus::{IntCounterVec, IntGaugeVec}; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -22,7 +22,8 @@ pub struct TestDatasource { pub data: Vec, pub live_task_starting_checkpoint: u64, pub genesis_checkpoint: u64, - pub metric: IntGaugeVec, + pub gauge_metric: IntGaugeVec, + pub counter_metric: IntCounterVec, } #[async_trait] @@ -60,17 +61,17 @@ where fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { // This is dummy - &self.metric + &self.gauge_metric } - fn get_tasks_processed_checkpoints_metric(&self) -> &IntGaugeVec { + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { // This is dummy - &self.metric + &self.counter_metric } fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { // This is dummy - &self.metric + &self.gauge_metric } } diff --git a/crates/sui-indexer-builder/tests/indexer_tests.rs b/crates/sui-indexer-builder/tests/indexer_tests.rs index 9f6f9a697cb3e..8beeddcb5b74b 100644 --- a/crates/sui-indexer-builder/tests/indexer_tests.rs +++ b/crates/sui-indexer-builder/tests/indexer_tests.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::indexer_test_utils::{InMemoryPersistent, NoopDataMapper, TestDatasource}; -use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry}; +use prometheus::{ + register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec, + IntGaugeVec, Registry, +}; use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder}; use sui_indexer_builder::Task; @@ -19,7 +22,8 @@ async fn indexer_simple_backfill_task_test() { data: data.clone(), live_task_starting_checkpoint: 5, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); let mut indexer = IndexerBuilder::new( @@ -40,6 +44,7 @@ async fn indexer_simple_backfill_task_test() { // it should have 2 task created for the indexer - a live task and a backfill task let tasks = persistent.get_all_tasks("test_indexer").await.unwrap(); + println!("{:?}", tasks); assert_ranges(&tasks, vec![(10, i64::MAX as u64), (4, 4)]); // the data recorded in storage should be the same as the datasource let mut recorded_data = persistent.data.lock().await.clone(); @@ -58,7 +63,8 @@ async fn indexer_partitioned_backfill_task_test() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); let mut indexer = IndexerBuilder::new( @@ -104,7 +110,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test1() { data: data.clone(), live_task_starting_checkpoint: 31, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.data.lock().await.append(&mut (0..=30).collect()); @@ -154,7 +161,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test2() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.data.lock().await.append(&mut (0..=30).collect()); @@ -206,7 +214,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test3() { data: data.clone(), live_task_starting_checkpoint: 28, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -261,7 +270,8 @@ async fn indexer_partitioned_task_with_data_already_in_db_test4() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -320,7 +330,8 @@ async fn indexer_with_existing_live_task1() { data: data.clone(), live_task_starting_checkpoint: 35, genesis_checkpoint: 10, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -364,7 +375,8 @@ async fn indexer_with_existing_live_task2() { data: data.clone(), live_task_starting_checkpoint: 25, genesis_checkpoint: 10, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -418,7 +430,8 @@ async fn resume_test() { data: data.clone(), live_task_starting_checkpoint: 31, genesis_checkpoint: 0, - metric: new_metric(®istry), + gauge_metric: new_gauge_vec(®istry), + counter_metric: new_counter_vec(®istry), }; let persistent = InMemoryPersistent::new(); persistent.progress_store.lock().await.insert( @@ -456,6 +469,12 @@ async fn resume_test() { assert_eq!((10..=50u64).collect::>(), recorded_data); } -fn new_metric(registry: &Registry) -> IntGaugeVec { - register_int_gauge_vec_with_registry!("whatever", "whatever", &["whatever"], registry,).unwrap() +fn new_gauge_vec(registry: &Registry) -> IntGaugeVec { + register_int_gauge_vec_with_registry!("whatever_gauge", "whatever", &["whatever"], registry,) + .unwrap() +} + +fn new_counter_vec(registry: &Registry) -> IntCounterVec { + register_int_counter_vec_with_registry!("whatever_counter", "whatever", &["whatever"], registry,) + .unwrap() }