diff --git a/Cargo.lock b/Cargo.lock
index 73e87fc2..ff44e3b5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2156,8 +2156,8 @@ dependencies = [
 
 [[package]]
 name = "stackable-operator"
-version = "0.73.0"
-source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.73.0#4d98a29b08a7d959e5e287f774cf064c02ffbd62"
+version = "0.74.0"
+source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.74.0#c77a5423b66bc1667b63af7d8bec00de88a5303f"
 dependencies = [
  "chrono",
  "clap",
@@ -2170,7 +2170,6 @@ dependencies = [
  "json-patch",
  "k8s-openapi",
  "kube",
- "lazy_static",
  "opentelemetry-jaeger",
  "opentelemetry_sdk",
  "product-config",
@@ -2194,7 +2193,7 @@ dependencies = [
 [[package]]
 name = "stackable-operator-derive"
 version = "0.3.1"
-source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.73.0#4d98a29b08a7d959e5e287f774cf064c02ffbd62"
+source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.74.0#c77a5423b66bc1667b63af7d8bec00de88a5303f"
 dependencies = [
  "darling",
  "proc-macro2",
diff --git a/Cargo.nix b/Cargo.nix
index 76991377..8060cb01 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -6682,13 +6682,13 @@ rec {
       };
       "stackable-operator" = rec {
         crateName = "stackable-operator";
-        version = "0.73.0";
+        version = "0.74.0";
         edition = "2021";
         workspace_member = null;
         src = pkgs.fetchgit {
           url = "https://github.com/stackabletech/operator-rs.git";
-          rev = "4d98a29b08a7d959e5e287f774cf064c02ffbd62";
-          sha256 = "0cmfbc3v9kklsfkqbnhwig45106gfizhmlmg9p1qgdjp8az43l9m";
+          rev = "c77a5423b66bc1667b63af7d8bec00de88a5303f";
+          sha256 = "1g1a0v98wlcb36ibwv1nv75g3b3s1mjmaps443fc2w2maam94lya";
         };
         libName = "stackable_operator";
         authors = [
@@ -6745,10 +6745,6 @@ rec {
           usesDefaultFeatures = false;
           features = [ "client" "jsonpatch" "runtime" "derive" "rustls-tls" ];
         }
-        {
-          name = "lazy_static";
-          packageId = "lazy_static";
-        }
         {
           name = "opentelemetry-jaeger";
           packageId = "opentelemetry-jaeger";
@@ -6841,8 +6837,8 @@ rec {
         workspace_member = null;
         src = pkgs.fetchgit {
           url = "https://github.com/stackabletech/operator-rs.git";
-          rev = "4d98a29b08a7d959e5e287f774cf064c02ffbd62";
-          sha256 = "0cmfbc3v9kklsfkqbnhwig45106gfizhmlmg9p1qgdjp8az43l9m";
+          rev = "c77a5423b66bc1667b63af7d8bec00de88a5303f";
+          sha256 = "1g1a0v98wlcb36ibwv1nv75g3b3s1mjmaps443fc2w2maam94lya";
         };
         procMacro = true;
         libName = "stackable_operator_derive";
diff --git a/Cargo.toml b/Cargo.toml
index 04049e6e..0e4626da 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,13 +21,13 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_yaml = "0.9"
 snafu = "0.8"
-stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.73.0" }
+stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.74.0" }
 product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
 strum = { version = "0.26", features = ["derive"] }
 tokio = { version = "1.39", features = ["full"] }
 tracing = "0.1"
 tracing-futures = { version = "0.2", features = ["futures-03"] }
 
-#[patch."https://github.com/stackabletech/operator-rs.git"]
+[patch."https://github.com/stackabletech/operator-rs.git"]
 #stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
 #stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
diff --git a/crate-hashes.json b/crate-hashes.json
index ddd91d81..8725a492 100644
--- a/crate-hashes.json
+++ b/crate-hashes.json
@@ -1,5 +1,5 @@
 {
-    "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.73.0#stackable-operator-derive@0.3.1": "0cmfbc3v9kklsfkqbnhwig45106gfizhmlmg9p1qgdjp8az43l9m",
-    "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.73.0#stackable-operator@0.73.0": "0cmfbc3v9kklsfkqbnhwig45106gfizhmlmg9p1qgdjp8az43l9m",
+    "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.74.0#stackable-operator-derive@0.3.1": "1g1a0v98wlcb36ibwv1nv75g3b3s1mjmaps443fc2w2maam94lya",
+    "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.74.0#stackable-operator@0.74.0": "1g1a0v98wlcb36ibwv1nv75g3b3s1mjmaps443fc2w2maam94lya",
     "git+https://github.com/stackabletech/product-config.git?tag=0.7.0#product-config@0.7.0": "0gjsm80g6r75pm3824dcyiz4ysq1ka4c1if6k1mjm9cnd5ym0gny"
 }
\ No newline at end of file
diff --git a/deploy/helm/hdfs-operator/crds/crds.yaml b/deploy/helm/hdfs-operator/crds/crds.yaml
index 9f16690e..d7813a9d 100644
--- a/deploy/helm/hdfs-operator/crds/crds.yaml
+++ b/deploy/helm/hdfs-operator/crds/crds.yaml
@@ -1807,6 +1807,17 @@ spec:
                     - type
                   type: object
                 type: array
+              deployedProductVersion:
+                description: |-
+                  The product version that the HDFS cluster is currently running.
+
+                  During upgrades, this field contains the *old* version.
+                nullable: true
+                type: string
+              upgradeTargetProductVersion:
+                description: The product version that is currently being upgraded to, otherwise null.
+                nullable: true
+                type: string
             type: object
           required:
           - spec
diff --git a/docs/modules/hdfs/pages/usage-guide/upgrading.adoc b/docs/modules/hdfs/pages/usage-guide/upgrading.adoc
new file mode 100644
index 00000000..f78e9def
--- /dev/null
+++ b/docs/modules/hdfs/pages/usage-guide/upgrading.adoc
@@ -0,0 +1,107 @@
+= Upgrading HDFS
+
+IMPORTANT: HDFS upgrades are experimental, and details may change at any time.
+
+HDFS currently requires a manual process to upgrade. This guide takes you through upgrading an example cluster (from our xref:getting_started/index.adoc[Getting Started] guide) from HDFS 3.3.6 to 3.4.0.
+
+== Preparing for the worst
+
+Upgrades can fail, and it is important to prepare for when that happens. Apache HDFS supports https://hadoop.apache.org/docs/r3.4.0/hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html#Downgrade_and_Rollback[two ways to revert an upgrade]:
+
+Rollback:: Reverts all user data to the pre-upgrade state. Requires taking the cluster offline.
+Downgrade:: Downgrades the HDFS software but preserves all changes made by users. Can be performed as a rolling change, keeping the cluster online.
+
+The Stackable Operator for HDFS supports downgrading but not rollbacks.
+
+In order to downgrade, revert the `.spec.image.productVersion` field, and then proceed to xref:#finalize[finalizing] once the cluster is downgraded:
+
+[source,shell]
+----
+$ kubectl patch hdfs/simple-hdfs --patch '{"spec": {"image": {"productVersion": "3.3.6"}}}' --type=merge
+hdfscluster.hdfs.stackable.tech/simple-hdfs patched
+----
+
+== Preparing HDFS
+
+HDFS must be configured to initiate the upgrade process. To do this, put the cluster into upgrade mode by running the following commands in an HDFS superuser environment
+(either a client configured with a superuser account, or from inside a NameNode pod):
+
+// This could be automated by the operator, but dfsadmin does not have good machine-readable output.
+// It *can* be queried over JMX, but we're not so lucky for finalization.
+
+[source,shell]
+----
+$ hdfs dfsadmin -rollingUpgrade prepare
+PREPARE rolling upgrade ...
+Preparing for upgrade. Data is being saved for rollback.
+Run "dfsadmin -rollingUpgrade query" to check the status
+for proceeding with rolling upgrade
+  Block Pool ID: BP-841432641-10.244.0.29-1722612757853
+     Start Time: Fri Aug 02 15:49:12 GMT 2024 (=1722613752341)
+  Finalize Time:
+
+$ # Then run query until the HDFS is ready to proceed
+$ hdfs dfsadmin -rollingUpgrade query
+QUERY rolling upgrade ...
+Preparing for upgrade. Data is being saved for rollback.
+Run "dfsadmin -rollingUpgrade query" to check the status
+for proceeding with rolling upgrade
+  Block Pool ID: BP-841432641-10.244.0.29-1722612757853
+     Start Time: Fri Aug 02 15:49:12 GMT 2024 (=1722613752341)
+  Finalize Time:
+
+$ # It is safe to proceed when the output indicates so, like this:
+$ hdfs dfsadmin -rollingUpgrade query
+QUERY rolling upgrade ...
+Proceed with rolling upgrade:
+  Block Pool ID: BP-841432641-10.244.0.29-1722612757853
+     Start Time: Fri Aug 02 15:49:12 GMT 2024 (=1722613752341)
+  Finalize Time:
+----
+
+== Starting the upgrade
+
+Once HDFS is ready to upgrade, the HdfsCluster can be updated with the new product version:
+
+[source,shell]
+----
+$ kubectl patch hdfs/simple-hdfs --patch '{"spec": {"image": {"productVersion": "3.4.0"}}}' --type=merge
+hdfscluster.hdfs.stackable.tech/simple-hdfs patched
+----
+
+Then wait until all pods have restarted, are in the Ready state, and are running the new HDFS version.
+
+NOTE: This will automatically enable the NameNodes' compatibility mode, allowing them to start despite the fsImage version mismatch.
+
+NOTE: Services will be upgraded in order: JournalNodes, then NameNodes, then DataNodes.
+
+[#finalize]
+== Finalizing the upgrade
+
+Once all HDFS pods are running the new version, the HDFS upgrade can be finalized (from the HDFS superuser environment as described in the preparation step):
+
+[source,shell]
+----
+$ hdfs dfsadmin -rollingUpgrade finalize
+FINALIZE rolling upgrade ...
+Rolling upgrade is finalized.
+  Block Pool ID: BP-841432641-10.244.0.29-1722612757853
+     Start Time: Fri Aug 02 15:49:12 GMT 2024 (=1722613752341)
+  Finalize Time: Fri Aug 02 15:58:39 GMT 2024 (=1722614319854)
+----
+
+// We can't safely automate this, because finalize is asynchronous and doesn't tell us whether all NameNodes have even received the request to finalize.
+
+WARNING: Please ensure that all NameNodes are running and available before proceeding. NameNodes that have not finalized yet will crash on launch when taken out of compatibility mode.
+
+Finally, mark the cluster as upgraded:
+
+[source,shell]
+----
+$ kubectl patch hdfs/simple-hdfs --subresource=status --patch '{"status": {"deployedProductVersion": "3.4.0"}}' --type=merge
+hdfscluster.hdfs.stackable.tech/simple-hdfs patched
+----
+
+NOTE: `deployedProductVersion` is located in the _status_ subresource, which will not be modified by most graphical editors, and `kubectl` requires the `--subresource=status` flag.
+
+The NameNodes will then be restarted a final time, taking them out of compatibility mode.
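The guide above leaves the `query` step as manual polling. A small loop can do the waiting instead; the sketch below is illustrative only (the loop, its `grep` pattern, and the 10-second interval are not part of the operator or the guide) and assumes the same HDFS superuser environment as the preparation step:

[source,shell]
----
# Sketch: poll dfsadmin until it reports that the rolling upgrade may proceed.
until hdfs dfsadmin -rollingUpgrade query | grep -q "Proceed with rolling upgrade"; do
  echo "HDFS is still preparing for the upgrade, retrying in 10s..."
  sleep 10
done
echo "Safe to proceed with the rolling upgrade"
----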
diff --git a/docs/modules/hdfs/partials/nav.adoc b/docs/modules/hdfs/partials/nav.adoc
index 8409eaf9..2e67c8e8 100644
--- a/docs/modules/hdfs/partials/nav.adoc
+++ b/docs/modules/hdfs/partials/nav.adoc
@@ -10,6 +10,7 @@
 ** xref:hdfs:usage-guide/logging-log-aggregation.adoc[]
 ** xref:hdfs:usage-guide/monitoring.adoc[]
 ** xref:hdfs:usage-guide/configuration-environment-overrides.adoc[]
+** xref:hdfs:usage-guide/upgrading.adoc[]
 ** xref:hdfs:usage-guide/operations/index.adoc[]
 *** xref:hdfs:usage-guide/operations/cluster-operations.adoc[]
 *** xref:hdfs:usage-guide/operations/pod-placement.adoc[]
diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs
index 4b47a122..ff1b697f 100644
--- a/rust/crd/src/lib.rs
+++ b/rust/crd/src/lib.rs
@@ -41,7 +41,7 @@ use stackable_operator::{
     status::condition::{ClusterCondition, HasStatusCondition},
     time::Duration,
 };
-use strum::{Display, EnumIter, EnumString};
+use strum::{Display, EnumIter, EnumString, IntoStaticStr};
 
 use crate::{
     affinity::get_affinity,
@@ -312,11 +312,13 @@ impl AnyNodeConfig {
 
 #[derive(
     Clone,
+    Copy,
     Debug,
     Deserialize,
     Display,
     EnumIter,
     EnumString,
+    IntoStaticStr,
     Eq,
     Hash,
     JsonSchema,
@@ -324,15 +326,15 @@ impl AnyNodeConfig {
     Serialize,
 )]
 pub enum HdfsRole {
+    #[serde(rename = "journalnode")]
+    #[strum(serialize = "journalnode")]
+    JournalNode,
     #[serde(rename = "namenode")]
     #[strum(serialize = "namenode")]
     NameNode,
     #[serde(rename = "datanode")]
     #[strum(serialize = "datanode")]
     DataNode,
-    #[serde(rename = "journalnode")]
-    #[strum(serialize = "journalnode")]
-    JournalNode,
 }
 
 impl HdfsRole {
@@ -802,6 +804,43 @@ impl HdfsCluster {
         Ok(result)
     }
 
+    pub fn upgrade_state(&self) -> Result<Option<UpgradeState>, UpgradeStateError> {
+        use upgrade_state_error::*;
+        let Some(status) = self.status.as_ref() else {
+            return Ok(None);
+        };
+        let requested_version = self.spec.image.product_version();
+        let Some(deployed_version) = status.deployed_product_version.as_deref() else {
+            // If no deployed version, fresh install -> no upgrade
+            return Ok(None);
+        };
+        let current_upgrade_target_version = status.upgrade_target_product_version.as_deref();
+
+        if requested_version != deployed_version {
+            // If we're requesting a different version than what is deployed, assume that we're upgrading.
+            // Could also be a downgrade to an older version, but we don't support downgrades after upgrade finalization.
+            match current_upgrade_target_version {
+                Some(upgrading_version) if requested_version != upgrading_version => {
+                    // If we're in an upgrade, do not allow switching to a third version
+                    InvalidCrossgradeSnafu {
+                        requested_version,
+                        deployed_version,
+                        upgrading_version,
+                    }
+                    .fail()
+                }
+                _ => Ok(Some(UpgradeState::Upgrading)),
+            }
+        } else if current_upgrade_target_version.is_some_and(|x| requested_version != x) {
+            // If we're requesting the old version mid-upgrade, assume that we're downgrading.
+            // We only support downgrading to the exact previous version.
+            Ok(Some(UpgradeState::Downgrading))
+        } else {
+            // All three versions match, upgrade was completed without clearing `upgrading_product_version`.
+            Ok(None)
+        }
+    }
+
     pub fn authentication_config(&self) -> Option<&AuthenticationConfig> {
         self.spec.cluster_config.authentication.as_ref()
     }
@@ -955,6 +994,26 @@ impl HdfsPodRef {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum UpgradeState {
+    /// The cluster is currently being upgraded to a new version.
+    Upgrading,
+
+    /// The cluster is currently being downgraded to the previous version.
+    Downgrading,
+}
+
+#[derive(Debug, Snafu)]
+#[snafu(module)]
+pub enum UpgradeStateError {
+    #[snafu(display("requested version {requested_version:?} while still upgrading from {deployed_version:?} to {upgrading_version:?}, please finish the upgrade or downgrade first"))]
+    InvalidCrossgrade {
+        requested_version: String,
+        deployed_version: String,
+        upgrading_version: String,
+    },
+}
+
 #[derive(
     Clone,
     Debug,
@@ -1322,6 +1381,14 @@ impl Configuration for JournalNodeConfigFragment {
 pub struct HdfsClusterStatus {
     #[serde(default)]
     pub conditions: Vec<ClusterCondition>,
+
+    /// The product version that the HDFS cluster is currently running.
+    ///
+    /// During upgrades, this field contains the *old* version.
+    pub deployed_product_version: Option<String>,
+
+    /// The product version that is currently being upgraded to, otherwise null.
+    pub upgrade_target_product_version: Option<String>,
 }
 
 impl HasStatusCondition for HdfsCluster {
diff --git a/rust/operator-binary/src/container.rs b/rust/operator-binary/src/container.rs
index 961c2527..171fb2b3 100644
--- a/rust/operator-binary/src/container.rs
+++ b/rust/operator-binary/src/container.rs
@@ -12,6 +12,7 @@
 use crate::DATANODE_ROOT_DATA_DIR_PREFIX;
 use crate::JVM_SECURITY_PROPERTIES_FILE;
 use crate::LOG4J_PROPERTIES;
 
+use stackable_hdfs_crd::UpgradeState;
 use stackable_operator::utils::COMMON_BASH_TRAP_FUNCTIONS;
 use std::{collections::BTreeMap, str::FromStr};
@@ -212,7 +213,7 @@ impl ContainerConfig {
         labels: &Labels,
     ) -> Result<(), Error> {
         // HDFS main container
-        let main_container_config = Self::from(role.clone());
+        let main_container_config = Self::from(*role);
         pb.add_volumes(main_container_config.volumes(merged_config, object_name, labels)?);
         pb.add_container(main_container_config.main_container(
             hdfs,
@@ -548,6 +549,14 @@ impl ContainerConfig {
             args.push_str(&Self::export_kerberos_real_env_var_command());
         }
 
+        let upgrade_args = if hdfs.upgrade_state().ok() == Some(Some(UpgradeState::Upgrading))
+            && *role == HdfsRole::NameNode
+        {
+            "-rollingUpgrade started"
+        } else {
+            ""
+        };
+
         match self {
             ContainerConfig::Hdfs { role, .. } => {
                 args.push_str(&self.copy_log4j_properties_cmd(
@@ -566,7 +575,7 @@ if [[ -d {LISTENER_VOLUME_DIR} ]]; then
         export $(basename $i | tr a-z- A-Z_)_PORT="$(cat $i)"
     done
 fi
-{hadoop_home}/bin/hdfs {role} &
+{hadoop_home}/bin/hdfs {role} {upgrade_args} &
 wait_for_termination $!
 {create_vector_shutdown_file_command}
 "#,
@@ -1317,7 +1326,7 @@ impl From<HdfsRole> for ContainerConfig {
     fn from(role: HdfsRole) -> Self {
         match role {
             HdfsRole::NameNode => Self::Hdfs {
-                role: role.clone(),
+                role,
                 container_name: role.to_string(),
                 volume_mounts: ContainerVolumeDirs::from(role),
                 ipc_port_name: SERVICE_PORT_NAME_RPC,
@@ -1327,7 +1336,7 @@ impl From<HdfsRole> for ContainerConfig {
                 metrics_port: DEFAULT_NAME_NODE_METRICS_PORT,
             },
             HdfsRole::DataNode => Self::Hdfs {
-                role: role.clone(),
+                role,
                 container_name: role.to_string(),
                 volume_mounts: ContainerVolumeDirs::from(role),
                 ipc_port_name: SERVICE_PORT_NAME_IPC,
@@ -1337,7 +1346,7 @@ impl From<HdfsRole> for ContainerConfig {
                 metrics_port: DEFAULT_DATA_NODE_METRICS_PORT,
             },
             HdfsRole::JournalNode => Self::Hdfs {
-                role: role.clone(),
+                role,
                 container_name: role.to_string(),
                 volume_mounts: ContainerVolumeDirs::from(role),
                 ipc_port_name: SERVICE_PORT_NAME_RPC,
diff --git a/rust/operator-binary/src/hdfs_controller.rs b/rust/operator-binary/src/hdfs_controller.rs
index eadf07ef..bd9fb6e5 100644
--- a/rust/operator-binary/src/hdfs_controller.rs
+++ b/rust/operator-binary/src/hdfs_controller.rs
@@ -1,6 +1,5 @@
 use std::{
     collections::{BTreeMap, HashMap},
-    str::FromStr,
     sync::Arc,
 };
 
@@ -22,6 +21,7 @@ use stackable_operator::{
         product_image_selection::ResolvedProductImage,
         rbac::{build_rbac_resources, service_account_name},
     },
+    iter::reverse_if,
     k8s_openapi::{
         api::{
             apps::v1::{StatefulSet, StatefulSetSpec},
@@ -39,16 +39,20 @@ use stackable_operator::{
     logging::controller::ReconcilerError,
     product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
     role_utils::{GenericRoleConfig, RoleGroupRef},
-    status::condition::{
-        compute_conditions, operations::ClusterOperationsConditionBuilder,
-        statefulset::StatefulSetConditionBuilder,
+    status::{
+        condition::{
+            compute_conditions, operations::ClusterOperationsConditionBuilder,
+            statefulset::StatefulSetConditionBuilder,
+        },
+        rollout::check_statefulset_rollout_complete,
     },
     time::Duration,
 };
-use strum::{EnumDiscriminants, IntoStaticStr};
+use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr};
 
 use stackable_hdfs_crd::{
     constants::*, AnyNodeConfig, HdfsCluster, HdfsClusterStatus, HdfsPodRef, HdfsRole,
+    UpgradeState, UpgradeStateError,
 };
 
 use crate::{
@@ -83,6 +87,9 @@ pub enum Error {
         source: stackable_operator::product_config_utils::Error,
     },
 
+    #[snafu(display("invalid upgrade state"))]
+    InvalidUpgradeState { source: UpgradeStateError },
+
     #[snafu(display("cannot create rolegroup service {name:?}"))]
     ApplyRoleGroupService {
         source: stackable_operator::cluster_resources::Error,
@@ -323,10 +330,28 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
     let dfs_replication = hdfs.spec.cluster_config.dfs_replication;
     let mut ss_cond_builder = StatefulSetConditionBuilder::default();
 
-    for (role_name, group_config) in validated_config.iter() {
-        let role: HdfsRole = HdfsRole::from_str(role_name).with_context(|_| InvalidRoleSnafu {
-            role: role_name.to_string(),
-        })?;
+    let upgrade_state = hdfs.upgrade_state().context(InvalidUpgradeStateSnafu)?;
+    let mut deploy_done = true;
+
+    // Roles must be deployed in order during rolling upgrades,
+    // namenode version must be >= datanode version (and so on).
+    let roles = reverse_if(
+        match upgrade_state {
+            // Downgrades have the opposite version relationship, so they need to be rolled out in reverse order.
+            Some(UpgradeState::Downgrading) => {
+                tracing::info!("HdfsCluster is being downgraded, deploying in reverse order");
+                true
+            }
+            _ => false,
+        },
+        HdfsRole::iter(),
+    );
+    'roles: for role in roles {
+        let role_name: &str = role.into();
+        let Some(group_config) = validated_config.get(role_name) else {
+            tracing::debug!(?role, "role has no configuration, skipping");
+            continue;
+        };
 
         if let Some(content) = build_invalid_replica_message(&hdfs, &role, dfs_replication) {
             publish_event(
@@ -408,14 +433,26 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
                     name: rg_configmap_name,
                 })?;
             let rg_statefulset_name = rg_statefulset.name_any();
-            ss_cond_builder.add(
-                cluster_resources
-                    .add(client, rg_statefulset.clone())
-                    .await
-                    .with_context(|_| ApplyRoleGroupStatefulSetSnafu {
-                        name: rg_statefulset_name,
-                    })?,
-            );
+            let deployed_rg_statefulset = cluster_resources
+                .add(client, rg_statefulset.clone())
+                .await
+                .with_context(|_| ApplyRoleGroupStatefulSetSnafu {
+                    name: rg_statefulset_name,
+                })?;
+            ss_cond_builder.add(deployed_rg_statefulset.clone());
+            if upgrade_state.is_some() {
+                // When upgrading, ensure that each role is upgraded before moving on to the next as recommended by
+                // https://hadoop.apache.org/docs/r3.4.0/hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html#Upgrading_Non-Federated_Clusters
+                if let Err(reason) = check_statefulset_rollout_complete(&deployed_rg_statefulset) {
+                    tracing::info!(
+                        rolegroup.statefulset = %ObjectRef::from_obj(&deployed_rg_statefulset),
+                        reason = &reason as &dyn std::error::Error,
+                        "rolegroup is still upgrading, waiting..."
+                    );
+                    deploy_done = false;
+                    break 'roles;
+                }
+            }
         }
 
         let role_config = hdfs.role_config(&role);
@@ -459,12 +496,44 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
             hdfs.as_ref(),
             &[&ss_cond_builder, &cluster_operation_cond_builder],
         ),
+        // FIXME: We can't currently leave upgrade mode automatically, since we don't know when an upgrade is finalized
+        deployed_product_version: Some(
+            hdfs.status
+                .as_ref()
+                // Keep current version if set
+                .and_then(|status| status.deployed_product_version.as_deref())
+                // Otherwise (on initial deploy) fall back to user's specified version
+                .unwrap_or(hdfs.spec.image.product_version())
+                .to_string(),
+        ),
+        upgrade_target_product_version: match upgrade_state {
+            // User is upgrading, whatever they're upgrading to is (by definition) the target
+            Some(UpgradeState::Upgrading) => Some(hdfs.spec.image.product_version().to_string()),
+            Some(UpgradeState::Downgrading) => {
+                if deploy_done {
+                    // Downgrade is done, clear
+                    tracing::info!("downgrade deployed, clearing upgrade state");
+                    None
+                } else {
+                    // Downgrade is still in progress, preserve the current value
+                    hdfs.status
+                        .as_ref()
+                        .and_then(|status| status.upgrade_target_product_version.clone())
+                }
+            }
+            // Upgrade is complete (if any), clear
+            None => None,
+        },
     };
 
-    cluster_resources
-        .delete_orphaned_resources(client)
-        .await
-        .context(DeleteOrphanedResourcesSnafu)?;
+    // During upgrades we do partial deployments, we don't want to garbage collect after those
+    // since we *will* redeploy (or properly orphan) the remaining resources later.
+    if deploy_done {
+        cluster_resources
+            .delete_orphaned_resources(client)
+            .await
+            .context(DeleteOrphanedResourcesSnafu)?;
+    }
     client
         .apply_patch_status(OPERATOR_NAME, &*hdfs, &status)
         .await
@@ -873,7 +942,7 @@ properties: []
         let validated_config = validate_all_roles_and_groups_config(
             "3.4.0",
             &config,
-            &ProductConfigManager::from_str(product_config).unwrap(),
+            &product_config.parse::<ProductConfigManager>().unwrap(),
             false,
             false,
         )
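Taken together, the CRD and controller changes above mean the operator now records its upgrade bookkeeping in the status subresource. A possible way to watch it while an upgrade rolls through the roles, assuming the `simple-hdfs` cluster from the guide (the jsonpath expression is only a sketch, not part of this changeset):

[source,shell]
----
# Show the deployed version and, if an upgrade is in flight, the version being upgraded to.
kubectl get hdfs/simple-hdfs \
  -o jsonpath='deployed={.status.deployedProductVersion} target={.status.upgradeTargetProductVersion}{"\n"}'
----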