fix: don't stop the reconciliation if one cluster is invalid #773

Merged · 1 commit · Oct 23, 2024

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ All notable changes to this project will be documented in this file.
 - Include the global Kafka bootstrap service (not the rolegroup-specific) DNS record as SAN entry in the generated
   certificates used by Kafka. This allows you to access Kafka brokers secured using TLS via the global bootstrap
   service ([#741]).
+- An invalid `KafkaCluster` doesn't cause the operator to stop functioning ([#773]).
 
 ### Removed
 
@@ -32,6 +33,7 @@ All notable changes to this project will be documented in this file.
 [#741]: https://github.com/stackabletech/kafka-operator/pull/741
 [#750]: https://github.com/stackabletech/kafka-operator/pull/750
 [#753]: https://github.com/stackabletech/kafka-operator/pull/753
+[#773]: https://github.com/stackabletech/kafka-operator/pull/773
 
 ## [24.7.0] - 2024-07-24
 
66 changes: 41 additions & 25 deletions rust/operator-binary/src/kafka_controller.rs
@@ -55,6 +55,7 @@ use stackable_operator::{
     },
     kube::{
         api::DynamicObject,
+        core::{error_boundary, DeserializeGuard},
         runtime::{controller::Action, reflector::ObjectRef},
         Resource, ResourceExt,
     },
@@ -333,6 +334,11 @@ pub enum Error {
     AddVolumeMount {
         source: builder::pod::container::Error,
     },
+
+    #[snafu(display("KafkaCluster object is invalid"))]
+    InvalidKafkaCluster {
+        source: error_boundary::InvalidObject,
+    },
 }
 type Result<T, E = Error> = std::result::Result<T, E>;
 
@@ -394,12 +400,23 @@ impl ReconcilerError for Error {
             Error::ConfigureLogging { .. } => None,
             Error::AddVolume { .. } => None,
             Error::AddVolumeMount { .. } => None,
+            Error::InvalidKafkaCluster { .. } => None,
         }
     }
 }
 
-pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<Action> {
+pub async fn reconcile_kafka(
+    kafka: Arc<DeserializeGuard<KafkaCluster>>,
+    ctx: Arc<Ctx>,
+) -> Result<Action> {
     tracing::info!("Starting reconcile");
 
+    let kafka = kafka
+        .0
+        .as_ref()
+        .map_err(error_boundary::InvalidObject::clone)
+        .context(InvalidKafkaClusterSnafu)?;
+
     let client = &ctx.client;
     let kafka_role = KafkaRole::Broker;
 
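Review note: `DeserializeGuard<K>` is a newtype over `Result<K, error_boundary::InvalidObject>`, so a `KafkaCluster` that fails to deserialize still yields a watch event instead of poisoning the whole watch stream. This is also why the many `&kafka` / `kafka.as_ref()` call sites below become a bare `kafka`: the reconciler now holds a plain `&KafkaCluster` borrowed out of the guard rather than an `Arc<KafkaCluster>`. A minimal sketch of the unwrap step (the helper name is ours, not part of this PR):

```rust
use stackable_operator::kube::core::{error_boundary, DeserializeGuard};

// Hypothetical helper mirroring the unwrap above: the guard's public `.0`
// field is the inner Result, and the error is cloned because we only hold
// a shared reference to the guard.
fn valid_object<K>(
    guard: &DeserializeGuard<K>,
) -> Result<&K, error_boundary::InvalidObject> {
    guard
        .0
        .as_ref()
        .map_err(error_boundary::InvalidObject::clone)
}
```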
@@ -420,7 +437,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
     let validated_config = validate_all_roles_and_groups_config(
         &resolved_product_image.product_version,
         &transform_all_roles_to_config(
-            kafka.as_ref(),
+            kafka,
             [(
                 KafkaRole::Broker.to_string(),
                 (
@@ -445,7 +462,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         .map(Cow::Borrowed)
         .unwrap_or_default();
 
-    let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, &kafka)
+    let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka)
         .await
         .context(FailedToInitializeSecurityContextSnafu)?;
 
@@ -454,27 +471,22 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
     let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa {
         Some(
             opa_spec
-                .full_document_url_from_config_map(
-                    client,
-                    &*kafka,
-                    Some("allow"),
-                    OpaApiVersion::V1,
-                )
+                .full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1)
                 .await
                 .context(InvalidOpaConfigSnafu)?,
         )
     } else {
         None
     };
 
-    let vector_aggregator_address = resolve_vector_aggregator_address(&kafka, client)
+    let vector_aggregator_address = resolve_vector_aggregator_address(kafka, client)
         .await
         .context(ResolveVectorAggregatorAddressSnafu)?;
 
     let mut ss_cond_builder = StatefulSetConditionBuilder::default();
 
     let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
-        kafka.as_ref(),
+        kafka,
         APP_NAME,
         cluster_resources
             .get_required_labels()
@@ -501,9 +513,9 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             .context(FailedToResolveConfigSnafu)?;
 
         let rg_service =
-            build_broker_rolegroup_service(&kafka, &resolved_product_image, &rolegroup_ref)?;
+            build_broker_rolegroup_service(kafka, &resolved_product_image, &rolegroup_ref)?;
         let rg_configmap = build_broker_rolegroup_config_map(
-            &kafka,
+            kafka,
             &resolved_product_image,
             &kafka_security,
             &rolegroup_ref,
@@ -512,7 +524,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             vector_aggregator_address.as_deref(),
         )?;
         let rg_statefulset = build_broker_rolegroup_statefulset(
-            &kafka,
+            kafka,
             &kafka_role,
             &resolved_product_image,
             &rolegroup_ref,
@@ -523,7 +535,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
             &rbac_sa.name_any(),
         )?;
         let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener(
-            &kafka,
+            kafka,
             &resolved_product_image,
             &kafka_security,
             &rolegroup_ref,
@@ -564,14 +576,14 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         pod_disruption_budget: pdb,
     }) = role_config
     {
-        add_pdbs(pdb, &kafka, &kafka_role, client, &mut cluster_resources)
+        add_pdbs(pdb, kafka, &kafka_role, client, &mut cluster_resources)
             .await
             .context(FailedToCreatePdbSnafu)?;
     }
 
     for discovery_cm in build_discovery_configmaps(
-        &kafka,
-        &*kafka,
+        kafka,
+        kafka,
         &resolved_product_image,
         &kafka_security,
         &bootstrap_listeners,
@@ -589,10 +601,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         ClusterOperationsConditionBuilder::new(&kafka.spec.cluster_operation);
 
     let status = KafkaClusterStatus {
-        conditions: compute_conditions(
-            kafka.as_ref(),
-            &[&ss_cond_builder, &cluster_operation_cond_builder],
-        ),
+        conditions: compute_conditions(kafka, &[&ss_cond_builder, &cluster_operation_cond_builder]),
     };
 
     cluster_resources
@@ -601,7 +610,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
         .context(DeleteOrphansSnafu)?;
 
     client
-        .apply_patch_status(OPERATOR_NAME, &*kafka, &status)
+        .apply_patch_status(OPERATOR_NAME, kafka, &status)
         .await
         .context(ApplyStatusSnafu)?;
 
@@ -1099,8 +1108,15 @@ fn build_broker_rolegroup_statefulset(
     })
 }
 
-pub fn error_policy(_obj: Arc<KafkaCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
-    Action::requeue(*Duration::from_secs(5))
+pub fn error_policy(
+    _obj: Arc<DeserializeGuard<KafkaCluster>>,
+    error: &Error,
+    _ctx: Arc<Ctx>,
+) -> Action {
+    match error {
+        Error::InvalidKafkaCluster { .. } => Action::await_change(),
+        _ => Action::requeue(*Duration::from_secs(5)),
+    }
 }
 
 /// We only expose client HTTP / HTTPS and Metrics ports.
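Review note: the split in `error_policy` is the point of this change. A deserialization error is permanent until someone edits the manifest, so requeueing every 5 seconds would just busy-loop; `Action::await_change()` parks the object until its next watch event. An illustrative sketch of that decision (not code from this PR):

```rust
use std::time::Duration;

use stackable_operator::kube::runtime::controller::Action;

// Illustrative only: invalid objects cannot fix themselves, so we wait for
// the next edit; all other errors are treated as transient and retried.
fn next_action(object_is_invalid: bool) -> Action {
    if object_is_invalid {
        Action::await_change()
    } else {
        Action::requeue(Duration::from_secs(5))
    }
}
```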
3 changes: 2 additions & 1 deletion rust/operator-binary/src/main.rs
@@ -13,6 +13,7 @@ use stackable_operator::{
         core::v1::{ConfigMap, Service, ServiceAccount},
         rbac::v1::RoleBinding,
     },
+    kube::core::DeserializeGuard,
    kube::runtime::{watcher, Controller},
     logging::controller::report_controller_reconciled,
     namespace::WatchNamespace,
@@ -96,7 +97,7 @@ pub async fn create_controller(
     namespace: WatchNamespace,
 ) {
     let kafka_controller = Controller::new(
-        namespace.get_api::<KafkaCluster>(&client),
+        namespace.get_api::<DeserializeGuard<KafkaCluster>>(&client),
         watcher::Config::default(),
     )
     .owns(
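Taken together, the watcher now yields `DeserializeGuard<KafkaCluster>` objects, so a broken manifest reaches `reconcile_kafka` (where it surfaces as `InvalidKafkaCluster`) rather than killing the controller. A condensed sketch of the wiring, assuming the `KafkaCluster` CRD type plus the `Ctx`, `reconcile_kafka`, and `error_policy` items from the diffs above (the `run` wrapper and its signature are ours):

```rust
use std::sync::Arc;

use futures::StreamExt;
use stackable_operator::kube::{
    core::DeserializeGuard,
    runtime::{watcher, Controller},
    Api, Client,
};

async fn run(client: Client, ctx: Arc<Ctx>) {
    // Watching the guarded type means an undeserializable KafkaCluster still
    // produces an event instead of terminating the watch stream.
    let api: Api<DeserializeGuard<KafkaCluster>> = Api::all(client);
    Controller::new(api, watcher::Config::default())
        .run(reconcile_kafka, error_policy, ctx)
        .for_each(|result| async move {
            // Per-object failures (including InvalidKafkaCluster) are logged
            // here; the controller loop itself keeps running.
            if let Err(err) = result {
                tracing::error!(?err, "reconcile failed");
            }
        })
        .await;
}
```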