@@ -38,6 +38,7 @@ use stackable_operator::{
38
38
apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
39
39
DeepMerge ,
40
40
} ,
41
+ kube:: core:: { error_boundary, DeserializeGuard } ,
41
42
kube:: { runtime:: controller:: Action , Resource } ,
42
43
kvp:: { Label , LabelError , Labels , ObjectLabels } ,
43
44
logging:: controller:: ReconcilerError ,
@@ -291,6 +292,11 @@ pub enum Error {
291
292
292
293
#[ snafu( display( "authorization is only supported from HBase 2.6 onwards" ) ) ]
293
294
AuthorizationNotSupported ,
295
+
296
+ #[ snafu( display( "HBaseCluster object is invalid" ) ) ]
297
+ InvalidHBaseCluster {
298
+ source : error_boundary:: InvalidObject ,
299
+ } ,
294
300
}
295
301
296
302
type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -301,31 +307,39 @@ impl ReconcilerError for Error {
301
307
}
302
308
}
303
309
304
- pub async fn reconcile_hbase ( hbase : Arc < HbaseCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
310
+ pub async fn reconcile_hbase (
311
+ hbase : Arc < DeserializeGuard < HbaseCluster > > ,
312
+ ctx : Arc < Ctx > ,
313
+ ) -> Result < Action > {
305
314
tracing:: info!( "Starting reconcile" ) ;
306
315
316
+ let hbase = hbase
317
+ . 0
318
+ . as_ref ( )
319
+ . map_err ( error_boundary:: InvalidObject :: clone)
320
+ . context ( InvalidHBaseClusterSnafu ) ?;
321
+
307
322
let client = & ctx. client ;
308
323
309
- validate_cr ( & hbase) ?;
324
+ validate_cr ( hbase) ?;
310
325
311
326
let resolved_product_image = hbase
312
327
. spec
313
328
. image
314
329
. resolve ( DOCKER_IMAGE_BASE_NAME , crate :: built_info:: PKG_VERSION ) ;
315
- let zookeeper_connection_information = ZookeeperConnectionInformation :: retrieve ( & hbase, client)
330
+ let zookeeper_connection_information = ZookeeperConnectionInformation :: retrieve ( hbase, client)
316
331
. await
317
332
. context ( RetrieveZookeeperConnectionInformationSnafu ) ?;
318
333
319
- let vector_aggregator_address = resolve_vector_aggregator_address ( & hbase, client)
334
+ let vector_aggregator_address = resolve_vector_aggregator_address ( hbase, client)
320
335
. await
321
336
. context ( ResolveVectorAggregatorAddressSnafu ) ?;
322
337
323
- let roles = build_roles ( & hbase) ?;
338
+ let roles = build_roles ( hbase) ?;
324
339
325
340
let validated_config = validate_all_roles_and_groups_config (
326
341
& resolved_product_image. app_version_label ,
327
- & transform_all_roles_to_config ( hbase. as_ref ( ) , roles)
328
- . context ( GenerateProductConfigSnafu ) ?,
342
+ & transform_all_roles_to_config ( hbase, roles) . context ( GenerateProductConfigSnafu ) ?,
329
343
& ctx. product_config ,
330
344
false ,
331
345
false ,
@@ -334,7 +348,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
334
348
335
349
let hbase_opa_config = match & hbase. spec . cluster_config . authorization {
336
350
Some ( opa_config) => Some (
337
- HbaseOpaConfig :: from_opa_config ( client, & hbase, opa_config)
351
+ HbaseOpaConfig :: from_opa_config ( client, hbase, opa_config)
338
352
. await
339
353
. context ( InvalidOpaConfigSnafu ) ?,
340
354
) ,
@@ -351,15 +365,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
351
365
. context ( CreateClusterResourcesSnafu ) ?;
352
366
353
367
let region_server_role_service =
354
- build_region_server_role_service ( & hbase, & resolved_product_image) ?;
368
+ build_region_server_role_service ( hbase, & resolved_product_image) ?;
355
369
cluster_resources
356
370
. add ( client, region_server_role_service)
357
371
. await
358
372
. context ( ApplyRoleServiceSnafu ) ?;
359
373
360
374
// discovery config map
361
375
let discovery_cm = build_discovery_configmap (
362
- & hbase,
376
+ hbase,
363
377
& zookeeper_connection_information,
364
378
& resolved_product_image,
365
379
)
@@ -370,7 +384,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
370
384
. context ( ApplyDiscoveryConfigMapSnafu ) ?;
371
385
372
386
let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
373
- hbase. as_ref ( ) ,
387
+ hbase,
374
388
APP_NAME ,
375
389
cluster_resources
376
390
. get_required_labels ( )
@@ -404,9 +418,9 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
404
418
. context ( FailedToResolveConfigSnafu ) ?;
405
419
406
420
let rg_service =
407
- build_rolegroup_service ( & hbase, & hbase_role, & rolegroup, & resolved_product_image) ?;
421
+ build_rolegroup_service ( hbase, & hbase_role, & rolegroup, & resolved_product_image) ?;
408
422
let rg_configmap = build_rolegroup_config_map (
409
- & hbase,
423
+ hbase,
410
424
& rolegroup,
411
425
rolegroup_config,
412
426
& zookeeper_connection_information,
@@ -416,7 +430,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
416
430
vector_aggregator_address. as_deref ( ) ,
417
431
) ?;
418
432
let rg_statefulset = build_rolegroup_statefulset (
419
- & hbase,
433
+ hbase,
420
434
& hbase_role,
421
435
& rolegroup,
422
436
rolegroup_config,
@@ -450,7 +464,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
450
464
pod_disruption_budget : pdb,
451
465
} ) = role_config
452
466
{
453
- add_pdbs ( pdb, & hbase, & hbase_role, client, & mut cluster_resources)
467
+ add_pdbs ( pdb, hbase, & hbase_role, client, & mut cluster_resources)
454
468
. await
455
469
. context ( FailedToCreatePdbSnafu ) ?;
456
470
}
@@ -460,18 +474,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
460
474
ClusterOperationsConditionBuilder :: new ( & hbase. spec . cluster_operation ) ;
461
475
462
476
let status = HbaseClusterStatus {
463
- conditions : compute_conditions (
464
- hbase. as_ref ( ) ,
465
- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
466
- ) ,
477
+ conditions : compute_conditions ( hbase, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
467
478
} ;
468
479
469
480
cluster_resources
470
481
. delete_orphaned_resources ( client)
471
482
. await
472
483
. context ( DeleteOrphanedResourcesSnafu ) ?;
473
484
client
474
- . apply_patch_status ( OPERATOR_NAME , hbase. as_ref ( ) , & status)
485
+ . apply_patch_status ( OPERATOR_NAME , hbase, & status)
475
486
. await
476
487
. context ( ApplyStatusSnafu ) ?;
477
488
@@ -1071,8 +1082,16 @@ where
1071
1082
} )
1072
1083
}
1073
1084
1074
- pub fn error_policy ( _obj : Arc < HbaseCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1075
- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1085
+ pub fn error_policy (
1086
+ _obj : Arc < DeserializeGuard < HbaseCluster > > ,
1087
+ error : & Error ,
1088
+ _ctx : Arc < Ctx > ,
1089
+ ) -> Action {
1090
+ match error {
1091
+ // root object is invalid, will be requed when modified
1092
+ Error :: InvalidHBaseCluster { .. } => Action :: await_change ( ) ,
1093
+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1094
+ }
1076
1095
}
1077
1096
1078
1097
pub fn build_recommended_labels < ' a > (
0 commit comments