Skip to content

Added dynamic node selector based on service selectors. #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ pub struct OperatorConfig {
#[arg(short = 't', long, env = "ROBOTLB_HCLOUD_TOKEN")]
pub hcloud_token: String,

/// If this flag is enabled, the operator will try to find target nodes
/// based on where target pods are actually deployed.
#[arg(long, env = "ROBOTLB_DYNAMIC_NODE_SELECTOR", default_value = "true")]
pub dynamic_node_selector: bool,

/// Default load balancer healthcheck retries count.
#[arg(long, env = "ROBOTLB_DEFAULT_LB_RETRIES", default_value = "3")]
pub default_lb_retries: i32,
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum LBTrackerError {
KubeError(#[from] kube::Error),
#[error("Unknown LoadBalancing alorithm")]
UnknownLBAlgorithm,
#[error("Cannot get target nodes, because the service has no selector")]
ServiceWithoutSelector,

// HCloud API errors
#[error("Cannot attach load balancer to a network. Reason: {0}")]
Expand Down
1 change: 1 addition & 0 deletions src/lb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl LoadBalancer {
/// The target will receive the traffic from the services.
/// The target is identified by its IP address.
pub fn add_target(&mut self, ip: &str) {
    // Log each registration so target discovery can be followed at debug level.
    tracing::debug!("Adding target {}", ip);
    // The address is only borrowed, so store an owned copy of it.
    let target = String::from(ip);
    self.targets.push(target);
}

Expand Down
91 changes: 77 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use error::{LBTrackerError, LBTrackerResult};
use futures::StreamExt;
use hcloud::apis::configuration::Configuration as HCloudConfig;
use k8s_openapi::{
api::core::v1::{Node, Service},
api::core::v1::{Node, Pod, Service},
serde_json::json,
};
use kube::{
Expand All @@ -32,7 +32,7 @@ use kube::{
};
use label_filter::LabelFilter;
use lb::LoadBalancer;
use std::{str::FromStr, sync::Arc, time::Duration};
use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};

pub mod config;
pub mod consts;
Expand Down Expand Up @@ -155,34 +155,97 @@ pub async fn reconcile_service(
reconcile_load_balancer(lb, svc.clone(), context).await
}

/// Reconcile the `LoadBalancer` type of service.
/// This function will find the nodes based on the node selector
/// and create or update the load balancer.
pub async fn reconcile_load_balancer(
mut lb: LoadBalancer,
svc: Arc<Service>,
context: Arc<CurrentContext>,
) -> LBTrackerResult<Action> {
let label_filter = svc
/// Find the nodes that currently host the pods backing a service.
///
/// The service's pod selector is rendered as a `key=value,...` label selector,
/// the matching pods are listed in the service's namespace (falling back to
/// the client's default namespace), and the cluster's nodes are then filtered
/// down to the ones named in those pods' `spec.node_name`.
///
/// # Errors
///
/// Returns [`LBTrackerError::ServiceWithoutSelector`] when the service has no
/// spec, no selector, or an empty selector. Errors from listing pods or nodes
/// are propagated.
async fn get_nodes_dynamically(
    svc: &Arc<Service>,
    context: &Arc<CurrentContext>,
) -> LBTrackerResult<Vec<Node>> {
    // An empty selector map would render as an empty label selector, which
    // matches every pod in the namespace; treat it like a missing selector.
    let Some(pod_selector) = svc
        .spec
        .as_ref()
        .and_then(|spec| spec.selector.as_ref())
        .filter(|selector| !selector.is_empty())
    else {
        return Err(LBTrackerError::ServiceWithoutSelector);
    };

    let label_selector = pod_selector
        .iter()
        .map(|(key, val)| format!("{key}={val}"))
        .collect::<Vec<_>>()
        .join(",");

    let namespace = svc.namespace();
    let pod_api = kube::Api::<Pod>::namespaced(
        context.client.clone(),
        namespace
            .as_deref()
            .unwrap_or_else(|| context.client.default_namespace()),
    );

    let pods = pod_api
        .list(&ListParams {
            label_selector: Some(label_selector),
            ..Default::default()
        })
        .await?;

    // Pods without a spec or without an assigned node are skipped; the set
    // de-duplicates nodes that host several matching pods.
    let target_nodes = pods
        .iter()
        .filter_map(|pod| pod.spec.as_ref().and_then(|spec| spec.node_name.clone()))
        .collect::<HashSet<_>>();

    // No pod is scheduled anywhere, so no node can match: skip the node listing.
    if target_nodes.is_empty() {
        return Ok(Vec::new());
    }

    let nodes_api = kube::Api::<Node>::all(context.client.clone());
    let nodes = nodes_api
        .list(&ListParams::default())
        .await?
        .into_iter()
        .filter(|node| target_nodes.contains(&node.name_any()))
        .collect::<Vec<_>>();

    Ok(nodes)
}

/// Get nodes based on the node selector.
/// This method will find the nodes based on the node selector
/// from the service annotations.
async fn get_nodes_by_selector(
svc: &Arc<Service>,
context: &Arc<CurrentContext>,
) -> LBTrackerResult<Vec<Node>> {
let node_selector = svc
.annotations()
.get(consts::LB_NODE_SELECTOR)
.map(String::as_str)
.map(LabelFilter::from_str)
.transpose()?
.unwrap_or_default();
.ok_or(LBTrackerError::ServiceWithoutSelector)?;
let label_filter = LabelFilter::from_str(node_selector)?;
let nodes_api = kube::Api::<Node>::all(context.client.clone());
let nodes = nodes_api
.list(&ListParams::default())
.await?
.into_iter()
.filter(|node| label_filter.check(node.labels()))
.collect::<Vec<_>>();
Ok(nodes)
}

/// Reconcile the `LoadBalancer` type of service.
/// This function will find the nodes based on the node selector
/// and create or update the load balancer.
pub async fn reconcile_load_balancer(
mut lb: LoadBalancer,
svc: Arc<Service>,
context: Arc<CurrentContext>,
) -> LBTrackerResult<Action> {
let mut node_ip_type = "InternalIP";
if lb.network_name.is_none() {
node_ip_type = "ExternalIP";
}

let nodes = if context.config.dynamic_node_selector {
get_nodes_dynamically(&svc, &context).await?
} else {
get_nodes_by_selector(&svc, &context).await?
};

for node in nodes {
let Some(status) = node.status else {
continue;
Expand Down
Loading