@@ -55,6 +55,7 @@ use stackable_operator::{
55
55
} ,
56
56
kube:: {
57
57
api:: DynamicObject ,
58
+ core:: { error_boundary, DeserializeGuard } ,
58
59
runtime:: { controller:: Action , reflector:: ObjectRef } ,
59
60
Resource , ResourceExt ,
60
61
} ,
@@ -333,6 +334,11 @@ pub enum Error {
333
334
AddVolumeMount {
334
335
source : builder:: pod:: container:: Error ,
335
336
} ,
337
+
338
+ #[ snafu( display( "KafkaCluster object is invalid" ) ) ]
339
+ InvalidKafkaCluster {
340
+ source : error_boundary:: InvalidObject ,
341
+ } ,
336
342
}
337
343
type Result < T , E = Error > = std:: result:: Result < T , E > ;
338
344
@@ -394,12 +400,23 @@ impl ReconcilerError for Error {
394
400
Error :: ConfigureLogging { .. } => None ,
395
401
Error :: AddVolume { .. } => None ,
396
402
Error :: AddVolumeMount { .. } => None ,
403
+ Error :: InvalidKafkaCluster { .. } => None ,
397
404
}
398
405
}
399
406
}
400
407
401
- pub async fn reconcile_kafka ( kafka : Arc < KafkaCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
408
+ pub async fn reconcile_kafka (
409
+ kafka : Arc < DeserializeGuard < KafkaCluster > > ,
410
+ ctx : Arc < Ctx > ,
411
+ ) -> Result < Action > {
402
412
tracing:: info!( "Starting reconcile" ) ;
413
+
414
+ let kafka = kafka
415
+ . 0
416
+ . as_ref ( )
417
+ . map_err ( error_boundary:: InvalidObject :: clone)
418
+ . context ( InvalidKafkaClusterSnafu ) ?;
419
+
403
420
let client = & ctx. client ;
404
421
let kafka_role = KafkaRole :: Broker ;
405
422
@@ -420,7 +437,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
420
437
let validated_config = validate_all_roles_and_groups_config (
421
438
& resolved_product_image. product_version ,
422
439
& transform_all_roles_to_config (
423
- kafka. as_ref ( ) ,
440
+ kafka,
424
441
[ (
425
442
KafkaRole :: Broker . to_string ( ) ,
426
443
(
@@ -445,7 +462,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
445
462
. map ( Cow :: Borrowed )
446
463
. unwrap_or_default ( ) ;
447
464
448
- let kafka_security = KafkaTlsSecurity :: new_from_kafka_cluster ( client, & kafka)
465
+ let kafka_security = KafkaTlsSecurity :: new_from_kafka_cluster ( client, kafka)
449
466
. await
450
467
. context ( FailedToInitializeSecurityContextSnafu ) ?;
451
468
@@ -454,27 +471,22 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
454
471
let opa_connect = if let Some ( opa_spec) = & kafka. spec . cluster_config . authorization . opa {
455
472
Some (
456
473
opa_spec
457
- . full_document_url_from_config_map (
458
- client,
459
- & * kafka,
460
- Some ( "allow" ) ,
461
- OpaApiVersion :: V1 ,
462
- )
474
+ . full_document_url_from_config_map ( client, kafka, Some ( "allow" ) , OpaApiVersion :: V1 )
463
475
. await
464
476
. context ( InvalidOpaConfigSnafu ) ?,
465
477
)
466
478
} else {
467
479
None
468
480
} ;
469
481
470
- let vector_aggregator_address = resolve_vector_aggregator_address ( & kafka, client)
482
+ let vector_aggregator_address = resolve_vector_aggregator_address ( kafka, client)
471
483
. await
472
484
. context ( ResolveVectorAggregatorAddressSnafu ) ?;
473
485
474
486
let mut ss_cond_builder = StatefulSetConditionBuilder :: default ( ) ;
475
487
476
488
let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
477
- kafka. as_ref ( ) ,
489
+ kafka,
478
490
APP_NAME ,
479
491
cluster_resources
480
492
. get_required_labels ( )
@@ -501,9 +513,9 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
501
513
. context ( FailedToResolveConfigSnafu ) ?;
502
514
503
515
let rg_service =
504
- build_broker_rolegroup_service ( & kafka, & resolved_product_image, & rolegroup_ref) ?;
516
+ build_broker_rolegroup_service ( kafka, & resolved_product_image, & rolegroup_ref) ?;
505
517
let rg_configmap = build_broker_rolegroup_config_map (
506
- & kafka,
518
+ kafka,
507
519
& resolved_product_image,
508
520
& kafka_security,
509
521
& rolegroup_ref,
@@ -512,7 +524,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
512
524
vector_aggregator_address. as_deref ( ) ,
513
525
) ?;
514
526
let rg_statefulset = build_broker_rolegroup_statefulset (
515
- & kafka,
527
+ kafka,
516
528
& kafka_role,
517
529
& resolved_product_image,
518
530
& rolegroup_ref,
@@ -523,7 +535,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
523
535
& rbac_sa. name_any ( ) ,
524
536
) ?;
525
537
let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener (
526
- & kafka,
538
+ kafka,
527
539
& resolved_product_image,
528
540
& kafka_security,
529
541
& rolegroup_ref,
@@ -564,14 +576,14 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
564
576
pod_disruption_budget : pdb,
565
577
} ) = role_config
566
578
{
567
- add_pdbs ( pdb, & kafka, & kafka_role, client, & mut cluster_resources)
579
+ add_pdbs ( pdb, kafka, & kafka_role, client, & mut cluster_resources)
568
580
. await
569
581
. context ( FailedToCreatePdbSnafu ) ?;
570
582
}
571
583
572
584
for discovery_cm in build_discovery_configmaps (
573
- & kafka,
574
- & * kafka,
585
+ kafka,
586
+ kafka,
575
587
& resolved_product_image,
576
588
& kafka_security,
577
589
& bootstrap_listeners,
@@ -589,10 +601,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
589
601
ClusterOperationsConditionBuilder :: new ( & kafka. spec . cluster_operation ) ;
590
602
591
603
let status = KafkaClusterStatus {
592
- conditions : compute_conditions (
593
- kafka. as_ref ( ) ,
594
- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
595
- ) ,
604
+ conditions : compute_conditions ( kafka, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
596
605
} ;
597
606
598
607
cluster_resources
@@ -601,7 +610,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
601
610
. context ( DeleteOrphansSnafu ) ?;
602
611
603
612
client
604
- . apply_patch_status ( OPERATOR_NAME , & * kafka, & status)
613
+ . apply_patch_status ( OPERATOR_NAME , kafka, & status)
605
614
. await
606
615
. context ( ApplyStatusSnafu ) ?;
607
616
@@ -1099,8 +1108,15 @@ fn build_broker_rolegroup_statefulset(
1099
1108
} )
1100
1109
}
1101
1110
1102
- pub fn error_policy ( _obj : Arc < KafkaCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1103
- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1111
+ pub fn error_policy (
1112
+ _obj : Arc < DeserializeGuard < KafkaCluster > > ,
1113
+ error : & Error ,
1114
+ _ctx : Arc < Ctx > ,
1115
+ ) -> Action {
1116
+ match error {
1117
+ Error :: InvalidKafkaCluster { .. } => Action :: await_change ( ) ,
1118
+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1119
+ }
1104
1120
}
1105
1121
1106
1122
/// We only expose client HTTP / HTTPS and Metrics ports.
0 commit comments