Skip to content

Commit 4b8f2db

Browse files
authored
fix: don't stop the reconciliation if one cluster is invalid (#520)
* fix: don't stop the reconciliation if one cluster is invalid
* review feedback and changelog
* restore authn reference check
1 parent cd7e5ec commit 4b8f2db

File tree

3 files changed

+66
-40
lines changed

3 files changed

+66
-40
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
### Fixed
2121

2222
- Pass gitsync credentials through properly and use a fine-grained access token ([#489]).
23+
- Failing to parse one `AirflowCluster`/`AuthenticationClass` should no longer cause the whole operator to stop functioning ([#520]).
2324

2425
[#488]: https://github.com/stackabletech/airflow-operator/pull/488
2526
[#489]: https://github.com/stackabletech/airflow-operator/pull/489
2627
[#493]: https://github.com/stackabletech/airflow-operator/pull/493
2728
[#494]: https://github.com/stackabletech/airflow-operator/pull/494
29+
[#520]: https://github.com/stackabletech/airflow-operator/pull/520
2830

2931
## [24.7.0] - 2024-07-24
3032

rust/operator-binary/src/airflow_controller.rs

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use stackable_operator::{
4545
DeepMerge,
4646
},
4747
kube::{
48+
core::{error_boundary, DeserializeGuard},
4849
runtime::{controller::Action, reflector::ObjectRef},
4950
Resource, ResourceExt,
5051
},
@@ -289,6 +290,11 @@ pub enum Error {
289290
"failed to write to String (Vec<u8> to be precise) containing Airflow config"
290291
))]
291292
WriteToConfigFileString { source: std::io::Error },
293+
294+
#[snafu(display("AirflowCluster object is invalid"))]
295+
InvalidAirflowCluster {
296+
source: error_boundary::InvalidObject,
297+
},
292298
}
293299

294300
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -299,9 +305,18 @@ impl ReconcilerError for Error {
299305
}
300306
}
301307

302-
pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> Result<Action> {
308+
pub async fn reconcile_airflow(
309+
airflow: Arc<DeserializeGuard<AirflowCluster>>,
310+
ctx: Arc<Ctx>,
311+
) -> Result<Action> {
303312
tracing::info!("Starting reconcile");
304313

314+
let airflow = airflow
315+
.0
316+
.as_ref()
317+
.map_err(error_boundary::InvalidObject::clone)
318+
.context(InvalidAirflowClusterSnafu)?;
319+
305320
let client = &ctx.client;
306321
let resolved_product_image: ResolvedProductImage = airflow
307322
.spec
@@ -338,7 +353,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
338353
}
339354
}
340355

341-
let role_config = transform_all_roles_to_config::<AirflowConfigFragment, _>(&airflow, roles);
356+
let role_config = transform_all_roles_to_config::<AirflowConfigFragment, _>(airflow, roles);
342357
let validated_role_config = validate_all_roles_and_groups_config(
343358
&resolved_product_image.product_version,
344359
&role_config.context(ProductConfigTransformSnafu)?,
@@ -350,7 +365,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
350365

351366
let vector_aggregator_address = resolve_vector_aggregator_address(
352367
client,
353-
airflow.as_ref(),
368+
airflow,
354369
airflow
355370
.spec
356371
.cluster_config
@@ -374,8 +389,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
374389
.context(BuildLabelSnafu)?;
375390

376391
let (rbac_sa, rbac_rolebinding) =
377-
build_rbac_resources(airflow.as_ref(), APP_NAME, required_labels)
378-
.context(BuildRBACObjectsSnafu)?;
392+
build_rbac_resources(airflow, APP_NAME, required_labels).context(BuildRBACObjectsSnafu)?;
379393

380394
let rbac_sa = cluster_resources
381395
.add(client, rbac_sa)
@@ -397,7 +411,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
397411
} = &airflow_executor
398412
{
399413
build_executor_template(
400-
&airflow,
414+
airflow,
401415
common_configuration,
402416
&resolved_product_image,
403417
&authentication_config,
@@ -418,7 +432,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
418432
// some roles will only run "internally" and do not need to be created as services
419433
if let Some(resolved_port) = role_port(role_name) {
420434
let role_service =
421-
build_role_service(&airflow, &resolved_product_image, role_name, resolved_port)?;
435+
build_role_service(airflow, &resolved_product_image, role_name, resolved_port)?;
422436
cluster_resources
423437
.add(client, role_service)
424438
.await
@@ -427,7 +441,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
427441

428442
for (rolegroup_name, rolegroup_config) in role_config.iter() {
429443
let rolegroup = RoleGroupRef {
430-
cluster: ObjectRef::from_obj(&*airflow),
444+
cluster: ObjectRef::from_obj(airflow),
431445
role: role_name.into(),
432446
role_group: rolegroup_name.into(),
433447
};
@@ -436,16 +450,15 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
436450
.merged_config(&airflow_role, &rolegroup)
437451
.context(FailedToResolveConfigSnafu)?;
438452

439-
let rg_service =
440-
build_rolegroup_service(&airflow, &resolved_product_image, &rolegroup)?;
453+
let rg_service = build_rolegroup_service(airflow, &resolved_product_image, &rolegroup)?;
441454
cluster_resources.add(client, rg_service).await.context(
442455
ApplyRoleGroupServiceSnafu {
443456
rolegroup: rolegroup.clone(),
444457
},
445458
)?;
446459

447460
let rg_statefulset = build_server_rolegroup_statefulset(
448-
&airflow,
461+
airflow,
449462
&resolved_product_image,
450463
&airflow_role,
451464
&rolegroup,
@@ -466,7 +479,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
466479
);
467480

468481
let rg_configmap = build_rolegroup_config_map(
469-
&airflow,
482+
airflow,
470483
&resolved_product_image,
471484
&rolegroup,
472485
rolegroup_config,
@@ -488,7 +501,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
488501
pod_disruption_budget: pdb,
489502
}) = role_config
490503
{
491-
add_pdbs(pdb, &airflow, &airflow_role, client, &mut cluster_resources)
504+
add_pdbs(pdb, airflow, &airflow_role, client, &mut cluster_resources)
492505
.await
493506
.context(FailedToCreatePdbSnafu)?;
494507
}
@@ -501,13 +514,13 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
501514

502515
let status = AirflowClusterStatus {
503516
conditions: compute_conditions(
504-
airflow.as_ref(),
517+
airflow,
505518
&[&ss_cond_builder, &cluster_operation_cond_builder],
506519
),
507520
};
508521

509522
client
510-
.apply_patch_status(OPERATOR_NAME, &*airflow, &status)
523+
.apply_patch_status(OPERATOR_NAME, airflow, &status)
511524
.await
512525
.context(ApplyStatusSnafu)?;
513526

@@ -516,7 +529,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
516529

517530
#[allow(clippy::too_many_arguments)]
518531
async fn build_executor_template(
519-
airflow: &Arc<AirflowCluster>,
532+
airflow: &AirflowCluster,
520533
common_config: &CommonConfiguration<ExecutorConfigFragment>,
521534
resolved_product_image: &ResolvedProductImage,
522535
authentication_config: &Vec<AirflowAuthenticationConfigResolved>,
@@ -529,7 +542,7 @@ async fn build_executor_template(
529542
.merged_executor_config(&common_config.config)
530543
.context(FailedToResolveConfigSnafu)?;
531544
let rolegroup = RoleGroupRef {
532-
cluster: ObjectRef::from_obj(&**airflow),
545+
cluster: ObjectRef::from_obj(airflow),
533546
role: "executor".into(),
534547
role_group: "kubernetes".into(),
535548
};
@@ -1223,8 +1236,17 @@ fn build_gitsync_container(
12231236
Ok(gitsync_container)
12241237
}
12251238

1226-
pub fn error_policy(_obj: Arc<AirflowCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
1227-
Action::requeue(*Duration::from_secs(5))
1239+
pub fn error_policy(
1240+
_obj: Arc<DeserializeGuard<AirflowCluster>>,
1241+
error: &Error,
1242+
_ctx: Arc<Ctx>,
1243+
) -> Action {
1244+
match error {
1245+
// root object is invalid, will be requeued when modified anyway
1246+
Error::InvalidAirflowCluster { .. } => Action::await_change(),
1247+
1248+
_ => Action::requeue(*Duration::from_secs(10)),
1249+
}
12281250
}
12291251

12301252
fn add_authentication_volumes_and_volume_mounts(

rust/operator-binary/src/main.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ use crate::airflow_controller::AIRFLOW_CONTROLLER_NAME;
1010

1111
use clap::{crate_description, crate_version, Parser};
1212
use futures::StreamExt;
13-
use stackable_airflow_crd::{
14-
authentication::AirflowAuthentication, AirflowCluster, APP_NAME, OPERATOR_NAME,
15-
};
13+
use stackable_airflow_crd::{AirflowCluster, APP_NAME, OPERATOR_NAME};
1614
use stackable_operator::{
1715
cli::{Command, ProductOperatorRun},
1816
commons::authentication::AuthenticationClass,
1917
k8s_openapi::api::{apps::v1::StatefulSet, core::v1::Service},
2018
kube::{
21-
runtime::{reflector::ObjectRef, watcher, Controller},
22-
ResourceExt,
19+
core::DeserializeGuard,
20+
runtime::{
21+
reflector::{Lookup, ObjectRef},
22+
watcher, Controller,
23+
},
2324
},
2425
logging::controller::report_controller_reconciled,
2526
CustomResourceExt,
@@ -72,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
7273
stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?;
7374

7475
let airflow_controller_builder = Controller::new(
75-
watch_namespace.get_api::<AirflowCluster>(&client),
76+
watch_namespace.get_api::<DeserializeGuard<AirflowCluster>>(&client),
7677
watcher::Config::default(),
7778
);
7879

@@ -88,17 +89,14 @@ async fn main() -> anyhow::Result<()> {
8889
)
8990
.shutdown_on_signal()
9091
.watches(
91-
client.get_api::<AuthenticationClass>(&()),
92+
client.get_api::<DeserializeGuard<AuthenticationClass>>(&()),
9293
watcher::Config::default(),
9394
move |authentication_class| {
9495
airflow_store_1
9596
.state()
9697
.into_iter()
97-
.filter(move |airflow: &Arc<AirflowCluster>| {
98-
references_authentication_class(
99-
&airflow.spec.cluster_config.authentication,
100-
&authentication_class,
101-
)
98+
.filter(move |airflow: &Arc<DeserializeGuard<AirflowCluster>>| {
99+
references_authentication_class(airflow, &authentication_class)
102100
})
103101
.map(|airflow| ObjectRef::from_obj(&*airflow))
104102
},
@@ -127,15 +125,19 @@ async fn main() -> anyhow::Result<()> {
127125
}
128126

129127
fn references_authentication_class(
130-
authentication_config: &AirflowAuthentication,
131-
authentication_class: &AuthenticationClass,
128+
airflow: &DeserializeGuard<AirflowCluster>,
129+
authentication_class: &DeserializeGuard<AuthenticationClass>,
132130
) -> bool {
133-
assert!(authentication_class.metadata.name.is_some());
134-
135-
authentication_config
131+
let Ok(airflow) = &airflow.0 else {
132+
return false;
133+
};
134+
let Some(authn_class_name) = authentication_class.name() else {
135+
return false;
136+
};
137+
airflow
138+
.spec
139+
.cluster_config
140+
.authentication
136141
.authentication_class_names()
137-
.into_iter()
138-
.filter(|c| *c == authentication_class.name_any())
139-
.count()
140-
> 0
142+
.contains(&&*authn_class_name)
141143
}

0 commit comments

Comments
 (0)