Skip to content

Commit 248e7ed

Browse files
authored
Support Bundles: Chunked upload to Sled Agent (#8559)
Support bundle collection in Nexus previously transferred the entire bundle as a streamed file from Nexus -> Sled Agent. Unfortunately, dropshot has upper bounds for streaming file sizes, and as we hit the 2 GiB boundary, we noticed that collection started to fail during transfers.

This PR modifies how Nexus transfers bundles to Sled Agents: it uses an incremental upload scheme, transferring up to 1 GiB at a time, at a requested offset. This should allow for incremental transfer of files which are much larger than 2 GiB, and makes it easier for us to "re-do" failed operations later. (Changing behavior on partial failure would be a welcome change, but is out-of-scope for this PR, which tries to only modify the behavior from a "oneshot" upload to a "start, transfer, finalize" API).

Added tests for:
- [x] Chunked files in the bg task ("are the requests split successfully")
- [x] Chunked files in the bundle storage implementation ("are we putting things where we should")
- [x] New lifetime quirks ("does start_creation restart the file? What about finalize after completion?")

Fixes #8556
1 parent c65212d commit 248e7ed

File tree

9 files changed

+1109
-282
lines changed

9 files changed

+1109
-282
lines changed

nexus/src/app/background/tasks/support_bundle_collector.rs

Lines changed: 203 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use sha2::{Digest, Sha256};
4646
use slog_error_chain::InlineErrorChain;
4747
use std::future::Future;
4848
use std::io::Write;
49+
use std::num::NonZeroU64;
4950
use std::sync::Arc;
5051
use tokio::io::AsyncReadExt;
5152
use tokio::io::AsyncSeekExt;
@@ -59,6 +60,10 @@ use zip::write::FullFileOptions;
5960
// rather than "/tmp", which would keep this collected data in-memory.
6061
const TEMPDIR: &str = "/var/tmp";
6162

63+
// The size of each piece of a support bundle to transfer to the sled agent
64+
// within a single streaming request.
65+
const CHUNK_SIZE: NonZeroU64 = NonZeroU64::new(1024 * 1024 * 1024).unwrap();
66+
6267
fn authz_support_bundle_from_id(id: SupportBundleUuid) -> authz::SupportBundle {
6368
authz::SupportBundle::new(
6469
authz::FLEET,
@@ -68,10 +73,22 @@ fn authz_support_bundle_from_id(id: SupportBundleUuid) -> authz::SupportBundle {
6873
}
6974

7075
// Specifies the data to be collected within the Support Bundle.
71-
#[derive(Clone, Default)]
76+
#[derive(Clone)]
7277
struct BundleRequest {
7378
// If "true": Skip collecting host-specific info from each sled.
7479
skip_sled_info: bool,
80+
81+
// The size of chunks to use when transferring a bundle from Nexus
82+
// to a sled agent.
83+
//
84+
// Typically, this is CHUNK_SIZE, but can be modified for testing.
85+
transfer_chunk_size: NonZeroU64,
86+
}
87+
88+
impl Default for BundleRequest {
89+
fn default() -> Self {
90+
Self { skip_sled_info: false, transfer_chunk_size: CHUNK_SIZE }
91+
}
7592
}
7693

7794
// Result of asking a sled agent to clean up a bundle
@@ -390,6 +407,7 @@ impl SupportBundleCollector {
390407
opctx: opctx.child(std::collections::BTreeMap::new()),
391408
request: request.clone(),
392409
bundle: bundle.clone(),
410+
transfer_chunk_size: request.transfer_chunk_size,
393411
});
394412

395413
let authz_bundle = authz_support_bundle_from_id(bundle.id.into());
@@ -434,6 +452,7 @@ struct BundleCollection {
434452
opctx: OpContext,
435453
request: BundleRequest,
436454
bundle: SupportBundle,
455+
transfer_chunk_size: NonZeroU64,
437456
}
438457

439458
impl BundleCollection {
@@ -445,6 +464,20 @@ impl BundleCollection {
445464
// as it's being collected.
446465
let dir = tempdir_in(TEMPDIR)?;
447466

467+
let report = self.collect_bundle_locally(&dir).await?;
468+
self.store_bundle_on_sled(dir).await?;
469+
Ok(report)
470+
}
471+
472+
// Create the support bundle, placing the contents into a user-specified
473+
// directory.
474+
//
475+
// Does not attempt to convert the contents into a zipfile, nor send them
476+
// to any durable storage.
477+
async fn collect_bundle_locally(
478+
self: &Arc<Self>,
479+
dir: &Utf8TempDir,
480+
) -> anyhow::Result<SupportBundleCollectionReport> {
448481
let mut collection = Box::pin(self.collect_bundle_as_file(&dir));
449482

450483
// We periodically check the state of the support bundle - if a user
@@ -456,7 +489,7 @@ impl BundleCollection {
456489
work_duration,
457490
);
458491

459-
let report = loop {
492+
loop {
460493
tokio::select! {
461494
// Timer fired mid-collection - let's check if we should stop.
462495
_ = yield_interval.tick() => {
@@ -487,15 +520,24 @@ impl BundleCollection {
487520
"Bundle Collection completed";
488521
"bundle" => %self.bundle.id
489522
);
490-
break report?;
523+
return report;
491524
},
492525
}
493-
};
526+
}
527+
}
494528

529+
async fn store_bundle_on_sled(
530+
&self,
531+
dir: Utf8TempDir,
532+
) -> anyhow::Result<()> {
495533
// Create the zipfile as a temporary file
496534
let mut zipfile = tokio::fs::File::from_std(bundle_to_zipfile(&dir)?);
535+
let total_len = zipfile.metadata().await?.len();
497536

498-
// Verify the hash locally before we send it over the network
537+
// Collect the hash locally before we send it over the network
538+
//
539+
// We'll use this later during finalization to confirm the bundle
540+
// has been stored successfully.
499541
zipfile.seek(SeekFrom::Start(0)).await?;
500542
let hash = sha2_hash(&mut zipfile).await?;
501543

@@ -515,27 +557,78 @@ impl BundleCollection {
515557
)
516558
.await?;
517559

518-
// Stream the zipfile to the sled where it should be kept
519-
zipfile.seek(SeekFrom::Start(0)).await?;
520-
let file_access = hyper_staticfile::vfs::TokioFileAccess::new(zipfile);
521-
let file_stream =
522-
hyper_staticfile::util::FileBytesStream::new(file_access);
523-
let body =
524-
reqwest::Body::wrap(hyper_staticfile::Body::Full(file_stream));
560+
let zpool = ZpoolUuid::from(self.bundle.zpool_id);
561+
let dataset = DatasetUuid::from(self.bundle.dataset_id);
562+
let support_bundle = SupportBundleUuid::from(self.bundle.id);
563+
564+
// Tell this sled to create the bundle.
565+
let creation_result = sled_client
566+
.support_bundle_start_creation(&zpool, &dataset, &support_bundle)
567+
.await
568+
.with_context(|| "Support bundle failed to start creation")?;
569+
570+
if matches!(
571+
creation_result.state,
572+
sled_agent_client::types::SupportBundleState::Complete
573+
) {
574+
// Early exit case: the bundle was already created -- we must have either
575+
// crashed or failed between "finalizing" and "writing to the database that we
576+
// finished".
577+
info!(&self.log, "Support bundle was already collected"; "bundle" => %self.bundle.id);
578+
return Ok(());
579+
}
580+
info!(&self.log, "Support bundle creation started"; "bundle" => %self.bundle.id);
581+
582+
let mut offset = 0;
583+
while offset < total_len {
584+
// Stream the zipfile to the sled where it should be kept
585+
let mut file = zipfile
586+
.try_clone()
587+
.await
588+
.with_context(|| "Failed to clone zipfile")?;
589+
file.seek(SeekFrom::Start(offset)).await.with_context(|| {
590+
format!("Failed to seek to offset {offset} / {total_len} within zipfile")
591+
})?;
592+
593+
// Only stream at most "transfer_chunk_size" bytes at once
594+
let remaining = std::cmp::min(
595+
self.transfer_chunk_size.get(),
596+
total_len - offset,
597+
);
598+
let limited_file = file.take(remaining);
599+
let stream = tokio_util::io::ReaderStream::new(limited_file);
600+
let body = reqwest::Body::wrap_stream(stream);
601+
602+
info!(
603+
&self.log,
604+
"Streaming bundle chunk";
605+
"bundle" => %self.bundle.id,
606+
"offset" => offset,
607+
"length" => remaining,
608+
);
609+
610+
sled_client.support_bundle_transfer(
611+
&zpool, &dataset, &support_bundle, offset, body
612+
).await.with_context(|| {
613+
format!("Failed to transfer bundle: {remaining}@{offset} of {total_len} to sled")
614+
})?;
615+
616+
offset += self.transfer_chunk_size.get();
617+
}
525618

526619
sled_client
527-
.support_bundle_create(
528-
&ZpoolUuid::from(self.bundle.zpool_id),
529-
&DatasetUuid::from(self.bundle.dataset_id),
530-
&SupportBundleUuid::from(self.bundle.id),
620+
.support_bundle_finalize(
621+
&zpool,
622+
&dataset,
623+
&support_bundle,
531624
&hash.to_string(),
532-
body,
533625
)
534-
.await?;
626+
.await
627+
.with_context(|| "Failed to finalize bundle")?;
535628

536629
// Returning from this method should drop all temporary storage
537630
// allocated locally for this support bundle.
538-
Ok(report)
631+
Ok(())
539632
}
540633

541634
// Perform the work of collecting the support bundle into a temporary directory
@@ -795,7 +888,7 @@ impl BackgroundTask for SupportBundleCollector {
795888
Ok(report) => collection_report = Some(report),
796889
Err(err) => {
797890
collection_err =
798-
Some(json!({ "collect_error": err.to_string() }))
891+
Some(json!({ "collect_error": InlineErrorChain::new(err.as_ref()).to_string() }))
799892
}
800893
};
801894

@@ -1076,7 +1169,9 @@ async fn save_sp_dumps(
10761169
mod test {
10771170
use super::*;
10781171

1172+
use crate::app::support_bundles::SupportBundleQueryType;
10791173
use camino_tempfile::tempdir;
1174+
use http_body_util::BodyExt;
10801175
use nexus_db_model::PhysicalDisk;
10811176
use nexus_db_model::PhysicalDiskKind;
10821177
use nexus_db_model::RendezvousDebugDataset;
@@ -1356,6 +1451,7 @@ mod test {
13561451
// NOTE: The support bundle querying interface isn't supported on
13571452
// the simulated sled agent (yet?) so we're skipping this step.
13581453
skip_sled_info: true,
1454+
..Default::default()
13591455
};
13601456
let report = collector
13611457
.collect_bundle(&opctx, &request)
@@ -1382,6 +1478,85 @@ mod test {
13821478
assert!(report.is_none());
13831479
}
13841480

1481+
#[nexus_test(server = crate::Server)]
1482+
async fn test_collect_chunked(cptestctx: &ControlPlaneTestContext) {
1483+
let nexus = &cptestctx.server.server_context().nexus;
1484+
let datastore = nexus.datastore();
1485+
let resolver = nexus.resolver();
1486+
let opctx = OpContext::for_tests(
1487+
cptestctx.logctx.log.clone(),
1488+
datastore.clone(),
1489+
);
1490+
1491+
// Before we can create any bundles, we need to create the
1492+
// space for them to be provisioned.
1493+
let _datasets =
1494+
TestDataset::setup(cptestctx, &datastore, &opctx, 1).await;
1495+
1496+
let bundle = datastore
1497+
.support_bundle_create(&opctx, "For collection testing", nexus.id())
1498+
.await
1499+
.expect("Couldn't allocate a support bundle");
1500+
assert_eq!(bundle.state, SupportBundleState::Collecting);
1501+
1502+
let collector = SupportBundleCollector::new(
1503+
datastore.clone(),
1504+
resolver.clone(),
1505+
false,
1506+
nexus.id(),
1507+
);
1508+
1509+
// The bundle collection should complete successfully.
1510+
//
1511+
// We're going to use a really small chunk size here to force the bundle
1512+
// to get split up.
1513+
let request = BundleRequest {
1514+
skip_sled_info: true,
1515+
transfer_chunk_size: NonZeroU64::new(16).unwrap(),
1516+
};
1517+
1518+
let report = collector
1519+
.collect_bundle(&opctx, &request)
1520+
.await
1521+
.expect("Collection should have succeeded under test")
1522+
.expect("Collecting the bundle should have generated a report");
1523+
assert_eq!(report.bundle, bundle.id.into());
1524+
assert!(report.listed_in_service_sleds);
1525+
assert!(report.listed_sps);
1526+
assert!(report.activated_in_db_ok);
1527+
1528+
let observed_bundle = datastore
1529+
.support_bundle_get(&opctx, bundle.id.into())
1530+
.await
1531+
.expect("Bundle should definitely be in db by this point");
1532+
assert_eq!(observed_bundle.state, SupportBundleState::Active);
1533+
1534+
// Download a file from the bundle, to verify that it was transferred
1535+
// successfully.
1536+
let head = false;
1537+
let range = None;
1538+
let response = nexus
1539+
.support_bundle_download(
1540+
&opctx,
1541+
observed_bundle.id.into(),
1542+
SupportBundleQueryType::Path {
1543+
file_path: "bundle_id.txt".to_string(),
1544+
},
1545+
head,
1546+
range,
1547+
)
1548+
.await
1549+
.unwrap();
1550+
1551+
// Read the body to bytes, then convert to string
1552+
let body_bytes =
1553+
response.into_body().collect().await.unwrap().to_bytes();
1554+
let body_string = String::from_utf8(body_bytes.to_vec()).unwrap();
1555+
1556+
// Verify the content matches the bundle ID
1557+
assert_eq!(body_string, observed_bundle.id.to_string());
1558+
}
1559+
13851560
#[nexus_test(server = crate::Server)]
13861561
async fn test_collect_many(cptestctx: &ControlPlaneTestContext) {
13871562
let nexus = &cptestctx.server.server_context().nexus;
@@ -1415,7 +1590,8 @@ mod test {
14151590
);
14161591

14171592
// Each time we call "collect_bundle", we collect a SINGLE bundle.
1418-
let request = BundleRequest { skip_sled_info: true };
1593+
let request =
1594+
BundleRequest { skip_sled_info: true, ..Default::default() };
14191595
let report = collector
14201596
.collect_bundle(&opctx, &request)
14211597
.await
@@ -1553,7 +1729,8 @@ mod test {
15531729
false,
15541730
nexus.id(),
15551731
);
1556-
let request = BundleRequest { skip_sled_info: true };
1732+
let request =
1733+
BundleRequest { skip_sled_info: true, ..Default::default() };
15571734
let report = collector
15581735
.collect_bundle(&opctx, &request)
15591736
.await
@@ -1689,7 +1866,8 @@ mod test {
16891866
false,
16901867
nexus.id(),
16911868
);
1692-
let request = BundleRequest { skip_sled_info: true };
1869+
let request =
1870+
BundleRequest { skip_sled_info: true, ..Default::default() };
16931871
let report = collector
16941872
.collect_bundle(&opctx, &request)
16951873
.await
@@ -1768,7 +1946,8 @@ mod test {
17681946
false,
17691947
nexus.id(),
17701948
);
1771-
let request = BundleRequest { skip_sled_info: true };
1949+
let request =
1950+
BundleRequest { skip_sled_info: true, ..Default::default() };
17721951
let report = collector
17731952
.collect_bundle(&opctx, &request)
17741953
.await

0 commit comments

Comments
 (0)