Skip to content

feat: expectations #2840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: next
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;

public abstract class BaseControl<T extends BaseControl<T>> {
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation;
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationAdapter;
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationContext;

public abstract class BaseControl<T extends BaseControl<T, P>, P extends HasMetadata> {

private Long scheduleDelay = null;
private Expectation<P> expectation;

public T rescheduleAfter(long delay) {
rescheduleAfter(Duration.ofMillis(delay));
Expand All @@ -25,4 +32,16 @@ public T rescheduleAfter(long delay, TimeUnit timeUnit) {
public Optional<Long> getScheduleDelay() {
return Optional.ofNullable(scheduleDelay);
}

public void expect(Expectation<P> expectation) {
this.expectation = expectation;
}

public void expect(BiPredicate<P, ExpectationContext<P>> expectation, Duration timeout) {
this.expectation = new ExpectationAdapter<>(expectation, timeout);
}

public Optional<Expectation<P>> getExpectation() {
return Optional.ofNullable(expectation);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;

public interface CacheAware<P extends HasMetadata> {
default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
return getSecondaryResource(expectedType, null);
}

<R> Set<R> getSecondaryResources(Class<R> expectedType);

default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return getSecondaryResources(expectedType).stream();
}

<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);

ControllerConfiguration<P> getControllerConfiguration();

/**
* Retrieves the primary resource.
*
* @return the primary resource associated with the current reconciliation
*/
P getPrimaryResource();

/**
* Retrieves the primary resource cache.
*
* @return the {@link IndexerResourceCache} associated with the associated {@link Reconciler} for
* this context
*/
@SuppressWarnings("unused")
IndexedResourceCache<P> getPrimaryCache();

EventSourceRetriever<P> eventSourceRetriever();
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,18 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedWorkflowAndDependentResourceContext;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;

public interface Context<P extends HasMetadata> {
public interface Context<P extends HasMetadata> extends CacheAware<P> {

Optional<RetryInfo> getRetryInfo();

default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
return getSecondaryResource(expectedType, null);
}

<R> Set<R> getSecondaryResources(Class<R> expectedType);

default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return getSecondaryResources(expectedType).stream();
}

<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);

ControllerConfiguration<P> getControllerConfiguration();

/**
* Retrieve the {@link ManagedWorkflowAndDependentResourceContext} used to interact with {@link
* io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource}s and associated {@link
Expand All @@ -39,8 +22,6 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
*/
ManagedWorkflowAndDependentResourceContext managedWorkflowAndDependentResourceContext();

EventSourceRetriever<P> eventSourceRetriever();

KubernetesClient getClient();

/** ExecutorService initialized by framework for workflows. Used for workflow standalone mode. */
Expand Down Expand Up @@ -72,4 +53,6 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
* @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise
*/
boolean isNextReconciliationImminent();

Optional<ExpectationResult<P>> expectationResult();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;

public class DefaultCacheAware<P extends HasMetadata> implements CacheAware<P> {

protected final Controller<P> controller;
protected final P primaryResource;

public DefaultCacheAware(Controller<P> controller, P primaryResource) {
this.controller = controller;
this.primaryResource = primaryResource;
}

@Override
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.map(es -> es.getSecondaryResources(primaryResource))
.flatMap(Set::stream);
}

@Override
public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventSourceName) {
try {
return controller
.getEventSourceManager()
.getEventSourceFor(expectedType, eventSourceName)
.getSecondaryResource(primaryResource);
} catch (NoEventSourceForClassException e) {
/*
* If a workflow has an activation condition there can be event sources which are only
* registered if the activation condition holds, but to provide a consistent API we return an
* Optional instead of throwing an exception.
*
* Note that not only the resource which has an activation condition might not be registered
* but dependents which depend on it.
*/
if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) {
return Optional.empty();
} else {
throw e;
}
}
}

@Override
public EventSourceRetriever<P> eventSourceRetriever() {
return controller.getEventSourceManager();
}

@Override
public ControllerConfiguration<P> getControllerConfiguration() {
return controller.getConfiguration();
}

@Override
public P getPrimaryResource() {
return primaryResource;
}

@Override
public IndexedResourceCache<P> getPrimaryCache() {
return controller.getEventSourceManager().getControllerEventSource();
}
}
Original file line number Diff line number Diff line change
@@ -1,95 +1,48 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedWorkflowAndDependentResourceContext;
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DefaultContext<P extends HasMetadata> implements Context<P> {
public class DefaultContext<P extends HasMetadata> extends DefaultCacheAware<P>
implements Context<P> {

private RetryInfo retryInfo;
private final Controller<P> controller;
private final P primaryResource;
private final ControllerConfiguration<P> controllerConfiguration;

private final DefaultManagedWorkflowAndDependentResourceContext<P>
defaultManagedDependentResourceContext;

public DefaultContext(RetryInfo retryInfo, Controller<P> controller, P primaryResource) {
private final ExpectationResult<P> expectationResult;

public DefaultContext(
RetryInfo retryInfo,
Controller<P> controller,
P primaryResource,
ExpectationResult<P> expectationResult) {
super(controller, primaryResource);
this.retryInfo = retryInfo;
this.controller = controller;
this.primaryResource = primaryResource;
this.controllerConfiguration = controller.getConfiguration();
this.defaultManagedDependentResourceContext =
new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this);
this.expectationResult = expectationResult;
}

@Override
public Optional<RetryInfo> getRetryInfo() {
return Optional.ofNullable(retryInfo);
}

@Override
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.map(es -> es.getSecondaryResources(primaryResource))
.flatMap(Set::stream);
}

@Override
public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventSourceName) {
try {
return controller
.getEventSourceManager()
.getEventSourceFor(expectedType, eventSourceName)
.getSecondaryResource(primaryResource);
} catch (NoEventSourceForClassException e) {
/*
* If a workflow has an activation condition there can be event sources which are only
* registered if the activation condition holds, but to provide a consistent API we return an
* Optional instead of throwing an exception.
*
* Note that not only the resource which has an activation condition might not be registered
* but dependents which depend on it.
*/
if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) {
return Optional.empty();
} else {
throw e;
}
}
}

@Override
public ControllerConfiguration<P> getControllerConfiguration() {
return controllerConfiguration;
}

@Override
public ManagedWorkflowAndDependentResourceContext managedWorkflowAndDependentResourceContext() {
return defaultManagedDependentResourceContext;
}

@Override
public EventSourceRetriever<P> eventSourceRetriever() {
return controller.getEventSourceManager();
}

@Override
public KubernetesClient getClient() {
return controller.getClient();
Expand All @@ -102,23 +55,18 @@ public ExecutorService getWorkflowExecutorService() {
return controller.getExecutorServiceManager().workflowExecutorService();
}

@Override
public P getPrimaryResource() {
return primaryResource;
}

@Override
public IndexedResourceCache<P> getPrimaryCache() {
return controller.getEventSourceManager().getControllerEventSource();
}

@Override
public boolean isNextReconciliationImminent() {
return controller
.getEventProcessor()
.isNextReconciliationImminent(ResourceID.fromResource(primaryResource));
}

@Override
public Optional<ExpectationResult<P>> expectationResult() {
return Optional.ofNullable(expectationResult);
}

public DefaultContext<P> setRetryInfo(RetryInfo retryInfo) {
this.retryInfo = retryInfo;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.javaoperatorsdk.operator.api.reconciler;

public class DeleteControl extends BaseControl<DeleteControl> {
import io.fabric8.kubernetes.api.model.HasMetadata;

public class DeleteControl<P extends HasMetadata> extends BaseControl<DeleteControl<P>, P> {

private final boolean removeFinalizer;

Expand All @@ -27,7 +29,7 @@ public static DeleteControl defaultDelete() {
*
* @return delete control that will not remove finalizer.
*/
public static DeleteControl noFinalizerRemoval() {
public static <T extends HasMetadata> DeleteControl<T> noFinalizerRemoval() {
return new DeleteControl(false);
}

Expand All @@ -36,7 +38,7 @@ public boolean isRemoveFinalizer() {
}

@Override
public DeleteControl rescheduleAfter(long delay) {
public DeleteControl<P> rescheduleAfter(long delay) {
if (removeFinalizer) {
throw new IllegalStateException("Cannot reschedule cleanup if removing finalizer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;

public class ErrorStatusUpdateControl<P extends HasMetadata>
extends BaseControl<ErrorStatusUpdateControl<P>> {
extends BaseControl<ErrorStatusUpdateControl<P>, P> {

private final P resource;
private boolean noRetry = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;

public class UpdateControl<P extends HasMetadata> extends BaseControl<UpdateControl<P>> {
public class UpdateControl<P extends HasMetadata> extends BaseControl<UpdateControl<P>, P> {

private final P resource;
private final boolean patchResource;
Expand Down
Loading
Loading