Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use move_bytecode_utils::module_cache::SyncModuleCache;
use mysten_common::sync::notify_once::NotifyOnce;
use mysten_common::sync::notify_read::NotifyRead;
use mysten_metrics::monitored_scope;
use nonempty::NonEmpty;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard};
use prometheus::IntCounter;
Expand Down Expand Up @@ -4031,7 +4032,7 @@ impl AuthorityPerEpochStore {
pub fn process_pending_checkpoint(
&self,
commit_height: CheckpointHeight,
content_info: Vec<(CheckpointSummary, CheckpointContents)>,
content_info: NonEmpty<(CheckpointSummary, CheckpointContents)>,
) -> SuiResult<()> {
let tables = self.tables()?;
// All created checkpoints are inserted in builder_checkpoint_summary in a single batch.
Expand Down
161 changes: 129 additions & 32 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
use crate::state_accumulator::StateAccumulator;
use diffy::create_patch;
use itertools::Itertools;
use mysten_common::{debug_fatal, fatal};
use mysten_metrics::{monitored_future, monitored_scope, MonitoredFutureExt};
use nonempty::NonEmpty;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sui_macros::fail_point;
Expand All @@ -28,6 +30,7 @@ use sui_types::base_types::ConciseableName;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_checkpoint::CheckpointCommitment;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::sync::watch;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_handler::SequencedConsensusTransactionKey;
Expand Down Expand Up @@ -475,7 +478,7 @@ impl CheckpointStore {
?local_contents,
"Local checkpoint fork detected!",
);
panic!(
fatal!(
"Local checkpoint fork detected for sequence number: {}",
local_checkpoint.sequence_number()
);
Expand Down Expand Up @@ -860,6 +863,7 @@ pub struct CheckpointBuilder {
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
notify_aggregator: Arc<Notify>,
last_built: watch::Sender<CheckpointSequenceNumber>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
Expand Down Expand Up @@ -900,6 +904,7 @@ impl CheckpointBuilder {
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
notify_aggregator: Arc<Notify>,
last_built: watch::Sender<CheckpointSequenceNumber>,
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
Expand All @@ -913,6 +918,7 @@ impl CheckpointBuilder {
accumulator,
output,
notify_aggregator,
last_built,
metrics,
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
Expand Down Expand Up @@ -985,14 +991,27 @@ impl CheckpointBuilder {
checkpoint_commit_height = height,
"Making checkpoint at commit height"
);
if let Err(e) = self

match self
.make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
.await
{
error!("Error while making checkpoint, will retry in 1s: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
self.metrics.checkpoint_errors.inc();
return;
Ok(seq) => {
self.last_built.send_if_modified(|cur| {
if seq > *cur {
*cur = seq;
true
} else {
false
}
});
}
Err(e) => {
error!("Error while making checkpoint, will retry in 1s: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
self.metrics.checkpoint_errors.inc();
return;
}
}
// ensure that the task can be cancelled at end of epoch, even if no other await yields
// execution.
Expand All @@ -1005,7 +1024,10 @@ impl CheckpointBuilder {
}

#[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
async fn make_checkpoint(&self, pendings: Vec<PendingCheckpointV2>) -> anyhow::Result<()> {
async fn make_checkpoint(
&self,
pendings: Vec<PendingCheckpointV2>,
) -> anyhow::Result<CheckpointSequenceNumber> {
let last_details = pendings.last().unwrap().details().clone();

// Keeps track of the effects that are already included in the current checkpoint.
Expand All @@ -1024,12 +1046,13 @@ impl CheckpointBuilder {
.await?;
sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
}
let new_checkpoint = self
let new_checkpoints = self
.create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
.await?;
self.write_checkpoints(last_details.checkpoint_height, new_checkpoint)
let highest_sequence = *new_checkpoints.last().0.sequence_number();
self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
.await?;
Ok(())
Ok(highest_sequence)
}

// Given the root transactions of a pending checkpoint, resolve the transactions should be included in
Expand Down Expand Up @@ -1153,7 +1176,7 @@ impl CheckpointBuilder {
async fn write_checkpoints(
&self,
height: CheckpointHeight,
new_checkpoints: Vec<(CheckpointSummary, CheckpointContents)>,
new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
) -> SuiResult {
let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
let mut batch = self.tables.checkpoint_content.batch();
Expand Down Expand Up @@ -1283,7 +1306,7 @@ impl CheckpointBuilder {
&self,
all_effects: Vec<TransactionEffects>,
details: &PendingCheckpointInfo,
) -> anyhow::Result<Vec<(CheckpointSummary, CheckpointContents)>> {
) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
let total = all_effects.len();
let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
Expand Down Expand Up @@ -1519,7 +1542,7 @@ impl CheckpointBuilder {
checkpoints.push((summary, contents));
}

Ok(checkpoints)
Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
}

fn get_epoch_total_gas_cost(
Expand Down Expand Up @@ -2192,17 +2215,40 @@ pub trait CheckpointServiceNotify {
fn notify_checkpoint(&self) -> SuiResult;
}

/// This is a service used to communicate with other pieces of sui(for ex. authority)
enum CheckpointServiceState {
Unstarted((CheckpointBuilder, CheckpointAggregator)),
Started,
}

impl CheckpointServiceState {
fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
let mut state = CheckpointServiceState::Started;
std::mem::swap(self, &mut state);

match state {
CheckpointServiceState::Unstarted((builder, aggregator)) => (builder, aggregator),
CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
}
}
}

pub struct CheckpointService {
tables: Arc<CheckpointStore>,
notify_builder: Arc<Notify>,
notify_aggregator: Arc<Notify>,
last_signature_index: Mutex<u64>,
// A notification for the current highest built sequence number.
highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
// The highest sequence number that had already been built at the time CheckpointService
// was constructed
highest_previously_built_seq: CheckpointSequenceNumber,
metrics: Arc<CheckpointMetrics>,
state: Mutex<CheckpointServiceState>,
}

impl CheckpointService {
pub fn spawn(
/// Constructs a new CheckpointService in an un-started state.
pub fn build(
state: Arc<AuthorityState>,
checkpoint_store: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
Expand All @@ -2213,14 +2259,29 @@ impl CheckpointService {
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
) -> (Arc<Self>, JoinSet<()> /* Handle to tasks */) {
) -> Arc<Self> {
info!(
"Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
);
let notify_builder = Arc::new(Notify::new());
let notify_aggregator = Arc::new(Notify::new());

let mut tasks = JoinSet::new();
let highest_previously_built_seq = epoch_store
.last_built_checkpoint_builder_summary()
.expect("epoch should not have ended")
.map(|s| s.summary.sequence_number)
.unwrap_or(0);

let (highest_currently_built_seq_tx, _) = watch::channel(highest_previously_built_seq);

let aggregator = CheckpointAggregator::new(
checkpoint_store.clone(),
epoch_store.clone(),
notify_aggregator.clone(),
certified_checkpoint_output,
state.clone(),
metrics.clone(),
);

let builder = CheckpointBuilder::new(
state.clone(),
Expand All @@ -2231,36 +2292,71 @@ impl CheckpointService {
accumulator,
checkpoint_output,
notify_aggregator.clone(),
highest_currently_built_seq_tx.clone(),
metrics.clone(),
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
);
tasks.spawn(monitored_future!(builder.run()));

let aggregator = CheckpointAggregator::new(
checkpoint_store.clone(),
epoch_store.clone(),
notify_aggregator.clone(),
certified_checkpoint_output,
state.clone(),
metrics.clone(),
);
tasks.spawn(monitored_future!(aggregator.run()));

let last_signature_index = epoch_store
.get_last_checkpoint_signature_index()
.expect("should not cross end of epoch");
let last_signature_index = Mutex::new(last_signature_index);

let service = Arc::new(Self {
Arc::new(Self {
tables: checkpoint_store,
notify_builder,
notify_aggregator,
last_signature_index,
highest_currently_built_seq_tx,
highest_previously_built_seq,
metrics,
});
state: Mutex::new(CheckpointServiceState::Unstarted((builder, aggregator))),
})
}

/// Starts the CheckpointService.
///
/// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
/// been built before the most recent restart. You can think of this as a WAL replay
/// operation. Upon startup, we may have a number of consensus commits and resulting
/// checkpoints that were built but not committed to disk. We want to reprocess the
/// commits and rebuild the checkpoints before starting normal operation.
pub async fn spawn(&self) -> JoinSet<()> {
let mut tasks = JoinSet::new();

let (builder, aggregator) = self.state.lock().take_unstarted();
tasks.spawn(monitored_future!(builder.run()));
tasks.spawn(monitored_future!(aggregator.run()));

(service, tasks)
loop {
if tokio::time::timeout(Duration::from_secs(10), self.wait_for_rebuilt_checkpoints())
.await
.is_ok()
{
break;
} else {
debug_fatal!("Still waiting for checkpoints to be rebuilt");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it helpful to crash here even if just in debug mode? no invariant has been violated, it seems like it should be a warn! at most?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is - while testing DQ this catches various cases where the rebuilding process got stuck.

}
}

tasks
}
}

impl CheckpointService {
/// Waits until all checkpoints had been built before the node restarted
/// are rebuilt.
pub async fn wait_for_rebuilt_checkpoints(&self) {
let highest_previously_built_seq = self.highest_previously_built_seq;
let mut rx = self.highest_currently_built_seq_tx.subscribe();
loop {
let highest_currently_built_seq = *rx.borrow_and_update();
if highest_currently_built_seq >= highest_previously_built_seq {
break;
}
rx.changed().await.unwrap();
}
}

#[cfg(test)]
Expand Down Expand Up @@ -2506,7 +2602,7 @@ mod tests {
&epoch_store,
));

let (checkpoint_service, _tasks) = CheckpointService::spawn(
let checkpoint_service = CheckpointService::build(
state.clone(),
checkpoint_store,
epoch_store.clone(),
Expand All @@ -2518,6 +2614,7 @@ mod tests {
3,
100_000,
);
let _tasks = checkpoint_service.spawn().await;

checkpoint_service
.write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{sync::Arc, time::Duration};

use fastcrypto::traits::KeyPair;
use futures::FutureExt;
use mysten_metrics::RegistryService;
use prometheus::Registry;
use sui_swarm_config::network_config_builder::ConfigBuilder;
Expand Down Expand Up @@ -34,7 +35,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
));
let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);

let (checkpoint_service, _) = CheckpointService::spawn(
let checkpoint_service = CheckpointService::build(
state.clone(),
state.get_checkpoint_store().clone(),
epoch_store.clone(),
Expand All @@ -46,6 +47,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
3,
100_000,
);
checkpoint_service.spawn().now_or_never().unwrap();
checkpoint_service
}

Expand Down
8 changes: 5 additions & 3 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ impl SuiNode {
sui_node_metrics: Arc<SuiNodeMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
let checkpoint_service = Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
Expand Down Expand Up @@ -1374,6 +1374,8 @@ impl SuiNode {
)
.await;

let checkpoint_service_tasks = checkpoint_service.spawn().await;

if epoch_store.authenticator_state_enabled() {
Self::start_jwk_updater(
config,
Expand Down Expand Up @@ -1405,7 +1407,7 @@ impl SuiNode {
state_sync_handle: state_sync::Handle,
accumulator: Weak<StateAccumulator>,
checkpoint_metrics: Arc<CheckpointMetrics>,
) -> (Arc<CheckpointService>, JoinSet<()>) {
) -> Arc<CheckpointService> {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

Expand All @@ -1430,7 +1432,7 @@ impl SuiNode {
let max_checkpoint_size_bytes =
epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

CheckpointService::spawn(
CheckpointService::build(
state.clone(),
checkpoint_store,
epoch_store,
Expand Down
Loading