@@ -302,14 +302,13 @@ private Future<KafkaRebalance> updateStatus(Reconciliation reconciliation,
302
302
if (desiredStatus .getConditions () != null ) {
303
303
previous = desiredStatus .getConditions ().stream ().filter (condition -> condition != cond ).collect (Collectors .toList ());
304
304
}
305
- String rebalanceType = rebalanceStateConditionType (desiredStatus );
306
305
307
306
// If a throwable is supplied, it is set in the status with priority
308
307
if (e != null ) {
309
308
StatusUtils .setStatusConditionAndObservedGeneration (kafkaRebalance , desiredStatus , KafkaRebalanceState .NotReady .toString (), e );
310
309
desiredStatus .setConditions (Stream .concat (desiredStatus .getConditions ().stream (), previous .stream ()).collect (Collectors .toList ()));
311
- } else if (rebalanceType != null ) {
312
- StatusUtils .setStatusConditionAndObservedGeneration (kafkaRebalance , desiredStatus , rebalanceType );
310
+ } else if (cond != null ) {
311
+ StatusUtils .setStatusConditionAndObservedGeneration (kafkaRebalance , desiredStatus , cond );
313
312
desiredStatus .setConditions (Stream .concat (desiredStatus .getConditions ().stream (), previous .stream ()).collect (Collectors .toList ()));
314
313
} else {
315
314
throw new IllegalArgumentException ("Status related exception and the Status condition's type cannot both be null" );
@@ -416,67 +415,72 @@ private Future<Void> reconcile(Reconciliation reconciliation, String host,
416
415
return updateStatus (reconciliation , kafkaRebalance , new KafkaRebalanceStatus (), new InvalidResourceException (error )).mapEmpty ();
417
416
}
418
417
419
- AbstractRebalanceOptions .AbstractRebalanceOptionsBuilder <?, ?> rebalanceOptionsBuilder = convertRebalanceSpecToRebalanceOptions (kafkaRebalance .getSpec ());
420
-
421
- return computeNextStatus (reconciliation , host , apiClient , kafkaRebalance , currentState , rebalanceAnnotation , rebalanceOptionsBuilder )
422
- .compose (desiredStatusAndMap -> {
423
- // More events related to resource modification might be queued with a stale state. (potentially updated by the rebalance holding the lock)
424
- // Due to possible long rebalancing operations that take the lock for the entire period,
425
- // do a new get to retrieve the current resource state.
426
- return kafkaRebalanceOperator .getAsync (reconciliation .namespace (), reconciliation .name ())
427
- .compose (currentKafkaRebalance -> {
428
- if (currentKafkaRebalance != null ) {
429
- return configMapOperator .reconcile (reconciliation , kafkaRebalance .getMetadata ().getNamespace (),
430
- kafkaRebalance .getMetadata ().getName (), desiredStatusAndMap .getLoadMap ())
431
- .compose (i -> updateStatus (reconciliation , currentKafkaRebalance , desiredStatusAndMap .getStatus (), null ))
432
- .compose (updatedKafkaRebalance -> {
433
- String message = "State updated to [{}] " ;
434
- if (rawRebalanceAnnotation (updatedKafkaRebalance ) == null ) {
435
- LOGGER .infoCr (reconciliation , message + "and annotation {} is not set " ,
436
- rebalanceStateConditionType (updatedKafkaRebalance .getStatus ()),
437
- ANNO_STRIMZI_IO_REBALANCE );
438
- } else {
439
- LOGGER .infoCr (reconciliation , message + "with annotation {}={} " ,
440
- rebalanceStateConditionType (updatedKafkaRebalance .getStatus ()),
441
- ANNO_STRIMZI_IO_REBALANCE ,
442
- rawRebalanceAnnotation (updatedKafkaRebalance )
443
- );
444
- }
445
- if (hasRebalanceAnnotation (updatedKafkaRebalance )) {
446
- if (currentState != KafkaRebalanceState .ReconciliationPaused && rebalanceAnnotation != KafkaRebalanceAnnotation .none && !currentState .isValidateAnnotation (rebalanceAnnotation )) {
447
- return Future .succeededFuture ();
418
+ try {
419
+ AbstractRebalanceOptions .AbstractRebalanceOptionsBuilder <?, ?> rebalanceOptionsBuilder = convertRebalanceSpecToRebalanceOptions (kafkaRebalance .getSpec ());
420
+
421
+ return computeNextStatus (reconciliation , host , apiClient , kafkaRebalance , currentState , rebalanceAnnotation , rebalanceOptionsBuilder )
422
+ .compose (desiredStatusAndMap -> {
423
+ // More events related to resource modification might be queued with a stale state. (potentially updated by the rebalance holding the lock)
424
+ // Due to possible long rebalancing operations that take the lock for the entire period,
425
+ // do a new get to retrieve the current resource state.
426
+ return kafkaRebalanceOperator .getAsync (reconciliation .namespace (), reconciliation .name ())
427
+ .compose (currentKafkaRebalance -> {
428
+ if (currentKafkaRebalance != null ) {
429
+ return configMapOperator .reconcile (reconciliation , kafkaRebalance .getMetadata ().getNamespace (),
430
+ kafkaRebalance .getMetadata ().getName (), desiredStatusAndMap .getLoadMap ())
431
+ .compose (i -> updateStatus (reconciliation , currentKafkaRebalance , desiredStatusAndMap .getStatus (), null ))
432
+ .compose (updatedKafkaRebalance -> {
433
+ String message = "State updated to [{}] " ;
434
+ if (rawRebalanceAnnotation (updatedKafkaRebalance ) == null ) {
435
+ LOGGER .infoCr (reconciliation , message + "and annotation {} is not set " ,
436
+ rebalanceStateConditionType (updatedKafkaRebalance .getStatus ()),
437
+ ANNO_STRIMZI_IO_REBALANCE );
448
438
} else {
449
- LOGGER .infoCr (reconciliation , "Removing annotation {}={}" ,
439
+ LOGGER .infoCr (reconciliation , message + "with annotation {}={} " ,
440
+ rebalanceStateConditionType (updatedKafkaRebalance .getStatus ()),
450
441
ANNO_STRIMZI_IO_REBALANCE ,
451
- rawRebalanceAnnotation (updatedKafkaRebalance ));
452
- // Updated KafkaRebalance has rebalance annotation removed as
453
- // action specified by user has been completed.
454
- KafkaRebalance patchedKafkaRebalance = new KafkaRebalanceBuilder (updatedKafkaRebalance )
455
- .editMetadata ()
442
+ rawRebalanceAnnotation (updatedKafkaRebalance )
443
+ );
444
+ }
445
+ if (hasRebalanceAnnotation (updatedKafkaRebalance )) {
446
+ if (currentState != KafkaRebalanceState .ReconciliationPaused && rebalanceAnnotation != KafkaRebalanceAnnotation .none && !currentState .isValidateAnnotation (rebalanceAnnotation )) {
447
+ return Future .succeededFuture ();
448
+ } else {
449
+ LOGGER .infoCr (reconciliation , "Removing annotation {}={}" ,
450
+ ANNO_STRIMZI_IO_REBALANCE ,
451
+ rawRebalanceAnnotation (updatedKafkaRebalance ));
452
+ // Updated KafkaRebalance has rebalance annotation removed as
453
+ // action specified by user has been completed.
454
+ KafkaRebalance patchedKafkaRebalance = new KafkaRebalanceBuilder (updatedKafkaRebalance )
455
+ .editMetadata ()
456
456
.removeFromAnnotations (ANNO_STRIMZI_IO_REBALANCE )
457
- .endMetadata ()
458
- .build ();
459
- return kafkaRebalanceOperator .patchAsync (reconciliation , patchedKafkaRebalance );
457
+ .endMetadata ()
458
+ .build ();
459
+ return kafkaRebalanceOperator .patchAsync (reconciliation , patchedKafkaRebalance );
460
+ }
461
+ } else {
462
+ LOGGER .debugCr (reconciliation , "No annotation {}" , ANNO_STRIMZI_IO_REBALANCE );
463
+ return Future .succeededFuture ();
460
464
}
461
- } else {
462
- LOGGER .debugCr (reconciliation , "No annotation {}" , ANNO_STRIMZI_IO_REBALANCE );
463
- return Future .succeededFuture ();
464
- }
465
- })
466
- .mapEmpty ();
467
- } else {
468
- return Future .succeededFuture ();
469
- }
470
- }, exception -> {
465
+ })
466
+ .mapEmpty ();
467
+ } else {
468
+ return Future .succeededFuture ();
469
+ }
470
+ }, exception -> {
471
471
LOGGER .errorCr (reconciliation , "Status updated to [NotReady] due to error: {}" , exception .getMessage ());
472
472
return updateStatus (reconciliation , kafkaRebalance , new KafkaRebalanceStatus (), exception )
473
473
.mapEmpty ();
474
- }); },
475
- exception -> {
476
- LOGGER .errorCr (reconciliation , "Status updated to [NotReady] due to error: {}" , exception .getMessage ());
477
- return updateStatus (reconciliation , kafkaRebalance , new KafkaRebalanceStatus (), exception )
478
- .mapEmpty ();
479
- });
474
+ });
475
+ }, exception -> {
476
+ LOGGER .errorCr (reconciliation , "Status updated to [NotReady] due to error: {}" , exception .getMessage ());
477
+ return updateStatus (reconciliation , kafkaRebalance , new KafkaRebalanceStatus (), exception )
478
+ .mapEmpty ();
479
+ });
480
+ } catch (IllegalArgumentException e ) {
481
+ LOGGER .errorCr (reconciliation , "Status updated to [NotReady] due to error: {}" , e .getMessage ());
482
+ return updateStatus (reconciliation , kafkaRebalance , new KafkaRebalanceStatus (), e ).mapEmpty ();
483
+ }
480
484
}
481
485
482
486
/**
@@ -776,10 +780,10 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onNotReady(Reconci
776
780
// CC is activated from the disabled state or the user has fixed the error on the resource and want to 'refresh'
777
781
return onNew (reconciliation , host , apiClient , kafkaRebalance , rebalanceOptionsBuilder );
778
782
} else {
779
- // Stay in the current NotReady state, returning null as next state
783
+ // Stay in the current NotReady state
780
784
Set <Condition > conditions = validate (reconciliation , kafkaRebalance );
781
785
validateAnnotation (conditions , KafkaRebalanceState .NotReady , rebalanceAnnotation );
782
- return Future .succeededFuture (buildRebalanceStatus (null , KafkaRebalanceState . NotReady , conditions ));
786
+ return Future .succeededFuture (new MapAndStatus <> (null , buildRebalanceStatusFromPreviousStatus ( kafkaRebalance . getStatus () , conditions ) ));
783
787
}
784
788
}
785
789
@@ -1261,10 +1265,6 @@ private MapAndStatus<ConfigMap, KafkaRebalanceStatus> handleRebalanceResponse(Re
1261
1265
// If rebalance proposal is still being processed, we need to re-request the proposal at a later time
1262
1266
// with the corresponding session-id so we move to the PendingProposal State.
1263
1267
return buildRebalanceStatus (response .getUserTaskId (), KafkaRebalanceState .PendingProposal , validate (reconciliation , kafkaRebalance ));
1264
- } else if (response .isBrokersNotExist ()) {
1265
- // If there are some specified brokers which don't exist, it's an error at the Cruise Control level
1266
- // Need to move to NotReady state, having the user fixing the brokers list and refresh
1267
- return buildRebalanceStatus (null , KafkaRebalanceState .NotReady , validate (reconciliation , kafkaRebalance ));
1268
1268
}
1269
1269
} else {
1270
1270
if (response .isNotEnoughDataForProposal ()) {
@@ -1276,10 +1276,6 @@ private MapAndStatus<ConfigMap, KafkaRebalanceStatus> handleRebalanceResponse(Re
1276
1276
// soon as it is ready, so set the state to rebalancing.
1277
1277
// In the onRebalancing method the optimization proposal will be added when it is ready.
1278
1278
return buildRebalanceStatus (response .getUserTaskId (), KafkaRebalanceState .Rebalancing , validate (reconciliation , kafkaRebalance ));
1279
- } else if (response .isBrokersNotExist ()) {
1280
- // If there are some specified brokers which don't exist, it's an error at the Cruise Control level
1281
- // Need to move to NotReady state, having the user fixing the brokers list and refresh
1282
- return buildRebalanceStatus (null , KafkaRebalanceState .NotReady , validate (reconciliation , kafkaRebalance ));
1283
1279
}
1284
1280
}
1285
1281
0 commit comments