@@ -30,7 +30,11 @@ use stackable_operator::{
30
30
apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
31
31
DeepMerge ,
32
32
} ,
33
- kube:: { runtime:: controller:: Action , Resource , ResourceExt } ,
33
+ kube:: {
34
+ core:: { error_boundary, DeserializeGuard } ,
35
+ runtime:: controller:: Action ,
36
+ Resource , ResourceExt ,
37
+ } ,
34
38
kvp:: { Labels , ObjectLabels } ,
35
39
logging:: controller:: ReconcilerError ,
36
40
memory:: { BinaryMultiple , MemoryQuantity } ,
@@ -238,6 +242,11 @@ pub enum Error {
238
242
AddVolumeMount {
239
243
source : builder:: pod:: container:: Error ,
240
244
} ,
245
+
246
+ #[ snafu( display( "HelloCluster object is invalid" ) ) ]
247
+ InvalidHelloCluster {
248
+ source : error_boundary:: InvalidObject ,
249
+ } ,
241
250
}
242
251
type Result < T , E = Error > = std:: result:: Result < T , E > ;
243
252
@@ -247,8 +256,18 @@ impl ReconcilerError for Error {
247
256
}
248
257
}
249
258
250
- pub async fn reconcile_hello ( hello : Arc < HelloCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
259
+ pub async fn reconcile_hello (
260
+ hello : Arc < DeserializeGuard < HelloCluster > > ,
261
+ ctx : Arc < Ctx > ,
262
+ ) -> Result < Action > {
251
263
tracing:: info!( "Starting reconcile" ) ;
264
+
265
+ let hello = hello
266
+ . 0
267
+ . as_ref ( )
268
+ . map_err ( error_boundary:: InvalidObject :: clone)
269
+ . context ( InvalidHelloClusterSnafu ) ?;
270
+
252
271
let client = & ctx. client ;
253
272
let resolved_product_image: ResolvedProductImage = hello
254
273
. spec
@@ -259,7 +278,7 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
259
278
let validated_config = validate_all_roles_and_groups_config (
260
279
& resolved_product_image. product_version ,
261
280
& transform_all_roles_to_config (
262
- hello. as_ref ( ) ,
281
+ hello,
263
282
[ (
264
283
HelloRole :: Server . to_string ( ) ,
265
284
(
@@ -296,7 +315,7 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
296
315
. context ( CreateClusterResourcesSnafu ) ?;
297
316
298
317
let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
299
- hello. as_ref ( ) ,
318
+ hello,
300
319
APP_NAME ,
301
320
cluster_resources
302
321
. get_required_labels ( )
@@ -313,15 +332,15 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
313
332
. await
314
333
. context ( ApplyRoleBindingSnafu ) ?;
315
334
316
- let server_role_service = build_server_role_service ( & hello, & resolved_product_image) ?;
335
+ let server_role_service = build_server_role_service ( hello, & resolved_product_image) ?;
317
336
318
337
// we have to get the assigned ports
319
338
cluster_resources
320
339
. add ( client, server_role_service)
321
340
. await
322
341
. context ( ApplyRoleServiceSnafu ) ?;
323
342
324
- let vector_aggregator_address = resolve_vector_aggregator_address ( & hello, client)
343
+ let vector_aggregator_address = resolve_vector_aggregator_address ( hello, client)
325
344
. await
326
345
. context ( ResolveVectorAggregatorAddressSnafu ) ?;
327
346
@@ -334,17 +353,17 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
334
353
. merged_config ( & HelloRole :: Server , & role_group_ref)
335
354
. context ( FailedToResolveResourceConfigSnafu ) ?;
336
355
337
- let rg_service = build_rolegroup_service ( & hello, & resolved_product_image, & role_group_ref) ?;
356
+ let rg_service = build_rolegroup_service ( hello, & resolved_product_image, & role_group_ref) ?;
338
357
let rg_configmap = build_server_rolegroup_config_map (
339
- & hello,
358
+ hello,
340
359
& resolved_product_image,
341
360
& role_group_ref,
342
361
rolegroup_config,
343
362
& config,
344
363
vector_aggregator_address. as_deref ( ) ,
345
364
) ?;
346
365
let rg_statefulset = build_server_rolegroup_statefulset (
347
- & hello,
366
+ hello,
348
367
& resolved_product_image,
349
368
& hello_role,
350
369
& role_group_ref,
@@ -382,7 +401,7 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
382
401
pod_disruption_budget : pdb,
383
402
} ) = role_config
384
403
{
385
- add_pdbs ( pdb, & hello, & hello_role, client, & mut cluster_resources)
404
+ add_pdbs ( pdb, hello, & hello_role, client, & mut cluster_resources)
386
405
. await
387
406
. context ( FailedToCreatePdbSnafu ) ?;
388
407
}
@@ -391,14 +410,11 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
391
410
ClusterOperationsConditionBuilder :: new ( & hello. spec . cluster_operation ) ;
392
411
393
412
let status = HelloClusterStatus {
394
- conditions : compute_conditions (
395
- hello. as_ref ( ) ,
396
- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
397
- ) ,
413
+ conditions : compute_conditions ( hello, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
398
414
} ;
399
415
400
416
client
401
- . apply_patch_status ( OPERATOR_NAME , & * hello, & status)
417
+ . apply_patch_status ( OPERATOR_NAME , hello, & status)
402
418
. await
403
419
. context ( ApplyStatusSnafu ) ?;
404
420
@@ -807,8 +823,15 @@ fn build_server_rolegroup_statefulset(
807
823
} )
808
824
}
809
825
810
- pub fn error_policy ( _obj : Arc < HelloCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
811
- Action :: requeue ( Duration :: from_secs ( 5 ) )
826
+ pub fn error_policy (
827
+ _obj : Arc < DeserializeGuard < HelloCluster > > ,
828
+ error : & Error ,
829
+ _ctx : Arc < Ctx > ,
830
+ ) -> Action {
831
+ match error {
832
+ Error :: InvalidHelloCluster { .. } => Action :: await_change ( ) ,
833
+ _ => Action :: requeue ( Duration :: from_secs ( 5 ) ) ,
834
+ }
812
835
}
813
836
814
837
fn service_ports ( ) -> Vec < ServicePort > {
0 commit comments