
Commit a5e9547

Add upgrade mode with serialized deployments
1 parent 3b91fb7 commit a5e9547

File tree

3 files changed: +94, -27 lines

rust/crd/src/lib.rs
rust/operator-binary/src/container.rs
rust/operator-binary/src/hdfs_controller.rs

rust/crd/src/lib.rs

Lines changed: 22 additions & 4 deletions
@@ -41,7 +41,7 @@ use stackable_operator::{
     status::condition::{ClusterCondition, HasStatusCondition},
     time::Duration,
 };
-use strum::{Display, EnumIter, EnumString};
+use strum::{Display, EnumIter, EnumString, IntoStaticStr};

 use crate::{
     affinity::get_affinity,
@@ -130,6 +130,9 @@ pub struct HdfsClusterSpec {
     // no doc string - See ProductImage struct
     pub image: ProductImage,

+    #[serde(default)]
+    pub upgrading: bool,
+
     // no doc string - See ClusterOperation struct
     #[serde(default)]
     pub cluster_operation: ClusterOperation,
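
The `#[serde(default)]` on the new field keeps existing manifests valid: when `upgrading` is absent, serde falls back to `bool::default()`, i.e. `false`, so a cluster only enters upgrade mode when someone opts in explicitly. A minimal sketch of that behavior (using a hypothetical stand-in struct, not the real `HdfsClusterSpec`):

use serde::Deserialize;

// Hypothetical stand-in for the relevant slice of HdfsClusterSpec.
#[derive(Debug, Deserialize)]
struct Spec {
    #[serde(default)]
    upgrading: bool,
}

fn main() {
    // Field omitted: deserializes to false, existing specs keep working.
    let spec: Spec = serde_json::from_str("{}").unwrap();
    assert!(!spec.upgrading);

    // Field set: the operator switches to serialized, role-by-role deployment.
    let spec: Spec = serde_json::from_str(r#"{"upgrading": true}"#).unwrap();
    assert!(spec.upgrading);
}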
@@ -312,27 +315,29 @@ impl AnyNodeConfig {

 #[derive(
     Clone,
+    Copy,
     Debug,
     Deserialize,
     Display,
     EnumIter,
     EnumString,
+    IntoStaticStr,
     Eq,
     Hash,
     JsonSchema,
     PartialEq,
     Serialize,
 )]
 pub enum HdfsRole {
+    #[serde(rename = "journalnode")]
+    #[strum(serialize = "journalnode")]
+    JournalNode,
     #[serde(rename = "namenode")]
     #[strum(serialize = "namenode")]
     NameNode,
     #[serde(rename = "datanode")]
     #[strum(serialize = "datanode")]
     DataNode,
-    #[serde(rename = "journalnode")]
-    #[strum(serialize = "journalnode")]
-    JournalNode,
 }

 impl HdfsRole {
@@ -802,6 +807,17 @@ impl HdfsCluster {
         Ok(result)
     }

+    pub fn is_upgrading(&self) -> bool {
+        // *Ideally* we'd detect this from the version mismatch, but we need manual intervention to confirm that the upgrade is done
+        self.spec.upgrading
+        // self.status
+        //     .as_ref()
+        //     .and_then(|status| status.deployed_product_version.as_deref())
+        //     .map_or(false, |deployed_version| {
+        //         deployed_version != self.spec.image.product_version()
+        //     })
+    }
+
     pub fn authentication_config(&self) -> Option<&AuthenticationConfig> {
         self.spec.cluster_config.authentication.as_ref()
     }
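
The commented-out body shows the automatic alternative the comment alludes to: derive the flag from a mismatch between the product version recorded in status and the one requested in the spec. As live code the detection would look roughly like this (a sketch with hypothetical stand-in types; it stays disabled above because only an operator of the cluster can confirm when the upgrade is actually finalized):

// Hypothetical stand-in types, mirroring the fields the real detection would use.
struct Status {
    deployed_product_version: Option<String>,
}
struct Image {
    product_version: String,
}
struct Spec {
    image: Image,
}
struct Cluster {
    spec: Spec,
    status: Option<Status>,
}

impl Cluster {
    // Any mismatch between the deployed version and the requested one counts
    // as an upgrade in progress; a missing status counts as "not upgrading".
    fn is_upgrading_by_version(&self) -> bool {
        self.status
            .as_ref()
            .and_then(|status| status.deployed_product_version.as_deref())
            .map_or(false, |deployed| deployed != self.spec.image.product_version)
    }
}

fn main() {
    // Illustrative version numbers only.
    let cluster = Cluster {
        spec: Spec { image: Image { product_version: "3.3.6".into() } },
        status: Some(Status { deployed_product_version: Some("3.3.4".into()) }),
    };
    assert!(cluster.is_upgrading_by_version());
}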
@@ -1322,6 +1338,8 @@ impl Configuration for JournalNodeConfigFragment {
 pub struct HdfsClusterStatus {
     #[serde(default)]
     pub conditions: Vec<ClusterCondition>,
+
+    pub deployed_product_version: Option<String>,
 }

 impl HasStatusCondition for HdfsCluster {
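
Three of the changes in this file are connected: `Copy` lets the operator binary pass `HdfsRole` by value (replacing the `role.clone()` calls removed below), `IntoStaticStr` converts a role into its lowercase name, and moving `JournalNode` to the top matters because strum's `EnumIter` yields variants in declaration order, which the controller now uses as the deployment order. A minimal sketch of that combination:

use strum::{EnumIter, IntoEnumIterator, IntoStaticStr};

// Declaration order doubles as rollout order: journalnodes first,
// then namenodes, then datanodes.
#[derive(Clone, Copy, Debug, EnumIter, IntoStaticStr)]
enum HdfsRole {
    #[strum(serialize = "journalnode")]
    JournalNode,
    #[strum(serialize = "namenode")]
    NameNode,
    #[strum(serialize = "datanode")]
    DataNode,
}

fn main() {
    let order: Vec<&'static str> = HdfsRole::iter().map(|role| role.into()).collect();
    assert_eq!(order, ["journalnode", "namenode", "datanode"]);
}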

rust/operator-binary/src/container.rs

Lines changed: 10 additions & 5 deletions
@@ -212,7 +212,7 @@ impl ContainerConfig {
         labels: &Labels,
     ) -> Result<(), Error> {
         // HDFS main container
-        let main_container_config = Self::from(role.clone());
+        let main_container_config = Self::from(*role);
         pb.add_volumes(main_container_config.volumes(merged_config, object_name, labels)?);
         pb.add_container(main_container_config.main_container(
             hdfs,
@@ -566,11 +566,16 @@ if [[ -d {LISTENER_VOLUME_DIR} ]]; then
     export $(basename $i | tr a-z- A-Z_)_PORT="$(cat $i)"
   done
 fi
-{hadoop_home}/bin/hdfs {role} &
+{hadoop_home}/bin/hdfs {role} {upgrade_args} &
 wait_for_termination $!
 {create_vector_shutdown_file_command}
 "#,
             hadoop_home = Self::HADOOP_HOME,
+            upgrade_args = if hdfs.is_upgrading() && *role == HdfsRole::NameNode {
+                "-rollingUpgrade started"
+            } else {
+                ""
+            },
             remove_vector_shutdown_file_command =
                 remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
             create_vector_shutdown_file_command =
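
Only namenodes get the extra argument, matching the Hadoop rolling-upgrade procedure in which namenodes are restarted with `hdfs namenode -rollingUpgrade started` to bring the new software version up against the existing metadata, while datanodes and journalnodes restart normally. Reduced to a standalone function, the selection reads (a simplified sketch, not the operator's actual code):

// Simplified sketch of the argument selection: the flag is only ever
// appended for namenodes while the cluster is flagged as upgrading.
#[derive(PartialEq)]
enum HdfsRole {
    NameNode,
    DataNode,
    JournalNode,
}

fn upgrade_args(is_upgrading: bool, role: &HdfsRole) -> &'static str {
    if is_upgrading && *role == HdfsRole::NameNode {
        "-rollingUpgrade started"
    } else {
        ""
    }
}

fn main() {
    // Yields e.g. ".../bin/hdfs namenode -rollingUpgrade started &" in the entrypoint.
    assert_eq!(upgrade_args(true, &HdfsRole::NameNode), "-rollingUpgrade started");
    assert_eq!(upgrade_args(true, &HdfsRole::DataNode), "");
    assert_eq!(upgrade_args(false, &HdfsRole::JournalNode), "");
}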
@@ -1317,7 +1322,7 @@ impl From<HdfsRole> for ContainerConfig {
     fn from(role: HdfsRole) -> Self {
         match role {
             HdfsRole::NameNode => Self::Hdfs {
-                role: role.clone(),
+                role,
                 container_name: role.to_string(),
                 volume_mounts: ContainerVolumeDirs::from(role),
                 ipc_port_name: SERVICE_PORT_NAME_RPC,
@@ -1327,7 +1332,7 @@ impl From<HdfsRole> for ContainerConfig {
                 metrics_port: DEFAULT_NAME_NODE_METRICS_PORT,
             },
             HdfsRole::DataNode => Self::Hdfs {
-                role: role.clone(),
+                role,
                 container_name: role.to_string(),
                 volume_mounts: ContainerVolumeDirs::from(role),
                 ipc_port_name: SERVICE_PORT_NAME_IPC,
@@ -1337,7 +1342,7 @@ impl From<HdfsRole> for ContainerConfig {
                 metrics_port: DEFAULT_DATA_NODE_METRICS_PORT,
             },
             HdfsRole::JournalNode => Self::Hdfs {
-                role: role.clone(),
+                role,
                 container_name: role.to_string(),
                 volume_mounts: ContainerVolumeDirs::from(role),
                 ipc_port_name: SERVICE_PORT_NAME_RPC,

rust/operator-binary/src/hdfs_controller.rs

Lines changed: 62 additions & 18 deletions
@@ -1,6 +1,5 @@
 use std::{
     collections::{BTreeMap, HashMap},
-    str::FromStr,
     sync::Arc,
 };

@@ -45,7 +44,7 @@ use stackable_operator::{
     },
     time::Duration,
 };
-use strum::{EnumDiscriminants, IntoStaticStr};
+use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr};

 use stackable_hdfs_crd::{
     constants::*, AnyNodeConfig, HdfsCluster, HdfsClusterStatus, HdfsPodRef, HdfsRole,
@@ -323,10 +322,15 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
     let dfs_replication = hdfs.spec.cluster_config.dfs_replication;
     let mut ss_cond_builder = StatefulSetConditionBuilder::default();

-    for (role_name, group_config) in validated_config.iter() {
-        let role: HdfsRole = HdfsRole::from_str(role_name).with_context(|_| InvalidRoleSnafu {
-            role: role_name.to_string(),
-        })?;
+    let mut deploy_done = true;
+
+    // Roles must be deployed in order during rolling upgrades
+    'roles: for role in HdfsRole::iter() {
+        let role_name: &str = role.into();
+        let Some(group_config) = validated_config.get(role_name) else {
+            tracing::debug!(?role, "role has no configuration, skipping");
+            continue;
+        };

         if let Some(content) = build_invalid_replica_message(&hdfs, &role, dfs_replication) {
             publish_event(
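
Two details here: roles that exist in the enum but have no entry in `validated_config` are skipped via the `let ... else` guard, and the `'roles` label lets code nested deeper in the loop body (inside the per-rolegroup loop changed below) abort the remaining roles as soon as one role group is still mid-rollout. Stripped of the Kubernetes plumbing, the control flow is roughly (an illustrative sketch):

// Deploy roles one at a time; stop at the first role still rolling and
// leave the rest for the next reconciliation.
fn deploy_serially(roles: &[&str], is_rolled_out: impl Fn(&str) -> bool) -> bool {
    let mut deploy_done = true;
    'roles: for &role in roles {
        // ... apply this role's StatefulSets here ...
        if !is_rolled_out(role) {
            deploy_done = false;
            break 'roles;
        }
    }
    deploy_done
}

fn main() {
    // Pretend the namenodes are still restarting: the rollout stops there
    // and the datanodes are not touched in this pass.
    let done = deploy_serially(&["journalnode", "namenode", "datanode"], |role| {
        role != "namenode"
    });
    assert!(!done);
}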
@@ -408,14 +412,43 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
                     name: rg_configmap_name,
                 })?;
             let rg_statefulset_name = rg_statefulset.name_any();
-            ss_cond_builder.add(
-                cluster_resources
-                    .add(client, rg_statefulset.clone())
-                    .await
-                    .with_context(|_| ApplyRoleGroupStatefulSetSnafu {
-                        name: rg_statefulset_name,
-                    })?,
-            );
+            let mut deployed_rg_statefulset = cluster_resources
+                .add(client, rg_statefulset.clone())
+                .await
+                .with_context(|_| ApplyRoleGroupStatefulSetSnafu {
+                    name: rg_statefulset_name,
+                })?;
+            ss_cond_builder.add(deployed_rg_statefulset.clone());
+            if hdfs.is_upgrading() {
+                tracing::info!("aaaaaaa UPGRADING");
+                let status = deployed_rg_statefulset.status.take().unwrap_or_default();
+
+                let current_generation = dbg!(deployed_rg_statefulset.metadata.generation);
+                let observed_generation = dbg!(status.observed_generation);
+                if current_generation != observed_generation {
+                    tracing::info!(
+                        object = %ObjectRef::from_obj(&deployed_rg_statefulset),
+                        generation.current = current_generation,
+                        generation.observed = observed_generation,
+                        "rolegroup is still upgrading, waiting... (generation not yet observed by statefulset controller)",
+                    );
+                    deploy_done = false;
+                    break 'roles;
+                }
+
+                let total_replicas = dbg!(status.replicas);
+                let updated_replicas = dbg!(status.updated_replicas.unwrap_or(0));
+                if total_replicas != updated_replicas {
+                    tracing::info!(
+                        object = %ObjectRef::from_obj(&deployed_rg_statefulset),
+                        replicas.total = total_replicas,
+                        replicas.updated = updated_replicas,
+                        "rolegroup is still upgrading, waiting... (not all replicas are updated)",
+                    );
+                    deploy_done = false;
+                    break 'roles;
+                }
+            }
         }

         let role_config = hdfs.role_config(&role);
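
The two checks mirror how `kubectl rollout status` judges a StatefulSet: the controller must have observed the latest spec generation (otherwise the rest of the status may still describe the old spec), and every replica must already run the updated revision. As a standalone predicate, assuming the `k8s-openapi` `StatefulSet` types (a sketch, not code from this commit):

use k8s_openapi::api::apps::v1::StatefulSet;

// True once the StatefulSet controller has both seen the latest spec and
// brought every replica onto the updated revision.
fn is_rolled_out(sts: &StatefulSet) -> bool {
    let Some(status) = sts.status.as_ref() else {
        // No status at all: the controller hasn't processed the object yet.
        return false;
    };
    // metadata.generation bumps on every spec change; until the controller
    // reports it as observed, the counters below may be stale.
    if sts.metadata.generation != status.observed_generation {
        return false;
    }
    // updated_replicas counts pods already on the current revision.
    status.updated_replicas.unwrap_or(0) == status.replicas
}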
@@ -459,12 +492,23 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
             hdfs.as_ref(),
             &[&ss_cond_builder, &cluster_operation_cond_builder],
         ),
+        deployed_product_version: if deploy_done {
+            Some(hdfs.spec.image.product_version().to_string())
+        } else {
+            hdfs.status
+                .as_ref()
+                .and_then(|status| status.deployed_product_version.clone())
+        },
     };

-    cluster_resources
-        .delete_orphaned_resources(client)
-        .await
-        .context(DeleteOrphanedResourcesSnafu)?;
+    // During upgrades we do partial deployments, and we don't want to garbage-collect after
+    // those, since we *will* redeploy (or properly orphan) the remaining resources later.
+    if deploy_done {
+        cluster_resources
+            .delete_orphaned_resources(client)
+            .await
+            .context(DeleteOrphanedResourcesSnafu)?;
+    }
     client
         .apply_patch_status(OPERATOR_NAME, &*hdfs, &status)
         .await
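
Threading `deploy_done` into the status means `deployedProductVersion` only advances once every role group runs the new version; a half-rolled-out cluster keeps reporting the previous one, which is exactly what the commented-out detection in `is_upgrading` would need. The bookkeeping, isolated (a sketch with a hypothetical helper name and illustrative versions):

// Report the requested version only after the whole rollout finished;
// otherwise carry the previously recorded version forward unchanged.
fn next_deployed_version(
    deploy_done: bool,
    requested: &str,
    previously_deployed: Option<&str>,
) -> Option<String> {
    if deploy_done {
        Some(requested.to_string())
    } else {
        previously_deployed.map(str::to_string)
    }
}

fn main() {
    // Mid-upgrade: the status keeps the old version.
    assert_eq!(
        next_deployed_version(false, "3.3.6", Some("3.3.4")).as_deref(),
        Some("3.3.4")
    );
    // Rollout finished: the status advances.
    assert_eq!(
        next_deployed_version(true, "3.3.6", Some("3.3.4")).as_deref(),
        Some("3.3.6")
    );
}

Finalizing the upgrade on the HDFS side (e.g. `hdfs dfsadmin -rollingUpgrade finalize`) and clearing `spec.upgrading` presumably remain manual steps; the commit only serializes the deployment itself.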
