Skip to content

fix: don't stop the reconciliation if one cluster is invalid #594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ All notable changes to this project will be documented in this file.
- `podOverrides`
- `affinity`

### Fixed

- An invalid `HdfsCluster` object no longer causes the operator to stop functioning; reconciliation of other clusters continues ([#594]).

[#574]: https://github.com/stackabletech/hdfs-operator/pull/574
[#591]: https://github.com/stackabletech/hdfs-operator/pull/591
[#594]: https://github.com/stackabletech/hdfs-operator/pull/594

## [24.7.0] - 2024-07-24

Expand Down
67 changes: 43 additions & 24 deletions rust/operator-binary/src/hdfs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use stackable_operator::{
},
kube::{
api::ObjectMeta,
core::{error_boundary, DeserializeGuard},
runtime::{controller::Action, reflector::ObjectRef},
Resource, ResourceExt,
},
Expand Down Expand Up @@ -241,6 +242,11 @@ pub enum Error {

#[snafu(display("invalid OPA configuration"))]
InvalidOpaConfig { source: security::opa::Error },

#[snafu(display("HdfsCluster object is invalid"))]
InvalidHdfsCluster {
source: error_boundary::InvalidObject,
},
}

impl ReconcilerError for Error {
Expand All @@ -256,23 +262,32 @@ pub struct Ctx {
pub product_config: ProductConfigManager,
}

pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperatorResult<Action> {
pub async fn reconcile_hdfs(
hdfs: Arc<DeserializeGuard<HdfsCluster>>,
ctx: Arc<Ctx>,
) -> HdfsOperatorResult<Action> {
tracing::info!("Starting reconcile");

let hdfs = hdfs
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidHdfsClusterSnafu)?;
let client = &ctx.client;

let resolved_product_image = hdfs
.spec
.image
.resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION);

let vector_aggregator_address = resolve_vector_aggregator_address(&hdfs, client)
let vector_aggregator_address = resolve_vector_aggregator_address(hdfs, client)
.await
.context(ResolveVectorAggregatorAddressSnafu)?;

let validated_config = validate_all_roles_and_groups_config(
&resolved_product_image.product_version,
&transform_all_roles_to_config(
hdfs.as_ref(),
hdfs,
hdfs.build_role_properties()
.context(BuildRolePropertiesSnafu)?,
)
Expand Down Expand Up @@ -302,7 +317,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat

// The service account and rolebinding will be created per cluster
let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
hdfs.as_ref(),
hdfs,
APP_NAME,
cluster_resources
.get_required_labels()
Expand All @@ -321,7 +336,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat

let hdfs_opa_config = match &hdfs.spec.cluster_config.authorization {
Some(opa_config) => Some(
HdfsOpaConfig::from_opa_config(client, &hdfs, opa_config)
HdfsOpaConfig::from_opa_config(client, hdfs, opa_config)
.await
.context(InvalidOpaConfigSnafu)?,
),
Expand Down Expand Up @@ -354,9 +369,9 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
continue;
};

if let Some(content) = build_invalid_replica_message(&hdfs, &role, dfs_replication) {
if let Some(content) = build_invalid_replica_message(hdfs, &role, dfs_replication) {
publish_event(
&hdfs,
hdfs,
client,
"Reconcile",
"Invalid replicas",
Expand All @@ -368,7 +383,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat

for (rolegroup_name, rolegroup_config) in group_config.iter() {
let merged_config = role
.merged_config(&hdfs, rolegroup_name)
.merged_config(hdfs, rolegroup_name)
.context(ConfigMergeSnafu)?;

let env_overrides = rolegroup_config.get(&PropertyNameKind::Env);
Expand All @@ -379,25 +394,25 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
// to avoid the compiler error "E0716 (temporary value dropped while borrowed)".
let mut metadata = ObjectMetaBuilder::new();
let metadata = metadata
.name_and_namespace(hdfs.as_ref())
.name_and_namespace(hdfs)
.name(rolegroup_ref.object_name())
.ownerreference_from_resource(hdfs.as_ref(), None, Some(true))
.ownerreference_from_resource(hdfs, None, Some(true))
.with_context(|_| ObjectMissingMetadataForOwnerRefSnafu {
obj_ref: ObjectRef::from_obj(hdfs.as_ref()),
obj_ref: ObjectRef::from_obj(hdfs),
})?
.with_recommended_labels(build_recommended_labels(
hdfs.as_ref(),
hdfs,
RESOURCE_MANAGER_HDFS_CONTROLLER,
&resolved_product_image.app_version_label,
&rolegroup_ref.role,
&rolegroup_ref.role_group,
))
.context(ObjectMetaSnafu)?;

let rg_service = rolegroup_service(&hdfs, metadata, &role, &rolegroup_ref)?;
let rg_service = rolegroup_service(hdfs, metadata, &role, &rolegroup_ref)?;

let rg_configmap = rolegroup_config_map(
&hdfs,
hdfs,
&client.kubernetes_cluster_info,
metadata,
&rolegroup_ref,
Expand All @@ -410,7 +425,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
)?;

let rg_statefulset = rolegroup_statefulset(
&hdfs,
hdfs,
&client.kubernetes_cluster_info,
metadata,
&role,
Expand Down Expand Up @@ -463,7 +478,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
pod_disruption_budget: pdb,
}) = role_config
{
add_pdbs(pdb, &hdfs, &role, client, &mut cluster_resources)
add_pdbs(pdb, hdfs, &role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}
Expand All @@ -472,7 +487,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
// Discovery CM will fail to build until the rest of the cluster has been deployed, so do it last
// so that failure won't inhibit the rest of the cluster from booting up.
let discovery_cm = build_discovery_configmap(
&hdfs,
hdfs,
&client.kubernetes_cluster_info,
HDFS_CONTROLLER,
&hdfs
Expand All @@ -496,10 +511,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
ClusterOperationsConditionBuilder::new(&hdfs.spec.cluster_operation);

let status = HdfsClusterStatus {
conditions: compute_conditions(
hdfs.as_ref(),
&[&ss_cond_builder, &cluster_operation_cond_builder],
),
conditions: compute_conditions(hdfs, &[&ss_cond_builder, &cluster_operation_cond_builder]),
// FIXME: We can't currently leave upgrade mode automatically, since we don't know when an upgrade is finalized
deployed_product_version: Some(
hdfs.status
Expand Down Expand Up @@ -539,7 +551,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
.context(DeleteOrphanedResourcesSnafu)?;
}
client
.apply_patch_status(OPERATOR_NAME, &*hdfs, &status)
.apply_patch_status(OPERATOR_NAME, hdfs, &status)
.await
.context(ApplyStatusSnafu)?;

Expand Down Expand Up @@ -893,8 +905,15 @@ fn rolegroup_statefulset(
})
}

pub fn error_policy(_obj: Arc<HdfsCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<HdfsCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
Error::InvalidHdfsCluster { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}

#[cfg(test)]
Expand Down
9 changes: 5 additions & 4 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use stackable_operator::{
apps::v1::StatefulSet,
core::v1::{ConfigMap, Service},
},
kube::core::DeserializeGuard,
kube::{
api::PartialObjectMeta,
runtime::{reflector, watcher, Controller},
Expand Down Expand Up @@ -109,19 +110,19 @@ pub async fn create_controller(
});

let hdfs_controller = Controller::new(
namespace.get_api::<HdfsCluster>(&client),
namespace.get_api::<DeserializeGuard<HdfsCluster>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<StatefulSet>(&client),
namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<Service>(&client),
namespace.get_api::<DeserializeGuard<Service>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<ConfigMap>(&client),
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
Expand Down
Loading