diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java similarity index 91% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java rename to flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java index adb37e7c27..5f5392d581 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.config; +package org.apache.flink.kubernetes.operator.api; -import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; /** The mode of {@link FlinkDeployment}. */ @@ -45,7 +44,7 @@ public static Mode getMode(FlinkDeployment flinkApp) { : getMode(lastReconciledSpec); } - private static Mode getMode(FlinkDeploymentSpec spec) { + public static Mode getMode(FlinkDeploymentSpec spec) { return spec.getJob() != null ? 
APPLICATION : SESSION; } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java index 136d3415f3..fd2780f37d 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java @@ -21,6 +21,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.fabric8.kubernetes.api.model.Condition; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -28,7 +29,9 @@ import lombok.ToString; import lombok.experimental.SuperBuilder; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** Last observed status of the Flink deployment. */ @@ -55,4 +58,7 @@ public class FlinkDeploymentStatus extends CommonStatus { /** Information about the TaskManagers for the scale subresource. */ private TaskManagerInfo taskManager; + + /** Condition of the CR . 
/**
 * Status of the JobManager deployment.
 *
 * <p>Each constant carries a CamelCase {@code reason} and a human readable {@code message} that
 * are used when the status is reported as a condition on the custom resource.
 */
public enum JobManagerDeploymentStatus {

    /** JobManager is running and ready to receive REST API calls. */
    READY("JobManagerReady", "JobManager is running and ready to receive REST API calls"),

    /** JobManager is running but not ready yet to receive REST API calls. */
    DEPLOYED_NOT_READY(
            "DeployedNotReady",
            "JobManager is running but not yet ready to receive REST API calls"),

    /** JobManager process is starting up. */
    DEPLOYING("JobManagerIsDeploying", "JobManager process is starting up"),

    /** JobManager deployment not found, probably not started or killed by user. */
    // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup
    MISSING("JobManagerDeploymentMissing", "JobManager deployment not found"),

    /** Deployment in terminal error, requires spec change for reconciliation to continue. */
    ERROR("Error", "JobManager deployment failed");

    // Enum constants are shared singletons, so their state must be immutable.
    private final String reason;
    private final String message;

    /**
     * @param reason CamelCase, machine readable reason used in the condition
     * @param message human readable description used in the condition
     */
    JobManagerDeploymentStatus(String reason, String message) {
        this.reason = reason;
        this.message = message;
    }

    /**
     * Returns the CamelCase reason associated with this status.
     *
     * @return the condition reason
     */
    public String getReason() {
        return reason;
    }

    /**
     * Returns the human readable message associated with this status.
     *
     * @return the condition message
     */
    public String getMessage() {
        return message;
    }
}
+ */ + +package org.apache.flink.kubernetes.operator.api.utils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.Mode; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; + +import io.fabric8.kubernetes.api.model.Condition; +import io.fabric8.kubernetes.api.model.ConditionBuilder; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus.READY; + +/** Creates a condition object with the type, status, message and reason. */ +public class ConditionUtils { + public static final String CONDITION_TYPE_RUNNING = "Running"; + + /** + * Creates a List of Condition object based on the provided FlinkDeploymentStatus. 
+ * + * @param flinkDeploymentStatus the FlinkDeploymentStatus object containing job status + * information + * @return a list of Condition object representing the current status of the Flink deployment + */ + public static List createConditionFromStatus( + FlinkDeploymentStatus flinkDeploymentStatus) { + + FlinkDeploymentReconciliationStatus reconciliationStatus = + flinkDeploymentStatus.getReconciliationStatus(); + Condition conditionToAdd = null; + + if (reconciliationStatus != null) { + FlinkDeploymentSpec deploymentSpec = + reconciliationStatus.deserializeLastReconciledSpec(); + + if (deploymentSpec != null) { + switch (Mode.getMode(deploymentSpec)) { + case APPLICATION: + conditionToAdd = + getApplicationModeCondition( + flinkDeploymentStatus.getJobStatus().getState()); + break; + case SESSION: + conditionToAdd = + getSessionModeCondition( + flinkDeploymentStatus.getJobManagerDeploymentStatus()); + } + updateLastTransitionTime(flinkDeploymentStatus.getConditions(), conditionToAdd); + } + } + return conditionToAdd == null ? List.of() : List.of(conditionToAdd); + } + + private static void updateLastTransitionTime(List conditions, Condition condition) { + if (condition == null) { + return; + } + Condition existingCondition = conditions.isEmpty() ? null : conditions.get(0); + condition.setLastTransitionTime(getLastTransitionTimeStamp(existingCondition, condition)); + } + + private static Condition getApplicationModeCondition(JobStatus jobStatus) { + return new ConditionBuilder() + .withType(CONDITION_TYPE_RUNNING) + .withStatus(jobStatus == RUNNING ? "True" : "False") + .withReason(toCamelCase(jobStatus.name())) + .withMessage("Job state " + jobStatus.name()) + .build(); + } + + private static Condition getSessionModeCondition(JobManagerDeploymentStatus jmStatus) { + return new ConditionBuilder() + .withType(CONDITION_TYPE_RUNNING) + .withStatus(jmStatus == READY ? 
"True" : "False") + .withReason(jmStatus.getReason()) + .withMessage(jmStatus.getMessage()) + .build(); + } + + /** + * Reason in the condition object should be a CamelCase string, so need to convert JobStatus as + * all the keywords are one noun, so we only need to upper case the first letter. + * + * @return CamelCase reason as String + */ + private static String toCamelCase(String reason) { + reason = reason.toLowerCase(); + return reason.substring(0, 1).toUpperCase() + reason.substring(1); + } + + private static boolean isLastTransitionTimeStampUpdateRequired( + Condition existingCondition, Condition newCondition) { + return existingCondition == null + || !existingCondition.getStatus().equals(newCondition.getStatus()); + } + + /** + * get the last transition time for the condition , returns the current time if there is no + * existing condition or if the condition status has changed, otherwise returns existing + * condition LastTransitionTime. + * + * @param existingCondition The current condition object, may be null. + * @param condition The new condition object to compare against the existing one. + * @return A string representing the last transition time in the format + * "yyyy-MM-dd'T'HH:mm:ss'Z'". Returns a new timestamp if the existing condition is null or + * the status has changed, otherwise returns the last transition time of the existing + * condition. 
+ */ + private static String getLastTransitionTimeStamp( + Condition existingCondition, Condition condition) { + String lastTransitionTime; + if (existingCondition == null + || !existingCondition.getStatus().equals(condition.getStatus())) { + lastTransitionTime = + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date()); + } else { + lastTransitionTime = existingCondition.getLastTransitionTime(); + } + return lastTransitionTime; + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 32a3109d12..2fa6cae1ec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; @@ -165,6 +166,8 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex throw new ReconciliationException(e); } + flinkApp.getStatus() + .setConditions(ConditionUtils.createConditionFromStatus(flinkApp.getStatus())); LOG.debug("End of reconciliation"); statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient()); return ReconciliationUtils.toUpdateControl( diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java index c5e1124184..6facfb16ec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java @@ -19,8 +19,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.utils.EventRecorder; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java index 429caaa657..3a389a79e7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java @@ -20,10 +20,10 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index 7c483f7f42..ac3dab3382 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -27,10 +27,10 @@ import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient; import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 4d379139b6..65cdfaf050 100644 --- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -130,7 +130,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(7, testController.getInternalStatusUpdateCount()); + assertEquals(8, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); FlinkDeploymentReconciliationStatus reconciliationStatus = @@ -277,6 +277,8 @@ public void verifyFailedDeployment() throws Exception { validatingResponseProvider.assertValidated(); + validateConditionStatus(appCluster, "Reconciling"); + // Validate status assertNotNull(appCluster.getStatus().getError()); @@ -359,7 +361,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro appCluster.getStatus().getJobManagerDeploymentStatus()); assertEquals( "savepoint_1", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); - + validateConditionStatus(appCluster, "Finished"); // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); testController.reconcile(appCluster, context); @@ -618,6 +620,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception // jobStatus has not been set at this time assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, jobStatus.getState()); + validateConditionStatus(appCluster, "Reconciling"); // Switches operator mode to SESSION appCluster.getSpec().setJob(null); // Validation fails and JobObserver should still be used @@ -639,6 +642,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId()); 
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName()); assertEquals(expectedJobStatus.getJobState(), jobStatus.getState()); + validateConditionStatus(appCluster, "Running"); } @Test @@ -652,6 +656,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); + validateConditionStatus(appCluster, "JobManagerIsDeploying"); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -662,6 +667,8 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except // jobStatus has not been set at this time assertNull(jobStatus.getState()); + validateConditionStatus(appCluster, "DeployedNotReady"); + // Switches operator mode to APPLICATION appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob()); // Validation fails and JobObserver should still be used @@ -676,6 +683,8 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except .getError() .contains("Cannot switch from session to job cluster")); assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob()); + + validateConditionStatus(appCluster, "JobManagerReady"); } private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception { @@ -1169,7 +1178,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); - assertEquals(6, testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( @@ -1184,7 +1193,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E assertEquals( org.apache.flink.api.common.JobStatus.RUNNING, 
appCluster.getStatus().getJobStatus().getState()); - assertEquals(6, testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( @@ -1254,4 +1263,12 @@ private String getIngressHost(HasMetadata ingress) { return ingressRuleV1beta1.getHost(); } } + + private void validateConditionStatus(FlinkDeployment appCluster, String reason) { + assertThat(appCluster.getStatus().getConditions()).isNotNull(); + assertThat(appCluster.getStatus().getConditions()) + .hasSize(1) + .extracting("reason") + .contains(reason); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java index fde2b12421..a871466bd0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java @@ -24,11 +24,11 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.Mode; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; import org.apache.flink.util.concurrent.Executors; diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 33ab49e82d..c81b2b10f2 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -10685,6 +10685,23 @@ spec: additionalProperties: type: string type: object + conditions: + items: + properties: + lastTransitionTime: + type: string + message: + type: string + observedGeneration: + type: integer + reason: + type: string + status: + type: string + type: + type: string + type: object + type: array error: type: string jobManagerDeploymentStatus: