Skip to content

Commit a36de0f

Browse files
committed
Move upgrade readiness check into utils module
1 parent 38809e2 commit a36de0f

File tree

4 files changed

+56
-22
lines changed

4 files changed

+56
-22
lines changed

rust/operator-binary/src/hdfs_controller.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use crate::{
6262
},
6363
product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address},
6464
security::{self, kerberos, opa::HdfsOpaConfig},
65+
utils::statefulset::check_all_replicas_updated,
6566
OPERATOR_NAME,
6667
};
6768

@@ -412,37 +413,19 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
412413
name: rg_configmap_name,
413414
})?;
414415
let rg_statefulset_name = rg_statefulset.name_any();
415-
let mut deployed_rg_statefulset = cluster_resources
416+
let deployed_rg_statefulset = cluster_resources
416417
.add(client, rg_statefulset.clone())
417418
.await
418419
.with_context(|_| ApplyRoleGroupStatefulSetSnafu {
419420
name: rg_statefulset_name,
420421
})?;
421422
ss_cond_builder.add(deployed_rg_statefulset.clone());
422423
if hdfs.is_upgrading() {
423-
let status = deployed_rg_statefulset.status.take().unwrap_or_default();
424-
425-
let current_generation = deployed_rg_statefulset.metadata.generation;
426-
let observed_generation = status.observed_generation;
427-
if current_generation != observed_generation {
428-
tracing::info!(
429-
object = %ObjectRef::from_obj(&deployed_rg_statefulset),
430-
generation.current = current_generation,
431-
generation.observed = observed_generation,
432-
"rolegroup is still upgrading, waiting... (generation not yet observed by statefulset controller)",
433-
);
434-
deploy_done = false;
435-
break 'roles;
436-
}
437-
438-
let total_replicas = status.replicas;
439-
let updated_replicas = status.updated_replicas.unwrap_or(0);
440-
if total_replicas != updated_replicas {
424+
if let Err(reason) = check_all_replicas_updated(&deployed_rg_statefulset) {
441425
tracing::info!(
442426
object = %ObjectRef::from_obj(&deployed_rg_statefulset),
443-
replicas.total = total_replicas,
444-
replicas.updated = updated_replicas,
445-
"rolegroup is still upgrading, waiting... (not all replicas are updated)",
427+
reason = &reason as &dyn std::error::Error,
428+
"rolegroup is still upgrading, waiting..."
446429
);
447430
deploy_done = false;
448431
break 'roles;

rust/operator-binary/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ mod hdfs_controller;
3434
mod operations;
3535
mod product_logging;
3636
mod security;
37+
mod utils;
3738

3839
mod built_info {
3940
include!(concat!(env!("OUT_DIR"), "/built.rs"));

rust/operator-binary/src/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod statefulset;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::borrow::Cow;
2+
3+
use snafu::Snafu;
4+
use stackable_operator::k8s_openapi::api::apps::v1::StatefulSet;
5+
6+
#[derive(Debug, Snafu)]
7+
#[snafu(module(outdated_statefulset))]
8+
pub enum OutdatedStatefulSet {
9+
#[snafu(display("generation {current_generation:?} not yet observed by statefulset controller, last seen was {observed_generation:?}"))]
10+
NotYetObserved {
11+
current_generation: Option<i64>,
12+
observed_generation: Option<i64>,
13+
},
14+
15+
#[snafu(display("only {updated_replicas} out of {total_replicas} are updated"))]
16+
HasOutdatedReplicas {
17+
total_replicas: i32,
18+
updated_replicas: i32,
19+
},
20+
}
21+
22+
/// Checks whether all ReplicaSet replicas are up-to-date according to `sts.spec`
23+
pub fn check_all_replicas_updated(sts: &StatefulSet) -> Result<(), OutdatedStatefulSet> {
24+
use outdated_statefulset::*;
25+
26+
let status = sts.status.as_ref().map_or_else(Cow::default, Cow::Borrowed);
27+
28+
let current_generation = sts.metadata.generation;
29+
let observed_generation = status.observed_generation;
30+
if current_generation != observed_generation {
31+
return NotYetObservedSnafu {
32+
current_generation,
33+
observed_generation,
34+
}
35+
.fail();
36+
}
37+
38+
let total_replicas = status.replicas;
39+
let updated_replicas = status.updated_replicas.unwrap_or(0);
40+
if total_replicas != updated_replicas {
41+
return HasOutdatedReplicasSnafu {
42+
total_replicas,
43+
updated_replicas,
44+
}
45+
.fail();
46+
}
47+
48+
Ok(())
49+
}

0 commit comments

Comments
 (0)