[3/5] User data export: fix region snapshot replacement step saga #8557

nexus/src/app/sagas/region_snapshot_replacement_step.rs: 195 additions & 52 deletions
@@ -55,11 +55,12 @@ use crate::app::db::datastore::VolumeWithTarget;
use crate::app::sagas::declare_saga_actions;
use crate::app::{authn, authz, db};
use nexus_db_lookup::LookupPath;
use nexus_db_model::UserDataExport;
use nexus_db_model::VmmState;
use nexus_types::identity::Asset;
use omicron_common::api::external::Error;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::VolumeUuid;
use propolis_client::types::ReplaceResult;
use serde::Deserialize;
use serde::Serialize;
use sled_agent_client::CrucibleOpts;
@@ -394,63 +395,35 @@ async fn rsrss_replace_snapshot_in_volume_undo(
Ok(())
}

async fn rsrss_notify_upstairs(
async fn notify_potential_propolis_upstairs(
sagactx: NexusActionContext,
disk: nexus_db_model::Disk,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let datastore = osagactx.datastore();
let params = sagactx.saga_params::<Params>()?;
let log = sagactx.user_data().log();

// If the associated volume was deleted, then skip this notification step as
// there is no Upstairs to talk to. Continue with the saga to transition the
// step request to Complete, and then perform the associated clean up.

let volume_replace_snapshot_result = sagactx
.lookup::<VolumeReplaceResult>("volume_replace_snapshot_result")?;
if matches!(
volume_replace_snapshot_result,
VolumeReplaceResult::ExistingVolumeSoftDeleted
| VolumeReplaceResult::ExistingVolumeHardDeleted
) {
return Ok(());
}

// Make an effort to notify a Propolis if one was booted for this volume.
// This is best effort: if there is a failure, this saga will unwind and be
// triggered again for the same request. If there is no Propolis booted for
// this volume, then there's nothing to be done: any future Propolis will
// receive the updated Volume.
//
// Unlike for region replacement, there's no step required here if there
// isn't an active Propolis: any Upstairs created after the snapshot_addr
// is replaced will reference the cloned data.
let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);

let Some(disk) = osagactx
.datastore()
.disk_for_volume_id(params.request.volume_id())
.await
.map_err(ActionError::action_failed)?
else {
return Ok(());
};
// Bail out if this disk is not attached to an instance

let Some(instance_id) = disk.runtime().attach_instance_id else {
return Ok(());
};

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);
// Bail if there is no active VMM

let (.., authz_instance) = LookupPath::new(&opctx, osagactx.datastore())
let (.., authz_instance) = LookupPath::new(&opctx, datastore)
.instance_id(instance_id)
.lookup_for(authz::Action::Read)
.await
.map_err(ActionError::action_failed)?;

let instance_and_vmm = osagactx
.datastore()
let instance_and_vmm = datastore
.instance_fetch_with_vmm(&opctx, &authz_instance)
.await
.map_err(ActionError::action_failed)?;
@@ -463,15 +436,17 @@ async fn rsrss_notify_upstairs(

info!(
log,
"volume associated with disk attached to instance with vmm in \
state {state}";
"volume associated with disk attached to instance with vmm in state \
{state}";
"request id" => %params.request.id,
"volume id" => %params.request.volume_id(),
"disk id" => ?disk.id(),
"instance id" => ?instance_id,
"vmm id" => ?vmm.id,
);

// Bail if the VMM is not in a state to receive requests

match &state {
VmmState::Running | VmmState::Rebooting => {
// Propolis server is ok to receive the volume replacement request.
@@ -494,8 +469,9 @@ async fn rsrss_notify_upstairs(
}
}

let new_volume_vcr = match osagactx
.datastore()
// Send the new VCR via a replacement request

let new_volume_vcr = match datastore
.volume_get(params.request.volume_id())
.await
.map_err(ActionError::action_failed)?
@@ -510,7 +486,7 @@ async fn rsrss_notify_upstairs(
};

let instance_lookup =
LookupPath::new(&opctx, osagactx.datastore()).instance_id(instance_id);
LookupPath::new(&opctx, datastore).instance_id(instance_id);

let (vmm, client) = osagactx
.nexus()
@@ -567,27 +543,110 @@ async fn rsrss_notify_upstairs(
);

match &replace_result {
ReplaceResult::Started => {
propolis_client::types::ReplaceResult::Started => {
// This saga's call just started the replacement
}

ReplaceResult::StartedAlready => {
propolis_client::types::ReplaceResult::StartedAlready => {
// A previous run of this saga (or saga node) started the
// replacement
}

ReplaceResult::CompletedAlready => {
propolis_client::types::ReplaceResult::CompletedAlready => {
// It's done! We see this if the same propolis that received the
// original replace request started and finished the replacement.
}

ReplaceResult::VcrMatches => {
propolis_client::types::ReplaceResult::VcrMatches => {
// This propolis booted with the updated VCR
}

ReplaceResult::Missing => {
// The volume does not contain the region to be replaced. This is an
// error!
propolis_client::types::ReplaceResult::Missing => {
// The volume does not contain the read-only target to be replaced.
// This is an error!
return Err(ActionError::action_failed(String::from(
"saw ReplaceResult::Missing",
)));
}
}

Ok(())
}

async fn notify_pantry_upstairs(
sagactx: NexusActionContext,
pantry_address: SocketAddrV6,
attachment_id: Uuid,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let datastore = osagactx.datastore();
let params = sagactx.saga_params::<Params>()?;
let log = sagactx.user_data().log();

info!(
log,
"volume attached to pantry {pantry_address} with id {attachment_id}";
"request id" => %params.request.id,
"volume id" => %params.request.volume_id(),
);

// Grab the new volume's VCR

let volume_construction_request = match datastore
.volume_get(params.request.volume_id())
.await
.map_err(ActionError::action_failed)?
{
Some(volume) => serde_json::from_str(&volume.data()).map_err(|e| {
ActionError::action_failed(Error::internal_error(&format!(
"failed to deserialize volume {} data: {e}",
volume.id()
)))
})?,

None => {
return Err(ActionError::action_failed(Error::internal_error(
"new volume is gone!",
)));
}
};

let endpoint = format!("http://{}", pantry_address);
let client = crucible_pantry_client::Client::new(&endpoint);

let replace_request = crucible_pantry_client::types::ReplaceRequest {
volume_construction_request,
};

let replace_result = client
.replace(&attachment_id.to_string(), &replace_request)
.await
.map_err(|e| {
ActionError::action_failed(Error::internal_error(&e.to_string()))
})?;

match replace_result.into_inner() {
crucible_pantry_client::types::ReplaceResult::Started => {
// This saga's call just started the replacement
}

crucible_pantry_client::types::ReplaceResult::StartedAlready => {
// A previous run of this saga (or saga node) started the
// replacement
}

crucible_pantry_client::types::ReplaceResult::CompletedAlready => {
// It's done! We see this if the same pantry that received the
// original replace request started and finished the replacement.
}

crucible_pantry_client::types::ReplaceResult::VcrMatches => {
// This pantry booted with the updated VCR
}

crucible_pantry_client::types::ReplaceResult::Missing => {
// The volume does not contain the read-only target to be replaced.
// This is an error!
return Err(ActionError::action_failed(String::from(
"saw ReplaceResult::Missing",
)));
@@ -597,6 +656,90 @@ async fn rsrss_notify_upstairs(
Ok(())
}

async fn rsrss_notify_upstairs(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);

// If the associated volume was deleted, then skip this notification step as
// there is no Upstairs to talk to. Continue with the saga to transition the
// step request to Complete, and then perform the associated clean up.

let volume_replace_snapshot_result = sagactx
.lookup::<VolumeReplaceResult>("volume_replace_snapshot_result")?;
if matches!(
volume_replace_snapshot_result,
VolumeReplaceResult::ExistingVolumeSoftDeleted
| VolumeReplaceResult::ExistingVolumeHardDeleted
) {
return Ok(());
}

// Make an effort to notify an Upstairs if one was constructed for this
// volume. This is best effort: if there is a failure, this saga will unwind
// and be triggered again for the same request. If there is no Upstairs
// constructed for this volume, then there's nothing to be done: any future
// construction will receive the updated Volume.
//
// Unlike for region replacement, there's no step required here if there
// isn't an active Upstairs: any Upstairs created after the snapshot_addr is
// replaced will reference the cloned data.

let maybe_disk = osagactx
.datastore()
.disk_for_volume_id(params.request.volume_id())
.await
.map_err(ActionError::action_failed)?;

let maybe_user_data_export = osagactx
.datastore()
.user_data_export_lookup_by_volume_id(
&opctx,
params.request.volume_id(),
)
.await
.map_err(ActionError::action_failed)?;

if let Some(disk) = maybe_disk {
notify_potential_propolis_upstairs(sagactx, disk).await?;
} else if let Some(record) = maybe_user_data_export {
let (pantry_address, volume_id) = match record.is_live() {
Err(s) => {
// There was an error with a Live user data export record, meaning we
// have to unwind here. This will likely require support intervention,
// as the record is in state Live but is missing either a Pantry
// address or a volume id.
return Err(ActionError::action_failed(s.to_string()));
}

Ok(UserDataExport::NotLive) => {
// The user data export is not Live, meaning no notification is
// required.
return Ok(());
}

Ok(UserDataExport::Live { pantry_address, volume_id }) => {
(pantry_address, volume_id)
}
};

notify_pantry_upstairs(
sagactx,
pantry_address,
volume_id.into_untyped_uuid(),
)
.await?;
}

Ok(())
}

async fn rsrss_update_request_record(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
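A minimal sketch (not part of this PR) of the UserDataExport::is_live() contract that the new dispatch in rsrss_notify_upstairs appears to rely on: a NotLive record needs no notification, a Live record carries the Pantry address and volume id, and a Live record missing either field is an error that unwinds the saga. ExportRecord, its field names, and the String error type below are placeholder assumptions for illustration; the real definitions live in nexus_db_model and may differ.

use std::net::SocketAddrV6;
use uuid::Uuid;

// Outcome of asking an export record whether it is live.
enum UserDataExport {
    // Not live: no Pantry has this volume attached, so nothing to notify.
    NotLive,

    // Live: a Pantry at `pantry_address` serves the volume under `volume_id`
    // (a typed VolumeUuid in the real code, hence into_untyped_uuid() above).
    Live { pantry_address: SocketAddrV6, volume_id: Uuid },
}

// Placeholder for the record returned by user_data_export_lookup_by_volume_id.
struct ExportRecord {
    live: bool,
    pantry_address: Option<SocketAddrV6>,
    volume_id: Option<Uuid>,
}

impl ExportRecord {
    // Err means the record claims to be Live but lacks the Pantry address or
    // the volume id; the saga unwinds in that case.
    fn is_live(&self) -> Result<UserDataExport, String> {
        if !self.live {
            return Ok(UserDataExport::NotLive);
        }

        match (self.pantry_address, self.volume_id) {
            (Some(pantry_address), Some(volume_id)) => {
                Ok(UserDataExport::Live { pantry_address, volume_id })
            }

            _ => Err(String::from(
                "live export record is missing pantry address or volume id",
            )),
        }
    }
}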