Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ on:
push:
branches:
- master
paths-ignore:
- '**.md'
pull_request:
types:
- opened
- synchronize
paths-ignore:
- '**.md'

jobs:
build-docker:
Expand Down Expand Up @@ -57,6 +64,26 @@ jobs:
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache

fmt:
name: Check format ${{ matrix.rust }}
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable]

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}

- name: cargo fmt
run: |
cargo fmt --check

test:
name: Test ${{ matrix.rust }} on ${{ matrix.os }}
needs: build-docker
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "gordo-controller"
version = "2.1.2"
version = "2.1.3"
authors = ["Miles Granger <miles59923@gmail.com>", "Serhii Koropets <skoro@equinor.com>"]
edition = "2018"

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# gordo-controller
Gordo controller
[Gordo](https://github.com/equinor/gordo) controller

[![CI](https://github.com/equinor/gordo-controller/workflows/CI/badge.svg)](https://github.com/equinor/gordo-controller/actions)

Expand Down
5 changes: 1 addition & 4 deletions examples/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
assert!(resp.is_empty());

let body = reqwest::get("http://0.0.0.0:8888/metrics")
.await?
.text()
.await?;
let body = reqwest::get("http://0.0.0.0:8888/metrics").await?.text().await?;

assert!(body.contains("gordo_controller_http_requests_total"));

Expand Down
5 changes: 2 additions & 3 deletions src/crd/argo/argo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use kube::CustomResource;
use serde::{Deserialize, Serialize};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

// Origin here https://github.com/argoproj/argo/blob/master/pkg/apis/workflow/v1alpha1/workflow_types.go#L34
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
Expand Down Expand Up @@ -30,8 +30,7 @@ impl Default for ArgoWorkflowPhase {
#[kube(group = "argoproj.io", version = "v1alpha1", kind = "Workflow", namespaced)]
#[kube(shortname = "wf")]
#[kube(status = "ArgoWorkflowStatus")]
pub struct ArgoWorkflowSpec {
}
pub struct ArgoWorkflowSpec {}

#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)]
pub struct ArgoWorkflowStatus {
Expand Down
148 changes: 89 additions & 59 deletions src/crd/argo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,46 @@
pub mod argo;
pub use argo::*;

use log::{error, info, warn};
use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status};
use crate::crd::pod::{POD_MATCH_LABELS, FAILED};
use crate::crd::metrics::warning_happened;
use k8s_openapi::api::core::v1::ContainerStateTerminated;
use chrono::{DateTime, Utc};
use k8s_openapi::{
api::core::v1::Pod,
use crate::crd::model::{
patch_model_status, patch_model_with_default_status, Model, ModelPhase, ModelPodTerminatedStatus,
};
use crate::crd::pod::{FAILED, POD_MATCH_LABELS};
use chrono::{DateTime, Utc};
use k8s_openapi::api::core::v1::ContainerStateTerminated;
use k8s_openapi::api::core::v1::Pod;
use kube::api::Api;
use log::{error, info, warn};

pub const WF_MATCH_LABELS: &'static [&'static str] = &[
"applications.gordo.equinor.com/project-name",
"applications.gordo.equinor.com/project-revision",
"applications.gordo.equinor.com/project-name",
"applications.gordo.equinor.com/project-revision",
];

pub const WF_NUMBER_LABEL: &str = "applications.gordo.equinor.com/project-workflow";

fn some_of_workflows_in_phases(workflows: &Vec<&Workflow>, phases: Vec<ArgoWorkflowPhase>) -> bool {
workflows.iter()
.any(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
workflows.iter().any(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
}

fn all_of_workflows_in_phases(workflows: &Vec<&Workflow>, phases: Vec<ArgoWorkflowPhase>) -> bool {
workflows.iter()
.all(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
workflows.iter().all(|wf| match &wf.status {
Some(status) => match &status.phase {
Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(),
None => false,
},
_ => false,
})
}

fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [Workflow]) -> Vec<&'a Workflow> {
//TODO for performance reason we supposed to reimplement this algorithm with BTreeMap
//TODO for performance reason we supposed to reimplement this algorithm with BTreeMap
workflows
.iter()
.filter(|workflow| {
Expand Down Expand Up @@ -75,25 +73,23 @@ fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [Workflow]) -> Vec<

fn failed_pods_terminated_statuses<'a>(model: &'a Model, pods: &'a Vec<Pod>) -> Vec<&'a ContainerStateTerminated> {
pods.iter()
.filter(|pod| {
match &pod.status {
Some(status) => match &status.phase {
Some(phase) => phase == FAILED,
None => false,
},
.filter(|pod| match &pod.status {
Some(status) => match &status.phase {
Some(phase) => phase == FAILED,
None => false,
}
},
None => false,
})
.filter(|pod| {
let pod_labels = &pod.metadata.labels;
let model_labels = &model.metadata.labels;
POD_MATCH_LABELS
.iter()
.all(|&label_name| {
match (model_labels, pod_labels) {
(Some(model_labels), Some(pod_labels)) => model_labels.get(label_name) == pod_labels.get(label_name),
_ => false,
.all(|&label_name| match (model_labels, pod_labels) {
(Some(model_labels), Some(pod_labels)) => {
model_labels.get(label_name) == pod_labels.get(label_name)
}
_ => false,
})
})
.flat_map(|pod| pod.status.as_ref())
Expand All @@ -104,10 +100,13 @@ fn failed_pods_terminated_statuses<'a>(model: &'a Model, pods: &'a Vec<Pod>) ->
.collect()
}

fn last_container_terminated_status(terminated_statuses: Vec<&ContainerStateTerminated>) -> Option<&ContainerStateTerminated> {
fn last_container_terminated_status(
terminated_statuses: Vec<&ContainerStateTerminated>,
) -> Option<&ContainerStateTerminated> {
if terminated_statuses.len() > 0 {
let min_date_time = DateTime::<Utc>::MIN_UTC.clone();
let last_terminated_state_ind = terminated_statuses.iter()
let last_terminated_state_ind = terminated_statuses
.iter()
.enumerate()
.max_by_key(|(_, terminated_state)| match &terminated_state.finished_at {
Some(finished_at) => finished_at.0,
Expand All @@ -129,7 +128,7 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
None => {
warn!("Model labels field is empty");
continue;
},
}
};
let model_name = match &model.metadata.name {
Some(model_name) => model_name,
Expand All @@ -139,17 +138,27 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
}
};
match &model.status {
Some(model_status) => {
let is_reapplied_model = match (&model_status.revision, labels.get("applications.gordo.equinor.com/project-revision")) {
Some(model_status) => {
let is_reapplied_model = match (
&model_status.revision,
labels.get("applications.gordo.equinor.com/project-revision"),
) {
(Some(status_revision), Some(metadata_revision)) => status_revision != metadata_revision,
_ => false,
};
if !is_reapplied_model {
if !is_reapplied_model {
match &model_status.phase {
ModelPhase::InProgress | ModelPhase::Unknown => {
let found_workflows = find_model_workflows(&model, &workflows);
let mut new_model_phase: Option<ModelPhase> = None;
if some_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Error, ArgoWorkflowPhase::Failed, ArgoWorkflowPhase::Skipped]) {
if some_of_workflows_in_phases(
&found_workflows,
vec![
ArgoWorkflowPhase::Error,
ArgoWorkflowPhase::Failed,
ArgoWorkflowPhase::Skipped,
],
) {
new_model_phase = Some(ModelPhase::Failed);
} else if all_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Succeeded]) {
new_model_phase = Some(ModelPhase::Succeeded);
Expand All @@ -162,22 +171,31 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
if let Some(model_name) = labels.get("applications.gordo.equinor.com/model-name") {
let terminated_statuses = failed_pods_terminated_statuses(&model, &pods);
info!("Found {} failed pods in terminated status which is relates to the model '{}'", terminated_statuses.len(), model_name);
if let Some(terminated_status) = last_container_terminated_status(terminated_statuses) {
if let Some(terminated_status) =
last_container_terminated_status(terminated_statuses)
{
new_model_status.code = Some(terminated_status.exit_code);
if let Some(message) = &terminated_status.message {
let trimmed_message = message.trim_end();
if !trimmed_message.is_empty() {
let result: serde_json::Result<ModelPodTerminatedStatus> = serde_json::from_str(&trimmed_message);
let result: serde_json::Result<ModelPodTerminatedStatus> =
serde_json::from_str(&trimmed_message);
match result {
Ok(terminated_status_message) => {
info!("Last terminated status message {:?} for model '{}'", terminated_status_message, model_name);
new_model_status.error_type = terminated_status_message.error_type.clone();
new_model_status.message = terminated_status_message.message.clone();
new_model_status.traceback = terminated_status_message.traceback.clone();
},
info!(
"Last terminated status message {:?} for model '{}'",
terminated_status_message, model_name
);
new_model_status.error_type =
terminated_status_message.error_type.clone();
new_model_status.message =
terminated_status_message.message.clone();
new_model_status.traceback =
terminated_status_message.traceback.clone();
}
Err(err) => {
warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err);
warning_happened("parse_terminated_message")
warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err);
warning_happened("parse_terminated_message")
}
}
}
Expand All @@ -188,29 +206,41 @@ pub async fn monitor_wf(model_api: &Api<Model>, workflows: &Vec<Workflow>, model
if model_phase != model_status.phase {
match patch_model_status(&model_api, &model_name, &new_model_status).await {
Ok(new_model) => {
info!("Patching Model '{}' from status {:?} to {:?}", model_name, model.status, new_model.status);
info!(
"Patching Model '{}' from status {:?} to {:?}",
model_name, model.status, new_model.status
);
}
Err(err) => {
error!( "Failed to patch status of Model '{}' - error: {:?}", model_name, err);
error!(
"Failed to patch status of Model '{}' - error: {:?}",
model_name, err
);
}
}
}
}
},
}
_ => (),
}
} else {
match patch_model_with_default_status(&model_api, &model).await {
Ok(new_model) => {
info!("Patching Model '{}' from status {:?} to default status {:?}", model_name, model.status, new_model.status);
info!(
"Patching Model '{}' from status {:?} to default status {:?}",
model_name, model.status, new_model.status
);
}
Err(err) => {
error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model_name, err);
error!(
"Failed to patch status of Model '{}' with default status - error: {:?}",
model_name, err
);
}
}
}
}
_ => (),
};
}
}
}
Loading
Loading