Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c1577ec
Add Deepbook Indexer
Bridgerz Aug 17, 2024
572b20b
Add Deepbook Indexer
Bridgerz Aug 17, 2024
bc9a430
Update Cargo.lock
Bridgerz Aug 23, 2024
c65a876
upgrade diesel
Bridgerz Aug 23, 2024
83e0577
Update schema.rs
Bridgerz Aug 23, 2024
2a0faad
PR fixes
Bridgerz Aug 23, 2024
642a421
Add Deepbook Indexer
Bridgerz Aug 17, 2024
70b7457
Update Cargo.lock
Bridgerz Aug 27, 2024
763129d
new events
0xaslan Sep 4, 2024
6721ae5
Merge branch 'deepbook-indexer' of https://github.com/MystenLabs/sui …
0xaslan Sep 4, 2024
9dbad6e
id as primary key
0xaslan Sep 5, 2024
e63751c
cargo
0xaslan Sep 5, 2024
bf4e8dd
Merge branch 'main' into deepbook-indexer
0xaslan Sep 5, 2024
b8ce672
clippy
0xaslan Sep 5, 2024
c4c501a
Merge branch 'deepbook-indexer' of https://github.com/MystenLabs/sui …
0xaslan Sep 5, 2024
db5288b
fixes
0xaslan Sep 5, 2024
63c6628
Merge branch 'main' of https://github.com/MystenLabs/sui into deepboo…
0xaslan Sep 5, 2024
653dfb8
Merge branch 'main' of https://github.com/MystenLabs/sui into deepboo…
0xaslan Sep 6, 2024
108833e
cargolock
0xaslan Sep 6, 2024
ce8f22c
latest indexer changes
0xaslan Sep 6, 2024
d92fd89
Merge branch 'main' of https://github.com/MystenLabs/sui into deepboo…
0xaslan Sep 9, 2024
42aec12
pull latest indexer changes
0xaslan Sep 9, 2024
2a37333
Update Cargo.toml
Bridgerz Sep 9, 2024
8e87606
Update cargo.lock
Bridgerz Sep 9, 2024
31805a8
Update Cargo.lock
Bridgerz Sep 9, 2024
69600fb
Update events.rs
Bridgerz Sep 9, 2024
72c92da
Merge branch 'main' into db-indexer-squashed
longbowlu Sep 9, 2024
91849b2
cargo.lock
longbowlu Sep 9, 2024
9e648f8
remove unused files
longbowlu Sep 9, 2024
e4e295f
diesel stuff
longbowlu Sep 9, 2024
d9e898f
make it compile
longbowlu Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ members = [
"crates/sui-cost",
"crates/sui-data-ingestion",
"crates/sui-data-ingestion-core",
"crates/sui-deepbook-indexer",
"crates/sui-e2e-tests",
"crates/sui-enum-compat-util",
"crates/sui-faucet",
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ use sui_bridge_indexer::config::IndexerConfig;
use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
use sui_bridge_indexer::storage::{
OutOfOrderSaveAfterDurationPolicy, PgBridgePersistent, ProgressSavingPolicy,
SaveAfterDurationPolicy,
};
use sui_bridge_indexer::storage::PgBridgePersistent;
use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_config::Config;
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder};
use sui_indexer_builder::progress::{
OutOfOrderSaveAfterDurationPolicy, ProgressSavingPolicy, SaveAfterDurationPolicy,
};
use sui_sdk::SuiClientBuilder;

#[derive(Parser, Clone, Debug)]
Expand Down
244 changes: 3 additions & 241 deletions crates/sui-bridge-indexer/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![allow(dead_code)] // TODO: remove in next PR where integration of ProgressSavingPolicy is done

use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Error};
use async_trait::async_trait;
use diesel::dsl::now;
Expand All @@ -21,7 +16,9 @@ use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
use crate::{models, schema, ProcessedTxnData};
use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent};
use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT};
use sui_indexer_builder::{
progress::ProgressSavingPolicy, Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT,
};

/// Persistent layer impl
#[derive(Clone)]
Expand Down Expand Up @@ -233,238 +230,3 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(())
}
}

#[derive(Debug, Clone)]
pub enum ProgressSavingPolicy {
SaveAfterDuration(SaveAfterDurationPolicy),
OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy),
}

#[derive(Debug, Clone)]
pub struct SaveAfterDurationPolicy {
duration: tokio::time::Duration,
last_save_time: Arc<Mutex<HashMap<String, Option<tokio::time::Instant>>>>,
}

impl SaveAfterDurationPolicy {
pub fn new(duration: tokio::time::Duration) -> Self {
Self {
duration,
last_save_time: Arc::new(Mutex::new(HashMap::new())),
}
}
}

#[derive(Debug, Clone)]
pub struct OutOfOrderSaveAfterDurationPolicy {
duration: tokio::time::Duration,
last_save_time: Arc<Mutex<HashMap<String, Option<tokio::time::Instant>>>>,
seen: Arc<Mutex<HashMap<String, HashSet<u64>>>>,
next_to_fill: Arc<Mutex<HashMap<String, Option<u64>>>>,
}

impl OutOfOrderSaveAfterDurationPolicy {
pub fn new(duration: tokio::time::Duration) -> Self {
Self {
duration,
last_save_time: Arc::new(Mutex::new(HashMap::new())),
seen: Arc::new(Mutex::new(HashMap::new())),
next_to_fill: Arc::new(Mutex::new(HashMap::new())),
}
}
}

impl ProgressSavingPolicy {
/// If returns Some(progress), it means we should save the progress to DB.
fn cache_progress(
&mut self,
task_name: String,
heights: &[u64],
start_height: u64,
target_height: u64,
) -> Option<u64> {
match self {
ProgressSavingPolicy::SaveAfterDuration(policy) => {
let height = *heights.iter().max().unwrap();
let mut last_save_time_guard = policy.last_save_time.lock().unwrap();
let last_save_time = last_save_time_guard.entry(task_name).or_insert(None);
if height >= target_height {
*last_save_time = Some(tokio::time::Instant::now());
return Some(height);
}
if let Some(v) = last_save_time {
if v.elapsed() >= policy.duration {
*last_save_time = Some(tokio::time::Instant::now());
Some(height)
} else {
None
}
} else {
// update `last_save_time` to now but don't actually save progress
*last_save_time = Some(tokio::time::Instant::now());
None
}
}
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(policy) => {
let mut next_to_fill = {
let mut next_to_fill_guard = policy.next_to_fill.lock().unwrap();
(*next_to_fill_guard
.entry(task_name.clone())
.or_insert(Some(start_height)))
.unwrap()
};
let old_next_to_fill = next_to_fill;
{
let mut seen_guard = policy.seen.lock().unwrap();
let seen = seen_guard
.entry(task_name.clone())
.or_insert(HashSet::new());
seen.extend(heights.iter().cloned());
while seen.remove(&next_to_fill) {
next_to_fill += 1;
}
}
// We made some progress in filling gaps
if old_next_to_fill != next_to_fill {
policy
.next_to_fill
.lock()
.unwrap()
.insert(task_name.clone(), Some(next_to_fill));
}

let mut last_save_time_guard = policy.last_save_time.lock().unwrap();
let last_save_time = last_save_time_guard
.entry(task_name.clone())
.or_insert(None);

// If we have reached the target height, we always save
if next_to_fill > target_height {
*last_save_time = Some(tokio::time::Instant::now());
return Some(next_to_fill - 1);
}
// Regardless of whether we made progress, we should save if we have waited long enough
if let Some(v) = last_save_time {
if v.elapsed() >= policy.duration && next_to_fill > start_height {
*last_save_time = Some(tokio::time::Instant::now());
Some(next_to_fill - 1)
} else {
None
}
} else {
// update `last_save_time` to now but don't actually save progress
*last_save_time = Some(tokio::time::Instant::now());
None
}
}
}
}
}

#[cfg(test)]
mod tests {

use super::*;

#[tokio::test]
async fn test_save_after_duration_policy() {
let duration = tokio::time::Duration::from_millis(100);
let mut policy =
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(duration));
assert_eq!(
policy.cache_progress("task1".to_string(), &[1], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[2], 0, 100),
Some(2)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[3], 0, 100),
Some(3)
);

assert_eq!(
policy.cache_progress("task2".to_string(), &[4], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[5, 6], 0, 100),
Some(6)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[8, 7], 0, 100),
Some(8)
);
}

#[tokio::test]
async fn test_out_of_order_save_after_duration_policy() {
let duration = tokio::time::Duration::from_millis(100);
let mut policy = ProgressSavingPolicy::OutOfOrderSaveAfterDuration(
OutOfOrderSaveAfterDurationPolicy::new(duration),
);

assert_eq!(
policy.cache_progress("task1".to_string(), &[0], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[1], 0, 100),
Some(1)
);
assert_eq!(
policy.cache_progress("task1".to_string(), &[3], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[4], 0, 100),
Some(1)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[2], 0, 100),
Some(4)
);

assert_eq!(
policy.cache_progress("task2".to_string(), &[0], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[1], 0, 100),
Some(1)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[2], 0, 100),
Some(2)
);
assert_eq!(
policy.cache_progress("task2".to_string(), &[3], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[4], 0, 100),
Some(4)
);

assert_eq!(
policy.cache_progress("task2".to_string(), &[6, 7, 8], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[5, 9], 0, 100),
Some(9)
);
}
}
44 changes: 44 additions & 0 deletions crates/sui-deepbook-indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "sui-deepbook-indexer"
version = "0.1.0"
authors = ["Mysten Labs <build@mystenlabs.com>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
serde.workspace = true
tap.workspace = true
diesel = { workspace = true, features = ["serde_json"] }
diesel-async = { workspace = true, features = ["bb8", "postgres"] }
tokio = { workspace = true, features = ["full"] }
anyhow.workspace = true
futures.workspace = true
async-trait.workspace = true
bcs.workspace = true
bin-version.workspace = true
clap.workspace = true
mysten-metrics.workspace = true
prometheus.workspace = true
serde_yaml.workspace = true
sui-bridge.workspace = true
sui-sdk.workspace = true
sui-json-rpc-types.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
tracing.workspace = true
backoff.workspace = true
sui-config.workspace = true
sui-indexer-builder.workspace = true
bigdecimal = "0.4.0"

[dev-dependencies]
sui-types = { workspace = true, features = ["test-utils"] }
sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
hex-literal = "0.3.4"

[[bin]]
name = "deepbook-indexer"
path = "src/main.rs"
Loading
Loading