@@ -32,6 +32,7 @@ use stackable_operator::{
32
32
} ,
33
33
kube:: {
34
34
api:: ObjectMeta ,
35
+ core:: { error_boundary, DeserializeGuard } ,
35
36
runtime:: { controller:: Action , reflector:: ObjectRef } ,
36
37
Resource , ResourceExt ,
37
38
} ,
@@ -241,6 +242,11 @@ pub enum Error {
241
242
242
243
#[ snafu( display( "invalid OPA configuration" ) ) ]
243
244
InvalidOpaConfig { source : security:: opa:: Error } ,
245
+
246
+ #[ snafu( display( "HdfsCluster object is invalid" ) ) ]
247
+ InvalidHdfsCluster {
248
+ source : error_boundary:: InvalidObject ,
249
+ } ,
244
250
}
245
251
246
252
impl ReconcilerError for Error {
@@ -256,23 +262,32 @@ pub struct Ctx {
256
262
pub product_config : ProductConfigManager ,
257
263
}
258
264
259
- pub async fn reconcile_hdfs ( hdfs : Arc < HdfsCluster > , ctx : Arc < Ctx > ) -> HdfsOperatorResult < Action > {
265
+ pub async fn reconcile_hdfs (
266
+ hdfs : Arc < DeserializeGuard < HdfsCluster > > ,
267
+ ctx : Arc < Ctx > ,
268
+ ) -> HdfsOperatorResult < Action > {
260
269
tracing:: info!( "Starting reconcile" ) ;
270
+
271
+ let hdfs = hdfs
272
+ . 0
273
+ . as_ref ( )
274
+ . map_err ( error_boundary:: InvalidObject :: clone)
275
+ . context ( InvalidHdfsClusterSnafu ) ?;
261
276
let client = & ctx. client ;
262
277
263
278
let resolved_product_image = hdfs
264
279
. spec
265
280
. image
266
281
. resolve ( DOCKER_IMAGE_BASE_NAME , crate :: built_info:: PKG_VERSION ) ;
267
282
268
- let vector_aggregator_address = resolve_vector_aggregator_address ( & hdfs, client)
283
+ let vector_aggregator_address = resolve_vector_aggregator_address ( hdfs, client)
269
284
. await
270
285
. context ( ResolveVectorAggregatorAddressSnafu ) ?;
271
286
272
287
let validated_config = validate_all_roles_and_groups_config (
273
288
& resolved_product_image. product_version ,
274
289
& transform_all_roles_to_config (
275
- hdfs. as_ref ( ) ,
290
+ hdfs,
276
291
hdfs. build_role_properties ( )
277
292
. context ( BuildRolePropertiesSnafu ) ?,
278
293
)
@@ -302,7 +317,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
302
317
303
318
// The service account and rolebinding will be created per cluster
304
319
let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
305
- hdfs. as_ref ( ) ,
320
+ hdfs,
306
321
APP_NAME ,
307
322
cluster_resources
308
323
. get_required_labels ( )
@@ -321,7 +336,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
321
336
322
337
let hdfs_opa_config = match & hdfs. spec . cluster_config . authorization {
323
338
Some ( opa_config) => Some (
324
- HdfsOpaConfig :: from_opa_config ( client, & hdfs, opa_config)
339
+ HdfsOpaConfig :: from_opa_config ( client, hdfs, opa_config)
325
340
. await
326
341
. context ( InvalidOpaConfigSnafu ) ?,
327
342
) ,
@@ -354,9 +369,9 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
354
369
continue ;
355
370
} ;
356
371
357
- if let Some ( content) = build_invalid_replica_message ( & hdfs, & role, dfs_replication) {
372
+ if let Some ( content) = build_invalid_replica_message ( hdfs, & role, dfs_replication) {
358
373
publish_event (
359
- & hdfs,
374
+ hdfs,
360
375
client,
361
376
"Reconcile" ,
362
377
"Invalid replicas" ,
@@ -368,7 +383,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
368
383
369
384
for ( rolegroup_name, rolegroup_config) in group_config. iter ( ) {
370
385
let merged_config = role
371
- . merged_config ( & hdfs, rolegroup_name)
386
+ . merged_config ( hdfs, rolegroup_name)
372
387
. context ( ConfigMergeSnafu ) ?;
373
388
374
389
let env_overrides = rolegroup_config. get ( & PropertyNameKind :: Env ) ;
@@ -379,25 +394,25 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
379
394
// to avoid the compiler error "E0716 (temporary value dropped while borrowed)".
380
395
let mut metadata = ObjectMetaBuilder :: new ( ) ;
381
396
let metadata = metadata
382
- . name_and_namespace ( hdfs. as_ref ( ) )
397
+ . name_and_namespace ( hdfs)
383
398
. name ( rolegroup_ref. object_name ( ) )
384
- . ownerreference_from_resource ( hdfs. as_ref ( ) , None , Some ( true ) )
399
+ . ownerreference_from_resource ( hdfs, None , Some ( true ) )
385
400
. with_context ( |_| ObjectMissingMetadataForOwnerRefSnafu {
386
- obj_ref : ObjectRef :: from_obj ( hdfs. as_ref ( ) ) ,
401
+ obj_ref : ObjectRef :: from_obj ( hdfs) ,
387
402
} ) ?
388
403
. with_recommended_labels ( build_recommended_labels (
389
- hdfs. as_ref ( ) ,
404
+ hdfs,
390
405
RESOURCE_MANAGER_HDFS_CONTROLLER ,
391
406
& resolved_product_image. app_version_label ,
392
407
& rolegroup_ref. role ,
393
408
& rolegroup_ref. role_group ,
394
409
) )
395
410
. context ( ObjectMetaSnafu ) ?;
396
411
397
- let rg_service = rolegroup_service ( & hdfs, metadata, & role, & rolegroup_ref) ?;
412
+ let rg_service = rolegroup_service ( hdfs, metadata, & role, & rolegroup_ref) ?;
398
413
399
414
let rg_configmap = rolegroup_config_map (
400
- & hdfs,
415
+ hdfs,
401
416
& client. kubernetes_cluster_info ,
402
417
metadata,
403
418
& rolegroup_ref,
@@ -410,7 +425,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
410
425
) ?;
411
426
412
427
let rg_statefulset = rolegroup_statefulset (
413
- & hdfs,
428
+ hdfs,
414
429
& client. kubernetes_cluster_info ,
415
430
metadata,
416
431
& role,
@@ -463,7 +478,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
463
478
pod_disruption_budget : pdb,
464
479
} ) = role_config
465
480
{
466
- add_pdbs ( pdb, & hdfs, & role, client, & mut cluster_resources)
481
+ add_pdbs ( pdb, hdfs, & role, client, & mut cluster_resources)
467
482
. await
468
483
. context ( FailedToCreatePdbSnafu ) ?;
469
484
}
@@ -472,7 +487,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
472
487
// Discovery CM will fail to build until the rest of the cluster has been deployed, so do it last
473
488
// so that failure won't inhibit the rest of the cluster from booting up.
474
489
let discovery_cm = build_discovery_configmap (
475
- & hdfs,
490
+ hdfs,
476
491
& client. kubernetes_cluster_info ,
477
492
HDFS_CONTROLLER ,
478
493
& hdfs
@@ -496,10 +511,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
496
511
ClusterOperationsConditionBuilder :: new ( & hdfs. spec . cluster_operation ) ;
497
512
498
513
let status = HdfsClusterStatus {
499
- conditions : compute_conditions (
500
- hdfs. as_ref ( ) ,
501
- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
502
- ) ,
514
+ conditions : compute_conditions ( hdfs, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
503
515
// FIXME: We can't currently leave upgrade mode automatically, since we don't know when an upgrade is finalized
504
516
deployed_product_version : Some (
505
517
hdfs. status
@@ -539,7 +551,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
539
551
. context ( DeleteOrphanedResourcesSnafu ) ?;
540
552
}
541
553
client
542
- . apply_patch_status ( OPERATOR_NAME , & * hdfs, & status)
554
+ . apply_patch_status ( OPERATOR_NAME , hdfs, & status)
543
555
. await
544
556
. context ( ApplyStatusSnafu ) ?;
545
557
@@ -893,8 +905,15 @@ fn rolegroup_statefulset(
893
905
} )
894
906
}
895
907
896
- pub fn error_policy ( _obj : Arc < HdfsCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
897
- Action :: requeue ( * Duration :: from_secs ( 5 ) )
908
+ pub fn error_policy (
909
+ _obj : Arc < DeserializeGuard < HdfsCluster > > ,
910
+ error : & Error ,
911
+ _ctx : Arc < Ctx > ,
912
+ ) -> Action {
913
+ match error {
914
+ Error :: InvalidHdfsCluster { .. } => Action :: await_change ( ) ,
915
+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
916
+ }
898
917
}
899
918
900
919
#[ cfg( test) ]
0 commit comments