38 changes: 38 additions & 0 deletions crates/sui-archival/src/reader.rs
@@ -361,6 +361,44 @@ impl ArchiveReader {
.await
}

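    /// Fetch the certified checkpoint summaries for the given list of checkpoint
    /// sequence numbers. Unlike the verified read paths, the downloaded summaries
    /// are deserialized and returned without any verification.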
pub async fn get_summaries_for_list_no_verify(
&self,
cp_list: Vec<CheckpointSequenceNumber>,
) -> Result<Vec<CertifiedCheckpointSummary>> {
let summary_files = self.get_summary_files_for_list(cp_list.clone()).await?;
let remote_object_store = self.remote_object_store.clone();
let stream = futures::stream::iter(summary_files.iter())
.map(|summary_metadata| {
let remote_object_store = remote_object_store.clone();
async move {
let summary_data =
get(&remote_object_store, &summary_metadata.file_path()).await?;
Ok::<Bytes, anyhow::Error>(summary_data)
}
})
.boxed();

stream
.buffer_unordered(self.concurrency)
.try_fold(Vec::new(), |mut acc, summary_data| async move {
let summary_result: Result<Vec<CertifiedCheckpointSummary>, anyhow::Error> =
make_iterator::<CertifiedCheckpointSummary, Reader<Bytes>>(
SUMMARY_FILE_MAGIC,
summary_data.reader(),
)
.map(|summary_iter| summary_iter.collect::<Vec<_>>());

match summary_result {
Ok(summaries) => {
acc.extend(summaries);
Ok(acc)
}
Err(e) => Err(e),
}
})
.await
}

/// Load checkpoints+txns+effects from archive into the input store `S` for the given
/// checkpoint range. If the latest available checkpoint in the archive is older than the start
/// of the input range, this call fails with an error; otherwise we load as many checkpoints as
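For context, the indexer restorer (below) uses this new method to fetch the genesis summary and derive the chain identifier from its digest. A minimal usage sketch, assuming an already-configured `ArchiveReader` named `archive_reader`:

// Sketch only: `archive_reader` is assumed to be built from an
// ArchiveReaderConfig, as in read_restore_checkpoint_info below.
let summaries = archive_reader
    .get_summaries_for_list_no_verify(vec![0])
    .await?;
// Checkpoint 0 is genesis, and its digest doubles as the chain identifier.
let genesis = summaries
    .first()
    .ok_or_else(|| anyhow::anyhow!("no checkpoint summary returned"))?;
let chain_identifier = *genesis.digest();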
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/errors.rs
@@ -28,6 +28,9 @@ impl std::fmt::Display for DataDownloadError {

#[derive(Debug, Error)]
pub enum IndexerError {
#[error("Indexer failed to read from archives store with error: `{0}`")]
ArchiveReaderError(String),

#[error("Indexer failed to convert timestamp to NaiveDateTime with error: `{0}`")]
DateTimeParsingError(String),

25 changes: 22 additions & 3 deletions crates/sui-indexer/src/restorer/archives.rs
@@ -4,19 +4,27 @@
use std::num::NonZeroUsize;

use prometheus::Registry;
use sui_types::digests::CheckpointDigest;
use tracing::info;

use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};

use crate::errors::IndexerError;
use crate::types::IndexerResult;

pub async fn read_next_checkpoint_after_epoch(
#[derive(Clone, Debug)]
pub struct RestoreCheckpointInfo {
pub next_checkpoint_after_epoch: u64,
pub chain_identifier: CheckpointDigest,
}

pub async fn read_restore_checkpoint_info(
cred_path: String,
archive_bucket: Option<String>,
epoch: u64,
) -> IndexerResult<u64> {
) -> IndexerResult<RestoreCheckpointInfo> {
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: archive_bucket,
@@ -39,5 +39,16 @@ pub async fn read_next_checkpoint_after_epoch(
"Read from archives: next checkpoint sequence after epoch {} is: {}",
epoch, next_checkpoint_after_epoch
);
Ok(next_checkpoint_after_epoch)
let cp_summaries = archive_reader
.get_summaries_for_list_no_verify(vec![0])
.await
.map_err(|e| IndexerError::ArchiveReaderError(format!("Failed to get summaries: {}", e)))?;
let first_cp = cp_summaries
.first()
.ok_or_else(|| IndexerError::ArchiveReaderError("No checkpoint found".to_string()))?;
let chain_identifier = *first_cp.digest();
Ok(RestoreCheckpointInfo {
next_checkpoint_after_epoch,
chain_identifier,
})
}
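A minimal call sketch for the renamed helper; the credential path, bucket name, and epoch below are placeholders, not values from this change:

let RestoreCheckpointInfo {
    next_checkpoint_after_epoch,
    chain_identifier,
} = read_restore_checkpoint_info(
    "/path/to/gcs_cred.json".to_string(),        // placeholder credential path
    Some("example-archive-bucket".to_string()),  // placeholder bucket
    42,                                          // placeholder start epoch
)
.await?;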
52 changes: 32 additions & 20 deletions crates/sui-indexer/src/restorer/formal_snapshot.rs
@@ -25,9 +25,9 @@ use sui_types::accumulator::Accumulator;
use crate::config::RestoreConfig;
use crate::errors::IndexerError;
use crate::handlers::TransactionObjectChangesToCommit;
use crate::restorer::archives::read_next_checkpoint_after_epoch;
use crate::restorer::archives::{read_restore_checkpoint_info, RestoreCheckpointInfo};
use crate::store::{indexer_store::IndexerStore, PgIndexerStore};
use crate::types::IndexedObject;
use crate::types::{IndexedCheckpoint, IndexedObject};

pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
@@ -37,7 +37,6 @@ pub struct IndexerFormalSnapshotRestorer {
store: PgIndexerStore,
reader: StateSnapshotReaderV1,
restore_config: RestoreConfig,
next_checkpoint_after_epoch: u64,
}

impl IndexerFormalSnapshotRestorer {
@@ -89,17 +88,9 @@ impl IndexerFormalSnapshotRestorer {
restore_config.start_epoch
);

let next_checkpoint_after_epoch = read_next_checkpoint_after_epoch(
restore_config.gcs_cred_path.clone(),
Some(restore_config.gcs_archive_bucket.clone()),
restore_config.start_epoch,
)
.await?;

Ok(Self {
store,
reader,
next_checkpoint_after_epoch,
restore_config: restore_config.clone(),
})
}
@@ -125,6 +116,8 @@ impl IndexerFormalSnapshotRestorer {
info!("Finished restoring move objects");
self.restore_display_table().await?;
info!("Finished restoring display table");
self.restore_cp_watermark_and_chain_id().await?;
info!("Finished restoring checkpoint info");
Ok(())
}

@@ -161,7 +154,6 @@
let store_clone = self.store.clone();
let bar_clone = move_object_progress_bar.clone();
let restore_config = self.restore_config.clone();
let next_checkpoint_after_epoch = self.next_checkpoint_after_epoch;

let restore_task = task::spawn(async move {
let _permit = sema_limit_clone.acquire().await.unwrap();
@@ -186,14 +178,11 @@
for object in obj_iter {
match object {
LiveObject::Normal(obj) => {
if !obj.is_package() {
let indexed_object = IndexedObject::from_object(
next_checkpoint_after_epoch,
obj,
None,
);
move_objects.push(indexed_object);
}
                            // TODO: placeholder values for df_info and checkpoint_seq_num;
                            // will be cleaned up once the column cleanup is done.
let indexed_object =
IndexedObject::from_object(0, obj, None);
move_objects.push(indexed_object);
}
LiveObject::Wrapped(_) => {}
}
@@ -269,4 +258,27 @@ impl IndexerFormalSnapshotRestorer {
self.store.restore_display(bytes).await?;
Ok(())
}

async fn restore_cp_watermark_and_chain_id(&self) -> Result<(), IndexerError> {
let restore_checkpoint_info = read_restore_checkpoint_info(
self.restore_config.gcs_cred_path.clone(),
Some(self.restore_config.gcs_archive_bucket.clone()),
self.restore_config.start_epoch,
)
.await?;
let RestoreCheckpointInfo {
next_checkpoint_after_epoch,
chain_identifier,
} = restore_checkpoint_info;
self.store
.persist_chain_identifier(chain_identifier.into_inner().to_vec())
.await?;
assert!(next_checkpoint_after_epoch > 0);
let last_cp = IndexedCheckpoint {
sequence_number: next_checkpoint_after_epoch - 1,
..Default::default()
};
self.store.persist_checkpoints(vec![last_cp]).await?;
Ok(())
}
}
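A note on the `assert!(next_checkpoint_after_epoch > 0)` guard above: every epoch ends after genesis, so the first checkpoint of the next epoch is always at least 1. A zero here would indicate a bad archive read, and without the assert the `next_checkpoint_after_epoch - 1` watermark computation would underflow the `u64`.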
5 changes: 5 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
@@ -69,6 +69,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
checkpoints: Vec<IndexedCheckpoint>,
) -> Result<(), IndexerError>;

async fn persist_chain_identifier(
&self,
checkpoint_digest: Vec<u8>,
) -> Result<(), IndexerError>;

async fn persist_transactions(
&self,
transactions: Vec<IndexedTransaction>,
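Callers pass the raw digest bytes to the new trait method; a sketch mirroring the restorer's `restore_cp_watermark_and_chain_id` above:

// Sketch: `store` is any IndexerStore; `chain_identifier` is a CheckpointDigest.
store
    .persist_chain_identifier(chain_identifier.into_inner().to_vec())
    .await?;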
41 changes: 24 additions & 17 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -651,23 +651,7 @@ impl PgIndexerStore {
let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())
.await?;

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
let checkpoint_digest =
first_checkpoint.checkpoint_digest.into_inner().to_vec();
diesel::insert_into(chain_identifier::table)
.values(StoredChainIdentifier { checkpoint_digest })
.on_conflict_do_nothing()
.execute(conn)
.await
.map_err(IndexerError::from)
.context("failed to write to chain_identifier table")?;
Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await?;
self.persist_chain_identifier(checkpoint_digest).await?;
}
let guard = self
.metrics
@@ -2137,6 +2121,29 @@ impl IndexerStore for PgIndexerStore {
.await?;
Ok(())
}

async fn persist_chain_identifier(
&self,
checkpoint_digest: Vec<u8>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(chain_identifier::table)
.values(StoredChainIdentifier { checkpoint_digest })
.on_conflict_do_nothing()
.execute(conn)
.await
.map_err(IndexerError::from)
.context("failed to write to chain_identifier table")?;
Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await?;
Ok(())
}
}

/// Construct deleted objects and mutated objects to commit.
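Because the insert uses `on_conflict_do_nothing()`, persisting the chain identifier is idempotent: it is safe for both the first `persist_checkpoints` call and the restorer's `restore_cp_watermark_and_chain_id` to route through this method.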
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/types.rs
@@ -27,7 +27,7 @@ use sui_types::transaction::SenderSignedData;

pub type IndexerResult<T> = Result<T, IndexerError>;

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct IndexedCheckpoint {
pub sequence_number: u64,
pub checkpoint_digest: CheckpointDigest,
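The new `Default` derive (which assumes every field of `IndexedCheckpoint` implements `Default`) is what lets the restorer persist a watermark-only checkpoint row, as in `restore_cp_watermark_and_chain_id` above:

// Only the watermark is meaningful here; every other column takes its default.
let last_cp = IndexedCheckpoint {
    sequence_number: next_checkpoint_after_epoch - 1,
    ..Default::default()
};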