diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2f68b765..af3572f3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ All notable changes to this project will be documented in this file.
 - Include the global Kafka bootstrap service (not the rolegroup-specific) DNS record as SAN entry in the generated certificates used by Kafka. This allows you to access Kafka brokers secured using TLS via the global bootstrap service ([#741]).
+- An invalid `KafkaCluster` doesn't cause the operator to stop functioning ([#773]).
 
 ### Removed
 
@@ -32,6 +33,7 @@ All notable changes to this project will be documented in this file.
 [#741]: https://github.com/stackabletech/kafka-operator/pull/741
 [#750]: https://github.com/stackabletech/kafka-operator/pull/750
 [#753]: https://github.com/stackabletech/kafka-operator/pull/753
+[#773]: https://github.com/stackabletech/kafka-operator/pull/773
 
 ## [24.7.0] - 2024-07-24
 
diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs
index e34b5b23..92de3e62 100644
--- a/rust/operator-binary/src/kafka_controller.rs
+++ b/rust/operator-binary/src/kafka_controller.rs
@@ -55,6 +55,7 @@ use stackable_operator::{
     },
     kube::{
         api::DynamicObject,
+        core::{error_boundary, DeserializeGuard},
         runtime::{controller::Action, reflector::ObjectRef},
         Resource, ResourceExt,
     },
@@ -333,6 +334,11 @@ pub enum Error {
     AddVolumeMount {
         source: builder::pod::container::Error,
     },
+
+    #[snafu(display("KafkaCluster object is invalid"))]
+    InvalidKafkaCluster {
+        source: error_boundary::InvalidObject,
+    },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
 
@@ -394,12 +400,23 @@ impl ReconcilerError for Error {
             Error::ConfigureLogging { .. } => None,
             Error::AddVolume { .. } => None,
             Error::AddVolumeMount { .. } => None,
+            Error::InvalidKafkaCluster { ..
 } => None,
         }
     }
 }
 
-pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<Action> {
+pub async fn reconcile_kafka(
+    kafka: Arc<DeserializeGuard<KafkaCluster>>,
+    ctx: Arc<Ctx>,
+) -> Result<Action> {
     tracing::info!("Starting reconcile");
+
+    let kafka = kafka
+        .0
+        .as_ref()
+        .map_err(error_boundary::InvalidObject::clone)
+        .context(InvalidKafkaClusterSnafu)?;
+
     let client = &ctx.client;
     let kafka_role = KafkaRole::Broker;
 
@@ -420,7 +437,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
     let validated_config = validate_all_roles_and_groups_config(
         &resolved_product_image.product_version,
         &transform_all_roles_to_config(
-            kafka.as_ref(),
+            kafka,
             [(
                 KafkaRole::Broker.to_string(),
                 (
@@ -445,7 +462,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         .map(Cow::Borrowed)
         .unwrap_or_default();
 
-    let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, &kafka)
+    let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka)
         .await
         .context(FailedToInitializeSecurityContextSnafu)?;
 
@@ -454,12 +471,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
     let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa {
         Some(
             opa_spec
-                .full_document_url_from_config_map(
-                    client,
-                    &*kafka,
-                    Some("allow"),
-                    OpaApiVersion::V1,
-                )
+                .full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1)
                 .await
                 .context(InvalidOpaConfigSnafu)?,
         )
@@ -467,14 +479,14 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         None
     };
 
-    let vector_aggregator_address = resolve_vector_aggregator_address(&kafka, client)
+    let vector_aggregator_address = resolve_vector_aggregator_address(kafka, client)
         .await
         .context(ResolveVectorAggregatorAddressSnafu)?;
 
     let mut ss_cond_builder = StatefulSetConditionBuilder::default();
 
     let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
-        kafka.as_ref(),
+        kafka,
         APP_NAME,
         cluster_resources
             .get_required_labels()
@@ -501,9 +513,9 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx:
Arc<Ctx>) -> Result<
             .context(FailedToResolveConfigSnafu)?;
 
         let rg_service =
-            build_broker_rolegroup_service(&kafka, &resolved_product_image, &rolegroup_ref)?;
+            build_broker_rolegroup_service(kafka, &resolved_product_image, &rolegroup_ref)?;
         let rg_configmap = build_broker_rolegroup_config_map(
-            &kafka,
+            kafka,
             &resolved_product_image,
             &kafka_security,
             &rolegroup_ref,
@@ -512,7 +524,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             vector_aggregator_address.as_deref(),
         )?;
         let rg_statefulset = build_broker_rolegroup_statefulset(
-            &kafka,
+            kafka,
             &kafka_role,
             &resolved_product_image,
             &rolegroup_ref,
@@ -523,7 +535,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             &rbac_sa.name_any(),
         )?;
         let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener(
-            &kafka,
+            kafka,
             &resolved_product_image,
             &kafka_security,
             &rolegroup_ref,
@@ -564,14 +576,14 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         pod_disruption_budget: pdb,
     }) = role_config
     {
-        add_pdbs(pdb, &kafka, &kafka_role, client, &mut cluster_resources)
+        add_pdbs(pdb, kafka, &kafka_role, client, &mut cluster_resources)
             .await
             .context(FailedToCreatePdbSnafu)?;
     }
 
     for discovery_cm in build_discovery_configmaps(
-        &kafka,
-        &*kafka,
+        kafka,
+        kafka,
         &resolved_product_image,
         &kafka_security,
         &bootstrap_listeners,
@@ -589,10 +601,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         ClusterOperationsConditionBuilder::new(&kafka.spec.cluster_operation);
 
     let status = KafkaClusterStatus {
-        conditions: compute_conditions(
-            kafka.as_ref(),
-            &[&ss_cond_builder, &cluster_operation_cond_builder],
-        ),
+        conditions: compute_conditions(kafka, &[&ss_cond_builder, &cluster_operation_cond_builder]),
     };
 
     cluster_resources
@@ -601,7 +610,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         .context(DeleteOrphansSnafu)?;
 
     client
-        .apply_patch_status(OPERATOR_NAME, &*kafka, &status)
+        .apply_patch_status(OPERATOR_NAME, kafka, &status)
         .await
         .context(ApplyStatusSnafu)?;
 
@@ -1099,8 +1108,15 @@ fn build_broker_rolegroup_statefulset(
     })
 }
 
-pub fn error_policy(_obj: Arc<KafkaCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
-    Action::requeue(*Duration::from_secs(5))
+pub fn error_policy(
+    _obj: Arc<DeserializeGuard<KafkaCluster>>,
+    error: &Error,
+    _ctx: Arc<Ctx>,
+) -> Action {
+    match error {
+        Error::InvalidKafkaCluster { .. } => Action::await_change(),
+        _ => Action::requeue(*Duration::from_secs(5)),
+    }
 }
 
 /// We only expose client HTTP / HTTPS and Metrics ports.
diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs
index 2865c626..7d3d7d7e 100644
--- a/rust/operator-binary/src/main.rs
+++ b/rust/operator-binary/src/main.rs
@@ -13,6 +13,7 @@ use stackable_operator::{
         core::v1::{ConfigMap, Service, ServiceAccount},
         rbac::v1::RoleBinding,
     },
+    kube::core::DeserializeGuard,
     kube::runtime::{watcher, Controller},
     logging::controller::report_controller_reconciled,
     namespace::WatchNamespace,
@@ -96,7 +97,7 @@ pub async fn create_controller(
     namespace: WatchNamespace,
 ) {
     let kafka_controller = Controller::new(
-        namespace.get_api::<KafkaCluster>(&client),
+        namespace.get_api::<DeserializeGuard<KafkaCluster>>(&client),
         watcher::Config::default(),
     )
     .owns(