Skip to content

Commit 10e5220

Browse files
committed
Perform downgrades in order
1 parent 1e68f1d commit 10e5220

File tree

8 files changed

+85
-34
lines changed

8 files changed

+85
-34
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ serde = { version = "1.0", features = ["derive"] }
2121
serde_json = "1.0"
2222
serde_yaml = "0.9"
2323
snafu = "0.8"
24-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.70.0" }
24+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.73.0" }
2525
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
2626
strum = { version = "0.26", features = ["derive"] }
2727
tokio = { version = "1.38", features = ["full"] }
@@ -31,4 +31,4 @@ tracing-futures = { version = "0.2", features = ["futures-03"] }
3131
[patch."https://github.com/stackabletech/operator-rs.git"]
3232
#stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
3333
#stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
34-
stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "revert/lazylock" }
34+
stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "feature/iter-reverse-if" }

crate-hashes.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/modules/hdfs/pages/usage-guide/upgrading.adoc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ $ kubectl patch hdfs/simple-hdfs --patch '{"spec": {"image": {"productVersion":
2121
hdfscluster.hdfs.stackable.tech/simple-hdfs patched
2222
----
2323

24-
WARNING: The Stackable Operator for HDFS will _not_ currently roll downgrades in order; they will incur some downtime.
25-
2624
== Preparing HDFS
2725

2826
HDFS must be configured to initiate the upgrade process. To do this, put the cluster into upgrade mode by running the following commands in an HDFS superuser environment

rust/crd/src/lib.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -804,13 +804,22 @@ impl HdfsCluster {
804804
Ok(result)
805805
}
806806

807-
pub fn is_upgrading(&self) -> bool {
808-
self.status
809-
.as_ref()
810-
.and_then(|status| status.deployed_product_version.as_deref())
811-
.map_or(false, |deployed_version| {
812-
deployed_version != self.spec.image.product_version()
813-
})
807+
pub fn upgrade_state(&self) -> Option<UpgradeState> {
808+
let status = self.status.as_ref()?;
809+
let requested_version = self.spec.image.product_version();
810+
811+
if requested_version != status.deployed_product_version.as_deref()? {
812+
// If we're requesting a different version than what is deployed, assume that we're upgrading.
813+
// Could also be a downgrade to an older version, but we don't support downgrades after upgrade finalization.
814+
Some(UpgradeState::Upgrading)
815+
} else if requested_version != status.upgrading_product_version.as_deref()? {
816+
// If we're requesting the old version mid-upgrade, assume that we're downgrading.
817+
// We only support downgrading to the exact previous version.
818+
Some(UpgradeState::Downgrading)
819+
} else {
820+
// All three versions match, upgrade was completed without clearing `upgrading_product_version`.
821+
None
822+
}
814823
}
815824

816825
pub fn authentication_config(&self) -> Option<&AuthenticationConfig> {
@@ -966,6 +975,14 @@ impl HdfsPodRef {
966975
}
967976
}
968977

978+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
979+
pub enum UpgradeState {
980+
/// The cluster is currently being upgraded to a new version.
981+
Upgrading,
982+
/// The cluster is currently being downgraded to the previous version.
983+
Downgrading,
984+
}
985+
969986
#[derive(
970987
Clone,
971988
Debug,
@@ -1334,7 +1351,13 @@ pub struct HdfsClusterStatus {
13341351
#[serde(default)]
13351352
pub conditions: Vec<ClusterCondition>,
13361353

1354+
/// The product version that the HDFS cluster is currently running.
1355+
///
1356+
/// During upgrades, this field contains the *old* version.
13371357
pub deployed_product_version: Option<String>,
1358+
1359+
/// The product version that is currently being upgraded to, otherwise null.
1360+
pub upgrading_product_version: Option<String>,
13381361
}
13391362

13401363
impl HasStatusCondition for HdfsCluster {

rust/operator-binary/src/container.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use crate::DATANODE_ROOT_DATA_DIR_PREFIX;
1313
use crate::JVM_SECURITY_PROPERTIES_FILE;
1414
use crate::LOG4J_PROPERTIES;
15+
use stackable_hdfs_crd::UpgradeState;
1516
use stackable_operator::utils::COMMON_BASH_TRAP_FUNCTIONS;
1617
use std::{collections::BTreeMap, str::FromStr};
1718

@@ -548,7 +549,9 @@ impl ContainerConfig {
548549
args.push_str(&Self::export_kerberos_real_env_var_command());
549550
}
550551

551-
let upgrade_args = if hdfs.is_upgrading() && *role == HdfsRole::NameNode {
552+
let upgrade_args = if hdfs.upgrade_state() == Some(UpgradeState::Upgrading)
553+
&& *role == HdfsRole::NameNode
554+
{
552555
"-rollingUpgrade started"
553556
} else {
554557
""

rust/operator-binary/src/hdfs_controller.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use stackable_operator::{
2121
product_image_selection::ResolvedProductImage,
2222
rbac::{build_rbac_resources, service_account_name},
2323
},
24+
iter::reverse_if,
2425
k8s_openapi::{
2526
api::{
2627
apps::v1::{StatefulSet, StatefulSetSpec},
@@ -50,7 +51,7 @@ use stackable_operator::{
5051
use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr};
5152

5253
use stackable_hdfs_crd::{
53-
constants::*, AnyNodeConfig, HdfsCluster, HdfsClusterStatus, HdfsPodRef, HdfsRole,
54+
constants::*, AnyNodeConfig, HdfsCluster, HdfsClusterStatus, HdfsPodRef, HdfsRole, UpgradeState,
5455
};
5556

5657
use crate::{
@@ -325,10 +326,23 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
325326
let dfs_replication = hdfs.spec.cluster_config.dfs_replication;
326327
let mut ss_cond_builder = StatefulSetConditionBuilder::default();
327328

329+
let upgrade_state = hdfs.upgrade_state();
328330
let mut deploy_done = true;
329331

330-
// Roles must be deployed in order during rolling upgrades
331-
'roles: for role in HdfsRole::iter() {
332+
// Roles must be deployed in order during rolling upgrades,
333+
// namenode version must be >= datanode version (and so on).
334+
let roles = reverse_if(
335+
match upgrade_state {
336+
// Downgrades have the opposite version relationship, so they need to be rolled out in reverse order.
337+
Some(UpgradeState::Downgrading) => {
338+
tracing::info!("HdfsCluster is being downgraded, deploying in reverse order");
339+
true
340+
}
341+
_ => false,
342+
},
343+
HdfsRole::iter(),
344+
);
345+
'roles: for role in roles {
332346
let role_name: &str = role.into();
333347
let Some(group_config) = validated_config.get(role_name) else {
334348
tracing::debug!(?role, "role has no configuration, skipping");
@@ -422,12 +436,12 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
422436
name: rg_statefulset_name,
423437
})?;
424438
ss_cond_builder.add(deployed_rg_statefulset.clone());
425-
if hdfs.is_upgrading() {
439+
if upgrade_state.is_some() {
426440
// When upgrading, ensure that each role is upgraded before moving on to the next as recommended by
427441
// https://hadoop.apache.org/docs/r3.4.0/hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html#Upgrading_Non-Federated_Clusters
428442
if let Err(reason) = check_statefulset_rollout_complete(&deployed_rg_statefulset) {
429443
tracing::info!(
430-
object = %ObjectRef::from_obj(&deployed_rg_statefulset),
444+
rolegroup.statefulset = %ObjectRef::from_obj(&deployed_rg_statefulset),
431445
reason = &reason as &dyn std::error::Error,
432446
"rolegroup is still upgrading, waiting..."
433447
);
@@ -482,17 +496,30 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
482496
deployed_product_version: Some(
483497
hdfs.status
484498
.as_ref()
499+
// Keep current version if set
485500
.and_then(|status| status.deployed_product_version.as_deref())
501+
// Otherwise (on initial deploy) fall back to user's specified version
486502
.unwrap_or(hdfs.spec.image.product_version())
487503
.to_string(),
488504
),
489-
// deployed_product_version: if deploy_done {
490-
// Some(hdfs.spec.image.product_version().to_string())
491-
// } else {
492-
// hdfs.status
493-
// .as_ref()
494-
// .and_then(|status| status.deployed_product_version.clone())
495-
// },
505+
upgrading_product_version: match upgrade_state {
506+
// User is upgrading, whatever they're upgrading to is (by definition) the target
507+
Some(UpgradeState::Upgrading) => Some(hdfs.spec.image.product_version().to_string()),
508+
Some(UpgradeState::Downgrading) => {
509+
if deploy_done {
510+
// Downgrade is done, clear
511+
tracing::info!("downgrade deployed, clearing upgrade state");
512+
None
513+
} else {
514+
// Downgrade is still in progress, preserve the current value
515+
hdfs.status
516+
.as_ref()
517+
.and_then(|status| status.upgrading_product_version.clone())
518+
}
519+
}
520+
// Upgrade is complete (if any), clear
521+
None => None,
522+
},
496523
};
497524

498525
// During upgrades we do partial deployments, we don't want to garbage collect after those

0 commit comments

Comments (0)