@@ -44,6 +44,7 @@ use stackable_operator::{
44
44
DeepMerge ,
45
45
} ,
46
46
kube:: {
47
+ core:: { error_boundary, DeserializeGuard } ,
47
48
runtime:: { controller:: Action , reflector:: ObjectRef } ,
48
49
Resource , ResourceExt ,
49
50
} ,
@@ -331,6 +332,11 @@ pub enum Error {
331
332
AddVolumeMount {
332
333
source : builder:: pod:: container:: Error ,
333
334
} ,
335
+
336
+ #[ snafu( display( "invalid TrinoCluster object" ) ) ]
337
+ InvalidTrinoCluster {
338
+ source : error_boundary:: InvalidObject ,
339
+ } ,
334
340
}
335
341
336
342
type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -341,9 +347,17 @@ impl ReconcilerError for Error {
341
347
}
342
348
}
343
349
344
- pub async fn reconcile_trino ( trino : Arc < TrinoCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
350
+ pub async fn reconcile_trino (
351
+ trino : Arc < DeserializeGuard < TrinoCluster > > ,
352
+ ctx : Arc < Ctx > ,
353
+ ) -> Result < Action > {
345
354
tracing:: info!( "Starting reconcile" ) ;
346
355
356
+ let trino = trino
357
+ . 0
358
+ . as_ref ( )
359
+ . map_err ( error_boundary:: InvalidObject :: clone)
360
+ . context ( InvalidTrinoClusterSnafu ) ?;
347
361
let client = & ctx. client ;
348
362
349
363
let resolved_product_image: ResolvedProductImage = trino
@@ -387,7 +401,7 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
387
401
}
388
402
389
403
let validated_config = validated_product_config (
390
- & trino,
404
+ trino,
391
405
// The Trino version is a single number like 396.
392
406
// The product config expects semver formatted version strings.
393
407
// That is why we just add minor and patch version 0 here.
@@ -405,7 +419,7 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
405
419
. context ( CreateClusterResourcesSnafu ) ?;
406
420
407
421
let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
408
- trino. as_ref ( ) ,
422
+ trino,
409
423
APP_NAME ,
410
424
cluster_resources
411
425
. get_required_labels ( )
@@ -425,23 +439,23 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
425
439
426
440
let trino_opa_config = match trino. get_opa_config ( ) {
427
441
Some ( opa_config) => Some (
428
- TrinoOpaConfig :: from_opa_config ( client, & trino, opa_config)
442
+ TrinoOpaConfig :: from_opa_config ( client, trino, opa_config)
429
443
. await
430
444
. context ( InvalidOpaConfigSnafu ) ?,
431
445
) ,
432
446
None => None ,
433
447
} ;
434
448
435
- let coordinator_role_service = build_coordinator_role_service ( & trino, & resolved_product_image) ?;
449
+ let coordinator_role_service = build_coordinator_role_service ( trino, & resolved_product_image) ?;
436
450
437
451
cluster_resources
438
452
. add ( client, coordinator_role_service)
439
453
. await
440
454
. context ( ApplyRoleServiceSnafu ) ?;
441
455
442
- create_shared_internal_secret ( & trino, client) . await ?;
456
+ create_shared_internal_secret ( trino, client) . await ?;
443
457
444
- let vector_aggregator_address = resolve_vector_aggregator_address ( & trino, client)
458
+ let vector_aggregator_address = resolve_vector_aggregator_address ( trino, client)
445
459
. await
446
460
. context ( ResolveVectorAggregatorAddressSnafu ) ?;
447
461
@@ -450,15 +464,15 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
450
464
for ( role, role_config) in validated_config {
451
465
let trino_role = TrinoRole :: from_str ( & role) . context ( FailedToParseRoleSnafu ) ?;
452
466
for ( role_group, config) in role_config {
453
- let rolegroup = trino_role. rolegroup_ref ( & trino, role_group) ;
467
+ let rolegroup = trino_role. rolegroup_ref ( trino, role_group) ;
454
468
455
469
let merged_config = trino
456
470
. merged_config ( & trino_role, & rolegroup, & catalog_definitions)
457
471
. context ( FailedToResolveConfigSnafu ) ?;
458
472
459
- let rg_service = build_rolegroup_service ( & trino, & resolved_product_image, & rolegroup) ?;
473
+ let rg_service = build_rolegroup_service ( trino, & resolved_product_image, & rolegroup) ?;
460
474
let rg_configmap = build_rolegroup_config_map (
461
- & trino,
475
+ trino,
462
476
& resolved_product_image,
463
477
& trino_role,
464
478
& rolegroup,
@@ -470,13 +484,13 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
470
484
& client. kubernetes_cluster_info ,
471
485
) ?;
472
486
let rg_catalog_configmap = build_rolegroup_catalog_config_map (
473
- & trino,
487
+ trino,
474
488
& resolved_product_image,
475
489
& rolegroup,
476
490
& catalogs,
477
491
) ?;
478
492
let rg_stateful_set = build_rolegroup_statefulset (
479
- & trino,
493
+ trino,
480
494
& trino_role,
481
495
& resolved_product_image,
482
496
& rolegroup,
@@ -523,7 +537,7 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
523
537
pod_disruption_budget : pdb,
524
538
} ) = role_config
525
539
{
526
- add_pdbs ( pdb, & trino, & trino_role, client, & mut cluster_resources)
540
+ add_pdbs ( pdb, trino, & trino_role, client, & mut cluster_resources)
527
541
. await
528
542
. context ( FailedToCreatePdbSnafu ) ?;
529
543
}
@@ -534,7 +548,7 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
534
548
535
549
let status = TrinoClusterStatus {
536
550
conditions : compute_conditions (
537
- trino. as_ref ( ) ,
551
+ trino,
538
552
& [ & sts_cond_builder, & cluster_operation_cond_builder] ,
539
553
) ,
540
554
} ;
@@ -544,7 +558,7 @@ pub async fn reconcile_trino(trino: Arc<TrinoCluster>, ctx: Arc<Ctx>) -> Result<
544
558
. await
545
559
. context ( DeleteOrphanedResourcesSnafu ) ?;
546
560
client
547
- . apply_patch_status ( OPERATOR_NAME , & * trino, & status)
561
+ . apply_patch_status ( OPERATOR_NAME , trino, & status)
548
562
. await
549
563
. context ( ApplyStatusSnafu ) ?;
550
564
@@ -1221,8 +1235,15 @@ fn build_rolegroup_service(
1221
1235
} )
1222
1236
}
1223
1237
1224
- pub fn error_policy ( _obj : Arc < TrinoCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1225
- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1238
+ pub fn error_policy (
1239
+ _obj : Arc < DeserializeGuard < TrinoCluster > > ,
1240
+ error : & Error ,
1241
+ _ctx : Arc < Ctx > ,
1242
+ ) -> Action {
1243
+ match error {
1244
+ Error :: InvalidTrinoCluster { .. } => Action :: await_change ( ) ,
1245
+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1246
+ }
1226
1247
}
1227
1248
1228
1249
/// Give a secret name and an optional key in the secret to use.
0 commit comments