Support Bundles: Chunked upload to Sled Agent #8559

Merged: 2 commits, Jul 14, 2025

227 changes: 203 additions & 24 deletions nexus/src/app/background/tasks/support_bundle_collector.rs
@@ -46,6 +46,7 @@ use sha2::{Digest, Sha256};
use slog_error_chain::InlineErrorChain;
use std::future::Future;
use std::io::Write;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
@@ -59,6 +60,10 @@ use zip::write::FullFileOptions;
// rather than "/tmp", which would keep this collected data in-memory.
const TEMPDIR: &str = "/var/tmp";

// The size of each piece of a support bundle to transfer to the sled agent
// within a single streaming request.
const CHUNK_SIZE: NonZeroU64 = NonZeroU64::new(1024 * 1024 * 1024).unwrap();

fn authz_support_bundle_from_id(id: SupportBundleUuid) -> authz::SupportBundle {
authz::SupportBundle::new(
authz::FLEET,
@@ -68,10 +73,22 @@ fn authz_support_bundle_from_id(id: SupportBundleUuid) -> authz::SupportBundle {
}

// Specifies the data to be collected within the Support Bundle.
#[derive(Clone, Default)]
#[derive(Clone)]
struct BundleRequest {
// If "false": Skip collecting host-specific info from each sled.
skip_sled_info: bool,

// The size of chunks to use when transferring a bundle from Nexus
// to a sled agent.
//
// Typically, this is CHUNK_SIZE, but can be modified for testing.
transfer_chunk_size: NonZeroU64,
}

impl Default for BundleRequest {
fn default() -> Self {
Self { skip_sled_info: false, transfer_chunk_size: CHUNK_SIZE }
}
}

// Result of asking a sled agent to clean up a bundle
@@ -390,6 +407,7 @@ impl SupportBundleCollector {
opctx: opctx.child(std::collections::BTreeMap::new()),
request: request.clone(),
bundle: bundle.clone(),
transfer_chunk_size: request.transfer_chunk_size,
});

let authz_bundle = authz_support_bundle_from_id(bundle.id.into());
@@ -434,6 +452,7 @@ struct BundleCollection {
opctx: OpContext,
request: BundleRequest,
bundle: SupportBundle,
transfer_chunk_size: NonZeroU64,
}

impl BundleCollection {
@@ -445,6 +464,20 @@ impl BundleCollection {
// as it's being collected.
let dir = tempdir_in(TEMPDIR)?;

let report = self.collect_bundle_locally(&dir).await?;
self.store_bundle_on_sled(dir).await?;
Ok(report)
}

// Create the support bundle, placing the contents into a user-specified
// directory.
//
// Does not attempt to convert the contents into a zipfile, nor send them
// to any durable storage.
async fn collect_bundle_locally(
self: &Arc<Self>,
dir: &Utf8TempDir,
) -> anyhow::Result<SupportBundleCollectionReport> {
let mut collection = Box::pin(self.collect_bundle_as_file(&dir));

// We periodically check the state of the support bundle - if a user
@@ -456,7 +489,7 @@ impl BundleCollection {
work_duration,
);

let report = loop {
loop {
tokio::select! {
// Timer fired mid-collection - let's check if we should stop.
_ = yield_interval.tick() => {
@@ -487,15 +520,24 @@ impl BundleCollection {
"Bundle Collection completed";
"bundle" => %self.bundle.id
);
break report?;
return report;
},
}
};
}
}

async fn store_bundle_on_sled(
&self,
dir: Utf8TempDir,
) -> anyhow::Result<()> {
// Create the zipfile as a temporary file
let mut zipfile = tokio::fs::File::from_std(bundle_to_zipfile(&dir)?);
let total_len = zipfile.metadata().await?.len();
Contributor:

By the way, John pointed this out on one of my previous PRs, but you can use SeekFrom::End(0) (since we later seek back to the beginning on line 503) to get the length without having to do an extra stat call. Either way works, just thought I would mention it.

Collaborator (Author):

I think this will end up being roughly equivalent; it's a "stat now, seek later" or "seek now, seek later", right? Either way, we need a syscall to get the length
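
A minimal, hypothetical sketch (not part of this diff) of the two equivalent approaches discussed above; the helper name is made up for illustration:

    use std::io::SeekFrom;
    use tokio::fs::File;
    use tokio::io::AsyncSeekExt;

    // Hypothetical helper: either approach below costs one syscall.
    async fn zipfile_len(zipfile: &mut File) -> std::io::Result<u64> {
        // Option A (what the PR does): a stat call via metadata().
        let via_stat = zipfile.metadata().await?.len();

        // Option B (the reviewer's suggestion): seek to the end; the returned
        // offset is the file length.
        let via_seek = zipfile.seek(SeekFrom::End(0)).await?;

        // Either way, the caller still seeks back to the start before hashing
        // or streaming the file.
        zipfile.seek(SeekFrom::Start(0)).await?;

        debug_assert_eq!(via_stat, via_seek);
        Ok(via_stat)
    }

Both options need a single syscall to learn the length, which is why the thread concludes they are roughly equivalent.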


// Verify the hash locally before we send it over the network
// Collect the hash locally before we send it over the network
//
// We'll use this later during finalization to confirm the bundle
// has been stored successfully.
zipfile.seek(SeekFrom::Start(0)).await?;
let hash = sha2_hash(&mut zipfile).await?;

@@ -515,27 +557,78 @@ impl BundleCollection {
)
.await?;

// Stream the zipfile to the sled where it should be kept
zipfile.seek(SeekFrom::Start(0)).await?;
let file_access = hyper_staticfile::vfs::TokioFileAccess::new(zipfile);
let file_stream =
hyper_staticfile::util::FileBytesStream::new(file_access);
let body =
reqwest::Body::wrap(hyper_staticfile::Body::Full(file_stream));
let zpool = ZpoolUuid::from(self.bundle.zpool_id);
let dataset = DatasetUuid::from(self.bundle.dataset_id);
let support_bundle = SupportBundleUuid::from(self.bundle.id);

// Tell this sled to create the bundle.
let creation_result = sled_client
.support_bundle_start_creation(&zpool, &dataset, &support_bundle)
.await
.with_context(|| "Support bundle failed to start creation")?;

if matches!(
creation_result.state,
sled_agent_client::types::SupportBundleState::Complete
) {
// Early exit case: the bundle was already created -- we must have either
// crashed or failed between "finalizing" and "writing to the database that we
// finished".
info!(&self.log, "Support bundle was already collected"; "bundle" => %self.bundle.id);
return Ok(());
}
info!(&self.log, "Support bundle creation started"; "bundle" => %self.bundle.id);

let mut offset = 0;
while offset < total_len {
// Stream the zipfile to the sled where it should be kept
let mut file = zipfile
.try_clone()
.await
.with_context(|| "Failed to clone zipfile")?;
file.seek(SeekFrom::Start(offset)).await.with_context(|| {
format!("Failed to seek to offset {offset} / {total_len} within zipfile")
})?;

// Only stream at most "transfer_chunk_size" bytes at once
let remaining = std::cmp::min(
self.transfer_chunk_size.get(),
total_len - offset,
);
let limited_file = file.take(remaining);
let stream = tokio_util::io::ReaderStream::new(limited_file);
let body = reqwest::Body::wrap_stream(stream);

info!(
&self.log,
"Streaming bundle chunk";
"bundle" => %self.bundle.id,
"offset" => offset,
"length" => remaining,
);

sled_client.support_bundle_transfer(
&zpool, &dataset, &support_bundle, offset, body
).await.with_context(|| {
format!("Failed to transfer bundle: {remaining}@{offset} of {total_len} to sled")
})?;

offset += self.transfer_chunk_size.get();
}

sled_client
.support_bundle_create(
&ZpoolUuid::from(self.bundle.zpool_id),
&DatasetUuid::from(self.bundle.dataset_id),
&SupportBundleUuid::from(self.bundle.id),
.support_bundle_finalize(
&zpool,
&dataset,
&support_bundle,
&hash.to_string(),
body,
)
.await?;
.await
.with_context(|| "Failed to finalize bundle")?;

// Returning from this method should drop all temporary storage
// allocated locally for this support bundle.
Ok(report)
Ok(())
}

// Perform the work of collecting the support bundle into a temporary directory
Expand Down Expand Up @@ -795,7 +888,7 @@ impl BackgroundTask for SupportBundleCollector {
Ok(report) => collection_report = Some(report),
Err(err) => {
collection_err =
Some(json!({ "collect_error": err.to_string() }))
Some(json!({ "collect_error": InlineErrorChain::new(err.as_ref()).to_string() }))
}
};

Expand Down Expand Up @@ -1076,7 +1169,9 @@ async fn save_sp_dumps(
mod test {
use super::*;

use crate::app::support_bundles::SupportBundleQueryType;
use camino_tempfile::tempdir;
use http_body_util::BodyExt;
use nexus_db_model::PhysicalDisk;
use nexus_db_model::PhysicalDiskKind;
use nexus_db_model::RendezvousDebugDataset;
Expand Down Expand Up @@ -1356,6 +1451,7 @@ mod test {
// NOTE: The support bundle querying interface isn't supported on
// the simulated sled agent (yet?) so we're skipping this step.
skip_sled_info: true,
..Default::default()
};
let report = collector
.collect_bundle(&opctx, &request)
@@ -1382,6 +1478,85 @@ mod test {
assert!(report.is_none());
}

#[nexus_test(server = crate::Server)]
async fn test_collect_chunked(cptestctx: &ControlPlaneTestContext) {
let nexus = &cptestctx.server.server_context().nexus;
let datastore = nexus.datastore();
let resolver = nexus.resolver();
let opctx = OpContext::for_tests(
cptestctx.logctx.log.clone(),
datastore.clone(),
);

// Before we can create any bundles, we need to create the
// space for them to be provisioned.
let _datasets =
TestDataset::setup(cptestctx, &datastore, &opctx, 1).await;

let bundle = datastore
.support_bundle_create(&opctx, "For collection testing", nexus.id())
.await
.expect("Couldn't allocate a support bundle");
assert_eq!(bundle.state, SupportBundleState::Collecting);

let collector = SupportBundleCollector::new(
datastore.clone(),
resolver.clone(),
false,
nexus.id(),
);

// The bundle collection should complete successfully.
//
// We're going to use a really small chunk size here to force the bundle
// to get split up.
let request = BundleRequest {
skip_sled_info: true,
transfer_chunk_size: NonZeroU64::new(16).unwrap(),
};

let report = collector
.collect_bundle(&opctx, &request)
.await
.expect("Collection should have succeeded under test")
.expect("Collecting the bundle should have generated a report");
assert_eq!(report.bundle, bundle.id.into());
assert!(report.listed_in_service_sleds);
assert!(report.listed_sps);
assert!(report.activated_in_db_ok);

let observed_bundle = datastore
.support_bundle_get(&opctx, bundle.id.into())
.await
.expect("Bundle should definitely be in db by this point");
assert_eq!(observed_bundle.state, SupportBundleState::Active);

// Download a file from the bundle to verify that it was transferred
// successfully.
let head = false;
let range = None;
let response = nexus
.support_bundle_download(
&opctx,
observed_bundle.id.into(),
SupportBundleQueryType::Path {
file_path: "bundle_id.txt".to_string(),
},
head,
range,
)
.await
.unwrap();

// Read the body to bytes, then convert to string
let body_bytes =
response.into_body().collect().await.unwrap().to_bytes();
let body_string = String::from_utf8(body_bytes.to_vec()).unwrap();

// Verify the content matches the bundle ID
assert_eq!(body_string, observed_bundle.id.to_string());
}

#[nexus_test(server = crate::Server)]
async fn test_collect_many(cptestctx: &ControlPlaneTestContext) {
let nexus = &cptestctx.server.server_context().nexus;
Expand Down Expand Up @@ -1415,7 +1590,8 @@ mod test {
);

// Each time we call "collect_bundle", we collect a SINGLE bundle.
let request = BundleRequest { skip_sled_info: true };
let request =
BundleRequest { skip_sled_info: true, ..Default::default() };
let report = collector
.collect_bundle(&opctx, &request)
.await
Expand Down Expand Up @@ -1553,7 +1729,8 @@ mod test {
false,
nexus.id(),
);
let request = BundleRequest { skip_sled_info: true };
let request =
BundleRequest { skip_sled_info: true, ..Default::default() };
let report = collector
.collect_bundle(&opctx, &request)
.await
Expand Down Expand Up @@ -1689,7 +1866,8 @@ mod test {
false,
nexus.id(),
);
let request = BundleRequest { skip_sled_info: true };
let request =
BundleRequest { skip_sled_info: true, ..Default::default() };
let report = collector
.collect_bundle(&opctx, &request)
.await
Expand Down Expand Up @@ -1768,7 +1946,8 @@ mod test {
false,
nexus.id(),
);
let request = BundleRequest { skip_sled_info: true };
let request =
BundleRequest { skip_sled_info: true, ..Default::default() };
let report = collector
.collect_bundle(&opctx, &request)
.await