diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/BaseControl.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/BaseControl.java index a5cdb85257..565f2b4d9d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/BaseControl.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/BaseControl.java @@ -3,10 +3,17 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; -public abstract class BaseControl> { +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationAdapter; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationContext; + +public abstract class BaseControl, P extends HasMetadata> { private Long scheduleDelay = null; + private Expectation

expectation; public T rescheduleAfter(long delay) { rescheduleAfter(Duration.ofMillis(delay)); @@ -25,4 +32,16 @@ public T rescheduleAfter(long delay, TimeUnit timeUnit) { public Optional getScheduleDelay() { return Optional.ofNullable(scheduleDelay); } + + public void expect(Expectation

expectation) { + this.expectation = expectation; + } + + public void expect(BiPredicate> expectation, Duration timeout) { + this.expectation = new ExpectationAdapter<>(expectation, timeout); + } + + public Optional> getExpectation() { + return Optional.ofNullable(expectation); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/CacheAware.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/CacheAware.java new file mode 100644 index 0000000000..b29732214f --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/CacheAware.java @@ -0,0 +1,44 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; +import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; + +public interface CacheAware

{ + default Optional getSecondaryResource(Class expectedType) { + return getSecondaryResource(expectedType, null); + } + + Set getSecondaryResources(Class expectedType); + + default Stream getSecondaryResourcesAsStream(Class expectedType) { + return getSecondaryResources(expectedType).stream(); + } + + Optional getSecondaryResource(Class expectedType, String eventSourceName); + + ControllerConfiguration

getControllerConfiguration(); + + /** + * Retrieves the primary resource. + * + * @return the primary resource associated with the current reconciliation + */ + P getPrimaryResource(); + + /** + * Retrieves the primary resource cache. + * + * @return the {@link IndexerResourceCache} associated with the associated {@link Reconciler} for + * this context + */ + @SuppressWarnings("unused") + IndexedResourceCache

getPrimaryCache(); + + EventSourceRetriever

eventSourceRetriever(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java index f47deb9734..ddc30224a0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java @@ -1,35 +1,18 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedWorkflowAndDependentResourceContext; -import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; -public interface Context

{ +public interface Context

extends CacheAware

{ Optional getRetryInfo(); - default Optional getSecondaryResource(Class expectedType) { - return getSecondaryResource(expectedType, null); - } - - Set getSecondaryResources(Class expectedType); - - default Stream getSecondaryResourcesAsStream(Class expectedType) { - return getSecondaryResources(expectedType).stream(); - } - - Optional getSecondaryResource(Class expectedType, String eventSourceName); - - ControllerConfiguration

getControllerConfiguration(); - /** * Retrieve the {@link ManagedWorkflowAndDependentResourceContext} used to interact with {@link * io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource}s and associated {@link @@ -39,8 +22,6 @@ default Stream getSecondaryResourcesAsStream(Class expectedType) { */ ManagedWorkflowAndDependentResourceContext managedWorkflowAndDependentResourceContext(); - EventSourceRetriever

eventSourceRetriever(); - KubernetesClient getClient(); /** ExecutorService initialized by framework for workflows. Used for workflow standalone mode. */ @@ -72,4 +53,6 @@ default Stream getSecondaryResourcesAsStream(Class expectedType) { * @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise */ boolean isNextReconciliationImminent(); + + Optional> expectationResult(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultCacheAware.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultCacheAware.java new file mode 100644 index 0000000000..aad14bd860 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultCacheAware.java @@ -0,0 +1,79 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.Controller; +import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; +import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException; + +public class DefaultCacheAware

implements CacheAware

{ + + protected final Controller

controller; + protected final P primaryResource; + + public DefaultCacheAware(Controller

controller, P primaryResource) { + this.controller = controller; + this.primaryResource = primaryResource; + } + + @Override + public Set getSecondaryResources(Class expectedType) { + return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet()); + } + + @Override + public Stream getSecondaryResourcesAsStream(Class expectedType) { + return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() + .map(es -> es.getSecondaryResources(primaryResource)) + .flatMap(Set::stream); + } + + @Override + public Optional getSecondaryResource(Class expectedType, String eventSourceName) { + try { + return controller + .getEventSourceManager() + .getEventSourceFor(expectedType, eventSourceName) + .getSecondaryResource(primaryResource); + } catch (NoEventSourceForClassException e) { + /* + * If a workflow has an activation condition there can be event sources which are only + * registered if the activation condition holds, but to provide a consistent API we return an + * Optional instead of throwing an exception. + * + * Note that not only the resource which has an activation condition might not be registered + * but dependents which depend on it. + */ + if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) { + return Optional.empty(); + } else { + throw e; + } + } + } + + @Override + public EventSourceRetriever

eventSourceRetriever() { + return controller.getEventSourceManager(); + } + + @Override + public ControllerConfiguration

getControllerConfiguration() { + return controller.getConfiguration(); + } + + @Override + public P getPrimaryResource() { + return primaryResource; + } + + @Override + public IndexedResourceCache

getPrimaryCache() { + return controller.getEventSourceManager().getControllerEventSource(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index 2acf8d13ca..190774767b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -1,37 +1,36 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; -import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedWorkflowAndDependentResourceContext; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult; import io.javaoperatorsdk.operator.processing.Controller; -import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; -import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException; import io.javaoperatorsdk.operator.processing.event.ResourceID; -public class DefaultContext

implements Context

{ +public class DefaultContext

extends DefaultCacheAware

+ implements Context

{ private RetryInfo retryInfo; - private final Controller

controller; - private final P primaryResource; - private final ControllerConfiguration

controllerConfiguration; + private final DefaultManagedWorkflowAndDependentResourceContext

defaultManagedDependentResourceContext; - public DefaultContext(RetryInfo retryInfo, Controller

controller, P primaryResource) { + private final ExpectationResult

expectationResult; + + public DefaultContext( + RetryInfo retryInfo, + Controller

controller, + P primaryResource, + ExpectationResult

expectationResult) { + super(controller, primaryResource); this.retryInfo = retryInfo; - this.controller = controller; - this.primaryResource = primaryResource; - this.controllerConfiguration = controller.getConfiguration(); this.defaultManagedDependentResourceContext = new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this); + this.expectationResult = expectationResult; } @Override @@ -39,57 +38,11 @@ public Optional getRetryInfo() { return Optional.ofNullable(retryInfo); } - @Override - public Set getSecondaryResources(Class expectedType) { - return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet()); - } - - @Override - public Stream getSecondaryResourcesAsStream(Class expectedType) { - return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() - .map(es -> es.getSecondaryResources(primaryResource)) - .flatMap(Set::stream); - } - - @Override - public Optional getSecondaryResource(Class expectedType, String eventSourceName) { - try { - return controller - .getEventSourceManager() - .getEventSourceFor(expectedType, eventSourceName) - .getSecondaryResource(primaryResource); - } catch (NoEventSourceForClassException e) { - /* - * If a workflow has an activation condition there can be event sources which are only - * registered if the activation condition holds, but to provide a consistent API we return an - * Optional instead of throwing an exception. - * - * Note that not only the resource which has an activation condition might not be registered - * but dependents which depend on it. - */ - if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) { - return Optional.empty(); - } else { - throw e; - } - } - } - - @Override - public ControllerConfiguration

getControllerConfiguration() { - return controllerConfiguration; - } - @Override public ManagedWorkflowAndDependentResourceContext managedWorkflowAndDependentResourceContext() { return defaultManagedDependentResourceContext; } - @Override - public EventSourceRetriever

eventSourceRetriever() { - return controller.getEventSourceManager(); - } - @Override public KubernetesClient getClient() { return controller.getClient(); @@ -102,16 +55,6 @@ public ExecutorService getWorkflowExecutorService() { return controller.getExecutorServiceManager().workflowExecutorService(); } - @Override - public P getPrimaryResource() { - return primaryResource; - } - - @Override - public IndexedResourceCache

getPrimaryCache() { - return controller.getEventSourceManager().getControllerEventSource(); - } - @Override public boolean isNextReconciliationImminent() { return controller @@ -119,6 +62,11 @@ public boolean isNextReconciliationImminent() { .isNextReconciliationImminent(ResourceID.fromResource(primaryResource)); } + @Override + public Optional> expectationResult() { + return Optional.ofNullable(expectationResult); + } + public DefaultContext

setRetryInfo(RetryInfo retryInfo) { this.retryInfo = retryInfo; return this; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DeleteControl.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DeleteControl.java index 7160e70830..a7f9104d6d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DeleteControl.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DeleteControl.java @@ -1,6 +1,8 @@ package io.javaoperatorsdk.operator.api.reconciler; -public class DeleteControl extends BaseControl { +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class DeleteControl

extends BaseControl, P> { private final boolean removeFinalizer; @@ -27,7 +29,7 @@ public static DeleteControl defaultDelete() { * * @return delete control that will not remove finalizer. */ - public static DeleteControl noFinalizerRemoval() { + public static DeleteControl noFinalizerRemoval() { return new DeleteControl(false); } @@ -36,7 +38,7 @@ public boolean isRemoveFinalizer() { } @Override - public DeleteControl rescheduleAfter(long delay) { + public DeleteControl

rescheduleAfter(long delay) { if (removeFinalizer) { throw new IllegalStateException("Cannot reschedule cleanup if removing finalizer"); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ErrorStatusUpdateControl.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ErrorStatusUpdateControl.java index e9073d613c..9b21d60883 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ErrorStatusUpdateControl.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ErrorStatusUpdateControl.java @@ -6,7 +6,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; public class ErrorStatusUpdateControl

- extends BaseControl> { + extends BaseControl, P> { private final P resource; private boolean noRetry = false; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/UpdateControl.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/UpdateControl.java index 1b5eefd7ff..f3ebeb76d9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/UpdateControl.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/UpdateControl.java @@ -5,7 +5,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.CustomResource; -public class UpdateControl

extends BaseControl> { +public class UpdateControl

extends BaseControl, P> { private final P resource; private final boolean patchResource; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/AbstractExpectation.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/AbstractExpectation.java new file mode 100644 index 0000000000..4821df4236 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/AbstractExpectation.java @@ -0,0 +1,22 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +import java.time.Duration; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public abstract class AbstractExpectation

implements Expectation

{ + + protected final Duration timeout; + + protected AbstractExpectation(Duration timeout) { + this.timeout = timeout; + } + + @Override + public abstract boolean isFulfilled(P primary, ExpectationContext

context); + + @Override + public Duration timeout() { + return timeout; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/DefaultExpectationContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/DefaultExpectationContext.java new file mode 100644 index 0000000000..7c08faea48 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/DefaultExpectationContext.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.DefaultCacheAware; +import io.javaoperatorsdk.operator.processing.Controller; + +public class DefaultExpectationContext

extends DefaultCacheAware

+ implements ExpectationContext

{ + public DefaultExpectationContext(Controller

controller, P primaryResource) { + super(controller, primaryResource); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/Expectation.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/Expectation.java new file mode 100644 index 0000000000..47a97bd188 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/Expectation.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +import java.time.Duration; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public interface Expectation

{ + + boolean isFulfilled(P primary, ExpectationContext

context); + + Duration timeout(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationAdapter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationAdapter.java new file mode 100644 index 0000000000..ca0707fc31 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationAdapter.java @@ -0,0 +1,21 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +import java.time.Duration; +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class ExpectationAdapter

extends AbstractExpectation

{ + + private final BiPredicate> expectation; + + public ExpectationAdapter(BiPredicate> expectation, Duration timeout) { + super(timeout); + this.expectation = expectation; + } + + @Override + public boolean isFulfilled(P primary, ExpectationContext

context) { + return expectation.test(primary, context); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationContext.java new file mode 100644 index 0000000000..614a58dd41 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationContext.java @@ -0,0 +1,6 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.CacheAware; + +public interface ExpectationContext

extends CacheAware

{} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationResult.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationResult.java new file mode 100644 index 0000000000..02510d87bc --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationResult.java @@ -0,0 +1,6 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public record ExpectationResult

( + ExpectationStatus status, Expectation

expectation) {} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationStatus.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationStatus.java new file mode 100644 index 0000000000..5ced5093da --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/expectation/ExpectationStatus.java @@ -0,0 +1,6 @@ +package io.javaoperatorsdk.operator.api.reconciler.expectation; + +public enum ExpectationStatus { + TIMEOUT, + FULFILLED; +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index bdaf575814..8235044d7e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -17,6 +17,10 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Constants; +import io.javaoperatorsdk.operator.api.reconciler.expectation.DefaultExpectationContext; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationContext; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationStatus; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; @@ -136,6 +140,20 @@ private void submitReconciliationExecution(ResourceState state) { Optional

maybeLatest = cache.get(resourceID); maybeLatest.ifPresent(MDCUtils::addResourceInfo); if (!controllerUnderExecution && maybeLatest.isPresent()) { + ExpectationResult

expectationResult = null; + if (isExpectationPresent(state)) { + var expectationCheckResult = + shouldProceedWithExpectation(state, maybeLatest.orElseThrow()); + if (expectationCheckResult.isEmpty()) { + log.debug( + "Skipping processing since expectation is not fulfilled. ResourceID: {}", + resourceID); + return; + } else { + expectationResult = expectationCheckResult.orElseThrow(); + } + } + var rateLimit = state.getRateLimit(); if (rateLimit == null) { rateLimit = rateLimiter.initState(); @@ -148,7 +166,8 @@ private void submitReconciliationExecution(ResourceState state) { } state.setUnderProcessing(true); final var latest = maybeLatest.get(); - ExecutionScope

executionScope = new ExecutionScope<>(state.getRetry()); + ExecutionScope

executionScope = + new ExecutionScope<>(state.getRetry(), expectationResult); state.unMarkEventReceived(); metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata); log.debug("Executing events for custom resource. Scope: {}", executionScope); @@ -174,6 +193,26 @@ private void submitReconciliationExecution(ResourceState state) { } } + private boolean isExpectationPresent(ResourceState state) { + return state.getExpectationHolder().isPresent(); + } + + @SuppressWarnings("unchecked") + Optional> shouldProceedWithExpectation(ResourceState state, P primary) { + + var holder = state.getExpectationHolder().orElseThrow(); + if (holder.isTimedOut()) { + return Optional.of( + new ExpectationResult

(ExpectationStatus.TIMEOUT, holder.getExpectation())); + } + ExpectationContext

expectationContext = + new DefaultExpectationContext<>(this.eventSourceManager.getController(), primary); + return holder.getExpectation().isFulfilled(primary, expectationContext) + ? Optional.of( + new ExpectationResult

(ExpectationStatus.FULFILLED, holder.getExpectation())) + : Optional.empty(); + } + private void handleEventMarking(Event event, ResourceState state) { final var relatedCustomResourceID = event.getRelatedCustomResourceID(); if (event instanceof ResourceEvent resourceEvent) { @@ -257,6 +296,9 @@ synchronized void eventProcessingFinished( state.markProcessedMarkForDeletion(); metrics.cleanupDoneFor(resourceID, metricsMetadata); } else { + // TODO what should be the relation between re-schedule and expectation + // should we add a flag if trigger if expectation fails + setExpectation(state, postExecutionControl); if (state.eventPresent()) { submitReconciliationExecution(state); } else { @@ -265,6 +307,10 @@ synchronized void eventProcessingFinished( } } + private void setExpectation(ResourceState state, PostExecutionControl

postExecutionControl) { + postExecutionControl.getExpectation().ifPresent(state::setExpectation); + } + /** * In case retry is configured more complex error logging takes place, see handleRetryOnException */ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java index 90899a6e1a..809e26952a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java @@ -2,15 +2,18 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; +import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult; class ExecutionScope { // the latest custom resource from cache private R resource; private final RetryInfo retryInfo; + private final ExpectationResult expectationResult; - ExecutionScope(RetryInfo retryInfo) { + ExecutionScope(RetryInfo retryInfo, ExpectationResult expectationResult) { this.retryInfo = retryInfo; + this.expectationResult = expectationResult; } public ExecutionScope setResource(R resource) { @@ -42,4 +45,8 @@ public String toString() { public RetryInfo getRetryInfo() { return retryInfo; } + + public ExpectationResult getExpectationResult() { + return expectationResult; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExpectationHolder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExpectationHolder.java new file mode 100644 index 0000000000..a50ab32615 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExpectationHolder.java @@ -0,0 +1,37 @@ +package io.javaoperatorsdk.operator.processing.event; + +import java.time.LocalDateTime; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation; + +public class ExpectationHolder

{ + + private LocalDateTime expectationCreationTime; + private Expectation

expectation; + + public ExpectationHolder(LocalDateTime expectationCreationTime, Expectation

expectation) { + this.expectationCreationTime = expectationCreationTime; + this.expectation = expectation; + } + + public LocalDateTime getExpectationCreationTime() { + return expectationCreationTime; + } + + public void setExpectationCreationTime(LocalDateTime expectationCreationTime) { + this.expectationCreationTime = expectationCreationTime; + } + + public Expectation getExpectation() { + return expectation; + } + + public void setExpectation(Expectation

expectation) { + this.expectation = expectation; + } + + public boolean isTimedOut() { + return expectationCreationTime.plus(expectation.timeout()).isBefore(LocalDateTime.now()); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/PostExecutionControl.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/PostExecutionControl.java index 42311c1cb5..3841a80960 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/PostExecutionControl.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/PostExecutionControl.java @@ -3,6 +3,7 @@ import java.util.Optional; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation; final class PostExecutionControl { @@ -10,6 +11,7 @@ final class PostExecutionControl { private final R updatedCustomResource; private final boolean updateIsStatusPatch; private final Exception runtimeException; + private Expectation expectation; private Long reScheduleDelay = null; @@ -66,6 +68,11 @@ public PostExecutionControl withReSchedule(long delay) { return this; } + public PostExecutionControl withExpectation(Expectation expectation) { + this.expectation = expectation; + return this; + } + public Optional getRuntimeException() { return Optional.ofNullable(runtimeException); } @@ -93,4 +100,8 @@ public String toString() { public boolean isFinalizerRemoved() { return finalizerRemoved; } + + public Optional> getExpectation() { + return Optional.ofNullable(expectation); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java index c4b161ef27..c944cb5f3a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java @@ -90,7 +90,11 @@ private PostExecutionControl

handleDispatch(ExecutionScope

executionScope) } Context

context = - new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution); + new DefaultContext<>( + executionScope.getRetryInfo(), + controller, + resourceForExecution, + executionScope.getExpectationResult()); if (markedForDeletion) { return handleCleanup(resourceForExecution, originalResource, context); } else { @@ -246,11 +250,18 @@ private PostExecutionControl

createPostExecutionControl( postExecutionControl = PostExecutionControl.defaultDispatch(); } updatePostExecutionControlWithReschedule(postExecutionControl, updateControl); + updatePostExecutionControlWithExpectation(postExecutionControl, updateControl); + return postExecutionControl; } + private void updatePostExecutionControlWithExpectation( + PostExecutionControl

postExecutionControl, BaseControl baseControl) { + baseControl.getExpectation().ifPresent(postExecutionControl::withExpectation); + } + private void updatePostExecutionControlWithReschedule( - PostExecutionControl

postExecutionControl, BaseControl baseControl) { + PostExecutionControl

postExecutionControl, BaseControl baseControl) { baseControl.getScheduleDelay().ifPresent(postExecutionControl::withReSchedule); } @@ -262,7 +273,7 @@ private PostExecutionControl

handleCleanup( ResourceID.fromResource(resourceForExecution), getVersion(resourceForExecution)); } - DeleteControl deleteControl = controller.cleanup(resourceForExecution, context); + DeleteControl

deleteControl = controller.cleanup(resourceForExecution, context); final var useFinalizer = controller.useFinalizer(); if (useFinalizer) { // note that we don't reschedule here even if instructed. Removing finalizer means that @@ -299,6 +310,7 @@ private PostExecutionControl

handleCleanup( useFinalizer); PostExecutionControl

postExecutionControl = PostExecutionControl.defaultDispatch(); updatePostExecutionControlWithReschedule(postExecutionControl, deleteControl); + updatePostExecutionControlWithExpectation(postExecutionControl, deleteControl); return postExecutionControl; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java index 5d4e74d681..bad045b261 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java @@ -1,5 +1,9 @@ package io.javaoperatorsdk.operator.processing.event; +import java.time.LocalDateTime; +import java.util.Optional; + +import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; @@ -29,6 +33,7 @@ private enum EventingState { private RetryExecution retry; private EventingState eventing; private RateLimitState rateLimit; + private ExpectationHolder expectationHolder; public ResourceState(ResourceID id) { this.id = id; @@ -75,6 +80,18 @@ public boolean processedMarkForDeletionPresent() { return eventing == EventingState.PROCESSED_MARK_FOR_DELETION; } + public void setExpectation(Expectation expectation) { + expectationHolder = new ExpectationHolder(LocalDateTime.now(), expectation); + } + + public void cleanExpectation() { + expectationHolder = null; + } + + public Optional getExpectationHolder() { + return Optional.ofNullable(expectationHolder); + } + public void markEventReceived() { if (deleteEventPresent()) { throw new IllegalStateException("Cannot receive event after a delete event received"); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java index b289d68b22..de3e1b4221 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java @@ -18,7 +18,8 @@ class DefaultContextTest { private final Secret primary = new Secret(); private final Controller mockController = mock(); - private final DefaultContext context = new DefaultContext<>(null, mockController, primary); + private final DefaultContext context = + new DefaultContext<>(null, mockController, primary, null); @Test @SuppressWarnings("unchecked") diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java index 82ecdb111a..8b57506e26 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java @@ -122,7 +122,7 @@ void callsCleanupOnWorkflowWhenHasCleanerAndReconcilerIsNotCleaner( new Controller( reconciler, configuration, MockKubernetesClient.client(Secret.class)); - controller.cleanup(new Secret(), new DefaultContext<>(null, controller, new Secret())); + controller.cleanup(new Secret(), new DefaultContext<>(null, controller, new Secret(), null)); verify(managedWorkflowMock, times(workflowCleanerExecuted ? 1 : 0)).cleanup(any(), any()); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index fe2e6e9514..0f45ad77f7 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -129,7 +129,7 @@ void ifExecutionInProgressWaitsUntilItsFinished() { void schedulesAnEventRetryOnException() { TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(null); + ExecutionScope executionScope = new ExecutionScope(null, null); executionScope.setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); @@ -271,7 +271,7 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() { var cr = testCustomResource(crID); eventProcessor.eventProcessingFinished( - new ExecutionScope(null).setResource(cr), PostExecutionControl.defaultDispatch()); + new ExecutionScope(null, null).setResource(cr), PostExecutionControl.defaultDispatch()); verify(retryTimerEventSourceMock, times(1)).cancelOnceSchedule(eq(crID)); } @@ -300,7 +300,7 @@ void startProcessedMarkedEventReceivedBefore() { @Test void notUpdatesEventSourceHandlerIfResourceUpdated() { TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceStatusPatched(customResource); @@ -313,7 +313,7 @@ void notUpdatesEventSourceHandlerIfResourceUpdated() { void notReschedulesAfterTheFinalizerRemoveProcessed() { TestCustomResource customResource = testCustomResource(); markForDeletion(customResource); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceFinalizerRemoved(customResource); @@ -326,7 +326,7 @@ void notReschedulesAfterTheFinalizerRemoveProcessed() { void skipEventProcessingIfFinalizerRemoveProcessed() { TestCustomResource customResource = testCustomResource(); markForDeletion(customResource); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceFinalizerRemoved(customResource); @@ -343,7 +343,7 @@ void skipEventProcessingIfFinalizerRemoveProcessed() { void newResourceAfterMissedDeleteEvent() { TestCustomResource customResource = testCustomResource(); markForDeletion(customResource); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceFinalizerRemoved(customResource); var newResource = testCustomResource(); @@ -379,7 +379,7 @@ void rateLimitsReconciliationSubmission() { @Test void schedulesRetryForMarReconciliationInterval() { TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = new ExecutionScope(null, null).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.defaultDispatch(); eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); @@ -401,7 +401,8 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { eventSourceManagerMock, metricsMock)); eventProcessorWithRetry.start(); - ExecutionScope executionScope = new ExecutionScope(null).setResource(testCustomResource()); + ExecutionScope executionScope = + new ExecutionScope(null, null).setResource(testCustomResource()); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException()); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index 89f3655356..5e4550474b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -404,7 +404,8 @@ public int getAttemptCount() { public boolean isLastAttempt() { return true; } - }) + }, + null) .setResource(testCustomResource)); ArgumentCaptor contextArgumentCaptor = ArgumentCaptor.forClass(Context.class); @@ -505,7 +506,8 @@ public int getAttemptCount() { public boolean isLastAttempt() { return true; } - }) + }, + null) .setResource(testCustomResource)); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); @@ -528,7 +530,7 @@ void callErrorStatusHandlerEvenOnFirstError() { var postExecControl = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null).setResource(testCustomResource)); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); assertThat(postExecControl.exceptionDuringExecution()).isTrue(); @@ -549,7 +551,7 @@ void errorHandlerCanInstructNoRetryWithUpdate() { var postExecControl = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null).setResource(testCustomResource)); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); @@ -571,7 +573,7 @@ void errorHandlerCanInstructNoRetryNoUpdate() { var postExecControl = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null).setResource(testCustomResource)); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); verify(customResourceFacade, times(0)).patchStatus(eq(testCustomResource), any()); @@ -588,7 +590,7 @@ void errorStatusHandlerCanPatchResource() { reconciler.errorHandler = () -> ErrorStatusUpdateControl.patchStatus(testCustomResource); reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null).setResource(testCustomResource)); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); @@ -611,7 +613,7 @@ void ifRetryLimitedToZeroMaxAttemptsErrorHandlerGetsCorrectLastAttempt() { reconciler.errorHandler = () -> ErrorStatusUpdateControl.noStatusUpdate(); reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null).setResource(testCustomResource)); verify(reconciler, times(1)) .updateErrorStatus( @@ -675,7 +677,7 @@ void reSchedulesFromErrorHandler() { var res = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null).setResource(testCustomResource)); assertThat(res.getReScheduleDelay()).contains(delay); assertThat(res.getRuntimeException()).isEmpty(); @@ -723,7 +725,7 @@ private void removeFinalizers(CustomResource customResource) { } public ExecutionScope executionScopeWithCREvent(T resource) { - return (ExecutionScope) new ExecutionScope<>(null).setResource(resource); + return (ExecutionScope) new ExecutionScope<>(null, null).setResource(resource); } private class TestReconciler