diff --git a/src/config.rs b/src/config.rs index 7cf10fe..e2e60e5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,6 +7,11 @@ pub struct OperatorConfig { #[arg(short = 't', long, env = "ROBOTLB_HCLOUD_TOKEN")] pub hcloud_token: String, + /// If this flag is enabled, the operator will try to find target nodes + /// based on where target pods are actually deployed. + #[arg(long, env = "ROBOTLB_DYNAMIC_NODE_SELECTOR", default_value = "true")] + pub dynamic_node_selector: bool, + /// Default load balancer healthcheck retries cound. #[arg(long, env = "ROBOTLB_DEFAULT_LB_RETRIES", default_value = "3")] pub default_lb_retries: i32, diff --git a/src/error.rs b/src/error.rs index ae1f82f..fb9d6b9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,6 +20,8 @@ pub enum LBTrackerError { KubeError(#[from] kube::Error), #[error("Unknown LoadBalancing alorithm")] UnknownLBAlgorithm, + #[error("Cannot get target nodes, because the service has no selector")] + ServiceWithoutSelector, // HCloud API errors #[error("Cannot attach load balancer to a network. Reason: {0}")] diff --git a/src/lb.rs b/src/lb.rs index 74bcbb7..3fc1ad5 100644 --- a/src/lb.rs +++ b/src/lb.rs @@ -158,6 +158,7 @@ impl LoadBalancer { /// The target will receive the traffic from the services. /// The target is identified by its IP address. pub fn add_target(&mut self, ip: &str) { + tracing::debug!("Adding target {}", ip); self.targets.push(ip.to_string()); } diff --git a/src/main.rs b/src/main.rs index ab05903..f2d086c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,7 @@ use error::{LBTrackerError, LBTrackerResult}; use futures::StreamExt; use hcloud::apis::configuration::Configuration as HCloudConfig; use k8s_openapi::{ - api::core::v1::{Node, Service}, + api::core::v1::{Node, Pod, Service}, serde_json::json, }; use kube::{ @@ -32,7 +32,7 @@ use kube::{ }; use label_filter::LabelFilter; use lb::LoadBalancer; -use std::{str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration}; pub mod config; pub mod consts; @@ -155,21 +155,68 @@ pub async fn reconcile_service( reconcile_load_balancer(lb, svc.clone(), context).await } -/// Reconcile the `LoadBalancer` type of service. -/// This function will find the nodes based on the node selector -/// and create or update the load balancer. -pub async fn reconcile_load_balancer( - mut lb: LoadBalancer, - svc: Arc, - context: Arc, -) -> LBTrackerResult { - let label_filter = svc +/// Method to get nodes dynamically based on the pods. +/// This method will find the nodes where the target pods are deployed. +/// It will use the pod selector to find the pods and then get the nodes. +async fn get_nodes_dynamically( + svc: &Arc, + context: &Arc, +) -> LBTrackerResult> { + let pod_api = kube::Api::::namespaced( + context.client.clone(), + svc.namespace() + .as_ref() + .map(String::as_str) + .unwrap_or_else(|| context.client.default_namespace()), + ); + + let Some(pod_selector) = svc.spec.as_ref().and_then(|spec| spec.selector.clone()) else { + return Err(LBTrackerError::ServiceWithoutSelector); + }; + + let label_selector = pod_selector + .iter() + .map(|(key, val)| format!("{key}={val}")) + .collect::>() + .join(","); + + let pods = pod_api + .list(&ListParams { + label_selector: Some(label_selector), + ..Default::default() + }) + .await?; + + let target_nodes = pods + .iter() + .map(|pod| pod.spec.clone().unwrap_or_default().node_name) + .flatten() + .collect::>(); + + let nodes_api = kube::Api::::all(context.client.clone()); + let nodes = nodes_api + .list(&ListParams::default()) + .await? + .into_iter() + .filter(|node| target_nodes.contains(&node.name_any())) + .collect::>(); + + Ok(nodes) +} + +/// Get nodes based on the node selector. +/// This method will find the nodes based on the node selector +/// from the service annotations. +async fn get_nodes_by_selector( + svc: &Arc, + context: &Arc, +) -> LBTrackerResult> { + let node_selector = svc .annotations() .get(consts::LB_NODE_SELECTOR) .map(String::as_str) - .map(LabelFilter::from_str) - .transpose()? - .unwrap_or_default(); + .ok_or(LBTrackerError::ServiceWithoutSelector)?; + let label_filter = LabelFilter::from_str(node_selector)?; let nodes_api = kube::Api::::all(context.client.clone()); let nodes = nodes_api .list(&ListParams::default()) @@ -177,12 +224,28 @@ pub async fn reconcile_load_balancer( .into_iter() .filter(|node| label_filter.check(node.labels())) .collect::>(); + Ok(nodes) +} +/// Reconcile the `LoadBalancer` type of service. +/// This function will find the nodes based on the node selector +/// and create or update the load balancer. +pub async fn reconcile_load_balancer( + mut lb: LoadBalancer, + svc: Arc, + context: Arc, +) -> LBTrackerResult { let mut node_ip_type = "InternalIP"; if lb.network_name.is_none() { node_ip_type = "ExternalIP"; } + let nodes = if context.config.dynamic_node_selector { + get_nodes_dynamically(&svc, &context).await? + } else { + get_nodes_by_selector(&svc, &context).await? + }; + for node in nodes { let Some(status) = node.status else { continue;