//! Please be aware that this file is mostly copy/paste from <https://github.com/stackabletech/stackablectl/blob/eda45945cfcf5c6581cf1b88c782d98fada8065f/src/kube.rs#L48-L187>
//! It only supports services of type NodePort and LoadBalancer.
//! This code will be removed entirely once listener-op is implemented in all operators;
//! the endpoints will then be read via some sort of listener-operator API.
//!
//! So there is no point in optimizing this code or blaming the ones who wrote it ;P

use std::collections::HashMap;

use indexmap::IndexMap;
use k8s_openapi::api::core::v1::{Service, ServiceSpec};
use kube::{api::ListParams, ResourceExt};
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, warn};

use crate::utils::k8s::{KubeClient, KubeClientError, ListParamsExt, ProductLabel};

#[derive(Debug, Snafu)]
pub enum ServiceError {
    #[snafu(display("kube error: {source}"))]
    KubeClientError { source: KubeClientError },

    #[snafu(display("missing namespace for service '{service}'"))]
    MissingServiceNamespace { service: String },

    #[snafu(display("missing spec for service '{service}'"))]
    MissingServiceSpec { service: String },

    #[snafu(display("failed to get status of node {node_name}"))]
    GetNodeStatus { node_name: String },

    #[snafu(display("failed to get address of node {node_name}"))]
    GetNodeAddress { node_name: String },

    #[snafu(display("could not find an ExternalIP or InternalIP for node {node_name}"))]
    NoIpForNode { node_name: String },

    #[snafu(display("failed to find node {node_name} in node_name_ip_mapping"))]
    NodeMissingInIpMapping { node_name: String },
}

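/// Collects the endpoint URLs of all services belonging to the given product object.
///
/// The returned map goes from a human-readable endpoint name to its URL. As a purely
/// illustrative example (names and addresses made up), a deployment might yield an entry
/// like `"external-http" => "http://172.18.0.2:31234"`.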
pub async fn get_service_endpoints(
    product_name: &str,
    object_name: &str,
    object_namespace: &str,
) -> Result<IndexMap<String, String>, ServiceError> {
    let kube_client = KubeClient::new().await.context(KubeClientSnafu)?;

    let service_list_params =
        ListParams::from_product(product_name, Some(object_name), ProductLabel::Name);

    let services = kube_client
        .list_services(Some(object_namespace), &service_list_params)
        .await
        .context(KubeClientSnafu)?;

    let mut endpoints = IndexMap::new();

    for service in services {
        match get_service_endpoint_urls(&kube_client, &service, object_name).await {
            Ok(urls) => endpoints.extend(urls),
            Err(err) => warn!(
                "Failed to get endpoint_urls of service {service_name}: {err}",
                service_name = service.name_unchecked(),
            ),
        }
    }

    Ok(endpoints)
}

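/// Determines the endpoint URLs of a single service by dispatching on its type.
/// Only NodePort and LoadBalancer services are supported; any other type (e.g. ClusterIP)
/// results in an empty map.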
pub async fn get_service_endpoint_urls(
    kube_client: &KubeClient,
    service: &Service,
    referenced_object_name: &str,
) -> Result<IndexMap<String, String>, ServiceError> {
    let service_name = service.name_unchecked();
    let service_namespace = service.namespace().context(MissingServiceNamespaceSnafu {
        service: service_name.clone(),
    })?;
    let service_spec = service.spec.as_ref().context(MissingServiceSpecSnafu {
        service: service_name.clone(),
    })?;

    let endpoints = match service_spec.type_.as_deref() {
        Some("NodePort") => {
            get_service_endpoint_urls_for_nodeport(
                kube_client,
                &service_name,
                service_spec,
                &service_namespace,
                referenced_object_name,
            )
            .await?
        }
        Some("LoadBalancer") => {
            get_service_endpoint_urls_for_loadbalancer(
                &service_name,
                service,
                service_spec,
                referenced_object_name,
            )
            .await?
        }
        _ => IndexMap::new(),
    };

    Ok(endpoints)
}

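/// Builds endpoint URLs for a NodePort service: the node the endpoints run on is resolved
/// via the Endpoints resource, its (preferably external) IP is looked up, and one
/// `<node-ip>:<node-port>` URL is emitted per port that has a nodePort assigned.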
pub async fn get_service_endpoint_urls_for_nodeport(
    kube_client: &KubeClient,
    service_name: &str,
    service_spec: &ServiceSpec,
    service_namespace: &str,
    referenced_object_name: &str,
) -> Result<IndexMap<String, String>, ServiceError> {
    let endpoints = kube_client
        .get_endpoints(service_namespace, service_name)
        .await
        .context(KubeClientSnafu)?;

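    // Only the simple case of exactly one subset with at least one address is handled.
    // Everything else is logged and skipped instead of failing the whole endpoint listing.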
    let node_name = match &endpoints.subsets {
        Some(subsets) if subsets.len() == 1 => match &subsets[0].addresses {
            Some(addresses) if !addresses.is_empty() => match &addresses[0].node_name {
                Some(node_name) => node_name,
                None => {
                    warn!("Could not determine the node the endpoint {service_name} is running on because the address of the subset didn't have a node name");
                    return Ok(IndexMap::new());
                }
            },
            Some(_) => {
                warn!("Could not determine the node the endpoint {service_name} is running on because the subset had no addresses");
                return Ok(IndexMap::new());
            }
            None => {
                warn!("Could not determine the node the endpoint {service_name} is running on because the subset had no address list. Is the service {service_name} up and running?");
                return Ok(IndexMap::new());
            }
        },
        Some(subsets) => {
            warn!("Could not determine the node the endpoint {service_name} is running on because the endpoints consist of {num_subsets} subsets", num_subsets = subsets.len());
            return Ok(IndexMap::new());
        }
        None => {
            warn!("Could not determine the node the endpoint {service_name} is running on because the endpoint has no subsets. Is the service {service_name} up and running?");
            return Ok(IndexMap::new());
        }
    };

    let node_ip = get_node_ip(kube_client, node_name).await?;

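    // Endpoint names are derived from the service name with the referenced object name
    // stripped off, plus the port name (falling back to the port number).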
    let mut endpoints = IndexMap::new();
    for service_port in service_spec.ports.iter().flatten() {
        match service_port.node_port {
            Some(node_port) => {
                let endpoint_name = service_name
                    .trim_start_matches(referenced_object_name)
                    .trim_start_matches('-');

                let port_name = service_port
                    .name
                    .clone()
                    .unwrap_or_else(|| service_port.port.to_string());
                let endpoint_name = if endpoint_name.is_empty() {
                    port_name.clone()
                } else {
                    format!("{endpoint_name}-{port_name}")
                };

                let endpoint = endpoint_url(&node_ip, node_port, &port_name);
                endpoints.insert(endpoint_name, endpoint);
            }
            None => debug!("Could not get endpoint_url as port {port} of service {service_name} has no nodePort", port = service_port.port),
        }
    }

    Ok(endpoints)
}

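/// Builds endpoint URLs for a LoadBalancer service from the first ingress entry of the
/// load balancer status, preferring its hostname over its IP address. One URL per service
/// port is emitted.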
pub async fn get_service_endpoint_urls_for_loadbalancer(
    service_name: &str,
    service: &Service,
    service_spec: &ServiceSpec,
    referenced_object_name: &str,
) -> Result<IndexMap<String, String>, ServiceError> {
    let mut endpoints = IndexMap::new();

    let lb_host = service
        .status
        .as_ref()
        .and_then(|s| s.load_balancer.as_ref())
        .and_then(|l| l.ingress.as_ref())
        .and_then(|l| l.first());

    if let Some(lb_host) = lb_host {
        let lb_host = lb_host.hostname.as_ref().or(lb_host.ip.as_ref());
        if let Some(lb_host) = lb_host {
            for service_port in service_spec.ports.iter().flatten() {
                let lb_port = service_port.port;

                let endpoint_name = service_name
                    .trim_start_matches(referenced_object_name)
                    .trim_start_matches('-');

                let port_name = service_port
                    .name
                    .clone()
                    .unwrap_or_else(|| lb_port.to_string());
                let endpoint_name = if endpoint_name.is_empty() {
                    port_name.clone()
                } else {
                    format!("{endpoint_name}-{port_name}")
                };

                let endpoint = endpoint_url(lb_host, lb_port, &port_name);
                endpoints.insert(endpoint_name, endpoint);
            }
        }
    }

    Ok(endpoints)
}

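/// Looks up the IP address of the given node from a freshly built node name → IP mapping.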
async fn get_node_ip(kube_client: &KubeClient, node_name: &str) -> Result<String, ServiceError> {
    let node_name_ip_mapping = get_node_name_ip_mapping(kube_client).await?;

    match node_name_ip_mapping.get(node_name) {
        Some(node_ip) => Ok(node_ip.to_string()),
        None => NodeMissingInIpMappingSnafu { node_name }.fail(),
    }
}

// TODO(sbernauer): Add caching. Not going to do so now, as listener-op
// will replace this code entirely anyway.
async fn get_node_name_ip_mapping(
    kube_client: &KubeClient,
) -> Result<HashMap<String, String>, ServiceError> {
    let nodes = kube_client.list_nodes().await.context(KubeClientSnafu)?;

    let mut result = HashMap::new();
    for node in nodes {
        let node_name = node.name_unchecked();
        let preferred_node_ip = node
            .status
            .context(GetNodeStatusSnafu {
                node_name: node_name.to_string(),
            })?
            .addresses
            .context(GetNodeAddressSnafu {
                node_name: node_name.to_string(),
            })?
            .iter()
            .filter(|address| address.type_ == "InternalIP" || address.type_ == "ExternalIP")
            .min_by_key(|address| &address.type_) // ExternalIP (which we want) is lower than InternalIP
            .map(|address| address.address.clone())
            .context(NoIpForNodeSnafu {
                node_name: node_name.to_string(),
            })?;
        result.insert(node_name, preferred_node_ip);
    }

    Ok(result)
}

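/// Builds a URL string from host and port, guessing the scheme from the port name
/// (http for web-UI-style port names, https for TLS port names, no scheme otherwise).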
fn endpoint_url(endpoint_host: &str, endpoint_port: i32, port_name: &str) -> String {
    // TODO: Consolidate web-ui port names in operators based on decision in arch meeting from 2022/08/10
    // For Superset: https://github.com/stackabletech/superset-operator/issues/248
    // For Airflow: https://github.com/stackabletech/airflow-operator/issues/146
    // As we still support older operator versions we need to also include the "old" way of naming
    if port_name == "http"
        || port_name.starts_with("http-")
        || port_name == "ui"
        || port_name == "airflow"
        || port_name == "superset"
    {
        format!("http://{endpoint_host}:{endpoint_port}")
    } else if port_name == "https" || port_name.starts_with("https-") {
        format!("https://{endpoint_host}:{endpoint_port}")
    } else {
        format!("{endpoint_host}:{endpoint_port}")
    }
}