Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 98 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,11 @@ pub struct AuthorityMetrics {
batch_size: Histogram,

authority_state_handle_transaction_latency: Histogram,
authority_state_handle_transaction_v2_latency: Histogram,

execute_certificate_latency_single_writer: Histogram,
execute_certificate_latency_shared_object: Histogram,
await_transaction_latency: Histogram,

execute_certificate_with_effects_latency: Histogram,
internal_execution_latency: Histogram,
Expand Down Expand Up @@ -434,8 +436,22 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
authority_state_handle_transaction_v2_latency: register_histogram_with_registry!(
"authority_state_handle_transaction_v2_latency",
"Latency of handling transactions with v2",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execute_certificate_latency_single_writer,
execute_certificate_latency_shared_object,
await_transaction_latency: register_histogram_with_registry!(
"await_transaction_latency",
"Latency of awaiting user transaction execution, including waiting for inputs",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execute_certificate_with_effects_latency: register_histogram_with_registry!(
"authority_state_execute_certificate_with_effects_latency",
"Latency of executing certificates with effects, including waiting for inputs",
Expand Down Expand Up @@ -839,17 +855,11 @@ impl AuthorityState {
self.checkpoint_store.get_epoch_state_commitments(epoch)
}

/// This is a private method and should be kept that way. It doesn't check whether
/// the provided transaction is a system transaction, and hence can only be called internally.
#[instrument(level = "trace", skip_all)]
async fn handle_transaction_impl(
fn handle_transaction_deny_checks(
&self,
transaction: VerifiedTransaction,
transaction: &VerifiedTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransaction> {
// Ensure that validator cannot reconfigure while we are signing the tx
let _execution_lock = self.execution_lock_for_signing().await;

) -> SuiResult<CheckedInputObjects> {
let tx_digest = transaction.digest();
let tx_data = transaction.data().transaction_data();

Expand Down Expand Up @@ -903,6 +913,23 @@ impl AuthorityState {
)?;
}

Ok(checked_input_objects)
}

/// This is a private method and should be kept that way. It doesn't check whether
/// the provided transaction is a system transaction, and hence can only be called internally.
#[instrument(level = "trace", skip_all)]
async fn handle_transaction_impl(
&self,
transaction: VerifiedTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransaction> {
// Ensure that validator cannot reconfigure while we are signing the tx
let _execution_lock = self.execution_lock_for_signing().await;

let checked_input_objects =
self.handle_transaction_deny_checks(&transaction, epoch_store)?;

let owned_objects = checked_input_objects.inner().filter_owned_objects();

let signed_transaction = VerifiedSignedTransaction::new(
Expand Down Expand Up @@ -974,6 +1001,51 @@ impl AuthorityState {
}
}

#[instrument(level = "trace", skip_all)]
pub async fn handle_transaction_v2(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to be the right naming anymore since we are only doing various checks here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have an alternate suggestion for name?

I think actually the existing handle_transaction was poorly named and should have been called sign_transaction. But for the new one I'm adding, handle is more apt because we really are handling the full transaction e2e: sending an unsigned tx through consensus, executing it, and returning the effects. I didn't think temporary confusion of naming overload would be too bad, bc eventually we plan to remove the old version.

&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
transaction: VerifiedTransaction,
) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
let tx_digest = *transaction.digest();
debug!("handle_transaction_v2");

// Ensure an idempotent answer.
let tx_status = self.get_transaction_status(&tx_digest, epoch_store)?;
if tx_status.is_some() {
return Ok(tx_status);
}

let _metrics_guard = self
.metrics
.authority_state_handle_transaction_v2_latency
.start_timer();
self.metrics.tx_orders.inc();

// The should_accept_user_certs check here is best effort, because
// between a validator signs a tx and a cert is formed, the validator
// could close the window.
if !epoch_store
.get_reconfig_state_read_lock_guard()
.should_accept_user_certs()
{
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

match self.handle_transaction_impl(transaction, epoch_store).await {
// TODO(fastpath): We don't actually need the signed transaction here but just call
// into this function to acquire locks. Consider refactoring to avoid the extra work.
Ok(_signed) => Ok(None),
// It happens frequently that while we are checking the validity of the transaction, it
// has just been executed.
// In that case, we could still return Ok to avoid showing confusing errors.
Err(e) => self
.get_transaction_status(&tx_digest, epoch_store)?
.ok_or(e)
.map(Some),
}
}

pub fn check_system_overload_at_signing(&self) -> bool {
self.config
.authority_overload_config
Expand Down Expand Up @@ -1127,7 +1199,21 @@ impl AuthorityState {
self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
}

self.notify_read_effects(certificate).await
self.notify_read_effects(*certificate.digest()).await
}

/// Awaits the effects of executing a user transaction.
///
/// Relies on consensus to enqueue the transaction for execution.
pub async fn await_transaction_effects(
&self,
digest: TransactionDigest,
) -> SuiResult<TransactionEffects> {
let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
debug!("await_transaction");

// TODO(fastpath): Add handling for transactions rejected by Mysticeti fast path.
self.notify_read_effects(digest).await
}

/// Internal logic to execute a certificate.
Expand Down Expand Up @@ -1219,10 +1305,10 @@ impl AuthorityState {

pub async fn notify_read_effects(
&self,
certificate: &VerifiedCertificate,
digest: TransactionDigest,
) -> SuiResult<TransactionEffects> {
self.get_transaction_cache_reader()
.notify_read_executed_effects(&[*certificate.digest()])
.notify_read_executed_effects(&[digest])
.await
.map(|mut r| r.pop().expect("must return correct number of effects"))
}
Expand Down
13 changes: 4 additions & 9 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1965,17 +1965,12 @@ impl AuthorityPerEpochStore {
.any(|processed| processed))
}

/// Check whether any certificates were processed by consensus.
/// This handles multiple certificates at once.
pub fn is_all_tx_certs_consensus_message_processed<'a>(
/// Returns true if all messages with the given keys were processed by consensus.
pub fn all_external_consensus_messages_processed(
&self,
certificates: impl Iterator<Item = &'a VerifiedCertificate>,
keys: impl Iterator<Item = ConsensusTransactionKey>,
) -> SuiResult<bool> {
let keys = certificates.map(|cert| {
SequencedConsensusTransactionKey::External(ConsensusTransactionKey::Certificate(
*cert.digest(),
))
});
let keys = keys.map(SequencedConsensusTransactionKey::External);
Ok(self
.check_consensus_messages_processed(keys)?
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ pub async fn enqueue_all_and_execute_all(
);
let mut output = Vec::new();
for cert in certificates {
let effects = authority.notify_read_effects(&cert).await?;
let effects = authority.notify_read_effects(*cert.digest()).await?;
output.push(effects);
}
Ok(output)
Expand Down
Loading
Loading