@@ -54,6 +54,7 @@ use stackable_operator::{
54
54
} ,
55
55
DeepMerge ,
56
56
} ,
57
+ kube:: core:: { error_boundary, DeserializeGuard } ,
57
58
kube:: { runtime:: controller:: Action , Resource , ResourceExt } ,
58
59
kvp:: { Label , Labels , ObjectLabels } ,
59
60
logging:: controller:: ReconcilerError ,
@@ -323,6 +324,11 @@ pub enum Error {
323
324
AddVolumeMount {
324
325
source : builder:: pod:: container:: Error ,
325
326
} ,
327
+
328
+ #[ snafu( display( "HiveCluster object is invalid" ) ) ]
329
+ InvalidHiveCluster {
330
+ source : error_boundary:: InvalidObject ,
331
+ } ,
326
332
}
327
333
type Result < T , E = Error > = std:: result:: Result < T , E > ;
328
334
@@ -332,8 +338,16 @@ impl ReconcilerError for Error {
332
338
}
333
339
}
334
340
335
- pub async fn reconcile_hive ( hive : Arc < HiveCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
341
+ pub async fn reconcile_hive (
342
+ hive : Arc < DeserializeGuard < HiveCluster > > ,
343
+ ctx : Arc < Ctx > ,
344
+ ) -> Result < Action > {
336
345
tracing:: info!( "Starting reconcile" ) ;
346
+ let hive = hive
347
+ . 0
348
+ . as_ref ( )
349
+ . map_err ( error_boundary:: InvalidObject :: clone)
350
+ . context ( InvalidHiveClusterSnafu ) ?;
337
351
let client = & ctx. client ;
338
352
let hive_namespace = hive. namespace ( ) . context ( ObjectHasNoNamespaceSnafu ) ?;
339
353
@@ -361,7 +375,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
361
375
let validated_config = validate_all_roles_and_groups_config (
362
376
& resolved_product_image. product_version ,
363
377
& transform_all_roles_to_config (
364
- hive. as_ref ( ) ,
378
+ hive,
365
379
[ (
366
380
HiveRole :: MetaStore . to_string ( ) ,
367
381
(
@@ -399,7 +413,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
399
413
. context ( CreateClusterResourcesSnafu ) ?;
400
414
401
415
let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
402
- hive. as_ref ( ) ,
416
+ hive,
403
417
APP_NAME ,
404
418
cluster_resources
405
419
. get_required_labels ( )
@@ -416,15 +430,15 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
416
430
. await
417
431
. context ( ApplyRoleBindingSnafu ) ?;
418
432
419
- let metastore_role_service = build_metastore_role_service ( & hive, & resolved_product_image) ?;
433
+ let metastore_role_service = build_metastore_role_service ( hive, & resolved_product_image) ?;
420
434
421
435
// we have to get the assigned ports
422
436
let metastore_role_service = cluster_resources
423
437
. add ( client, metastore_role_service)
424
438
. await
425
439
. context ( ApplyRoleServiceSnafu ) ?;
426
440
427
- let vector_aggregator_address = resolve_vector_aggregator_address ( & hive, client)
441
+ let vector_aggregator_address = resolve_vector_aggregator_address ( hive, client)
428
442
. await
429
443
. context ( ResolveVectorAggregatorAddressSnafu ) ?;
430
444
@@ -437,9 +451,9 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
437
451
. merged_config ( & HiveRole :: MetaStore , & rolegroup)
438
452
. context ( FailedToResolveResourceConfigSnafu ) ?;
439
453
440
- let rg_service = build_rolegroup_service ( & hive, & resolved_product_image, & rolegroup) ?;
454
+ let rg_service = build_rolegroup_service ( hive, & resolved_product_image, & rolegroup) ?;
441
455
let rg_configmap = build_metastore_rolegroup_config_map (
442
- & hive,
456
+ hive,
443
457
& hive_namespace,
444
458
& resolved_product_image,
445
459
& rolegroup,
@@ -449,7 +463,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
449
463
vector_aggregator_address. as_deref ( ) ,
450
464
) ?;
451
465
let rg_statefulset = build_metastore_rolegroup_statefulset (
452
- & hive,
466
+ hive,
453
467
& hive_role,
454
468
& resolved_product_image,
455
469
& rolegroup,
@@ -488,7 +502,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
488
502
pod_disruption_budget : pdb,
489
503
} ) = role_config
490
504
{
491
- add_pdbs ( pdb, & hive, & hive_role, client, & mut cluster_resources)
505
+ add_pdbs ( pdb, hive, & hive_role, client, & mut cluster_resources)
492
506
. await
493
507
. context ( FailedToCreatePdbSnafu ) ?;
494
508
}
@@ -498,8 +512,8 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
498
512
let mut discovery_hash = FnvHasher :: with_key ( 0 ) ;
499
513
for discovery_cm in discovery:: build_discovery_configmaps (
500
514
client,
501
- & * hive,
502
- & hive,
515
+ hive,
516
+ hive,
503
517
& resolved_product_image,
504
518
& metastore_role_service,
505
519
None ,
@@ -523,14 +537,11 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
523
537
// Serialize as a string to discourage users from trying to parse the value,
524
538
// and to keep things flexible if we end up changing the hasher at some point.
525
539
discovery_hash : Some ( discovery_hash. finish ( ) . to_string ( ) ) ,
526
- conditions : compute_conditions (
527
- hive. as_ref ( ) ,
528
- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
529
- ) ,
540
+ conditions : compute_conditions ( hive, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
530
541
} ;
531
542
532
543
client
533
- . apply_patch_status ( OPERATOR_NAME , & * hive, & status)
544
+ . apply_patch_status ( OPERATOR_NAME , hive, & status)
534
545
. await
535
546
. context ( ApplyStatusSnafu ) ?;
536
547
@@ -1117,8 +1128,16 @@ fn env_var_from_secret(var_name: &str, secret: &str, secret_key: &str) -> EnvVar
1117
1128
}
1118
1129
}
1119
1130
1120
- pub fn error_policy ( _obj : Arc < HiveCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1121
- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1131
+ pub fn error_policy (
1132
+ _obj : Arc < DeserializeGuard < HiveCluster > > ,
1133
+ error : & Error ,
1134
+ _ctx : Arc < Ctx > ,
1135
+ ) -> Action {
1136
+ match error {
1137
+ // An invalid HBaseCluster was deserialized. Await for it to change.
1138
+ Error :: InvalidHiveCluster { .. } => Action :: await_change ( ) ,
1139
+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1140
+ }
1122
1141
}
1123
1142
1124
1143
pub fn service_ports ( ) -> Vec < ServicePort > {
0 commit comments