diff --git a/CHANGELOG.md b/CHANGELOG.md index 09a2d491..1c26f8f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - Add support for airflow `2.10.5` ([#625]). - Add experimental support for airflow `3.0.1` ([#630]). - "airflow.task" logger defaults to log level 'INFO' instead of 'NOTSET' ([#649]). +- Add internal headless service in addition to the metrics service ([#651]). ### Changed @@ -52,6 +53,7 @@ [#636]: https://github.com/stackabletech/airflow-operator/pull/636 [#645]: https://github.com/stackabletech/airflow-operator/pull/645 [#649]: https://github.com/stackabletech/airflow-operator/pull/649 +[#651]: https://github.com/stackabletech/airflow-operator/pull/651 ## [25.3.0] - 2025-03-21 diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index fb87ed08..9f933a35 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -41,8 +41,8 @@ use stackable_operator::{ api::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ - ConfigMap, PersistentVolumeClaim, PodTemplateSpec, Probe, Service, ServiceAccount, - ServicePort, ServiceSpec, TCPSocketAction, + ConfigMap, PersistentVolumeClaim, PodTemplateSpec, Probe, ServiceAccount, + TCPSocketAction, }, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, @@ -101,6 +101,10 @@ use crate::{ pdb::add_pdbs, }, product_logging::extend_config_map_with_log_config, + service::{ + build_rolegroup_headless_service, build_rolegroup_metrics_service, + stateful_set_service_name, + }, }; pub const AIRFLOW_CONTROLLER_NAME: &str = "airflowcluster"; @@ -339,6 +343,9 @@ pub enum Error { ApplyGroupListener { source: stackable_operator::cluster_resources::Error, }, + + #[snafu(display("failed to configure service"))] + ServiceConfiguration { source: crate::service::Error }, } type Result = std::result::Result; @@ -491,12 +498,53 @@ pub async fn reconcile_airflow( ) .context(InvalidGitSyncSpecSnafu)?; - let rg_service = build_rolegroup_service(airflow, &resolved_product_image, &rolegroup)?; - cluster_resources.add(client, rg_service).await.context( - ApplyRoleGroupServiceSnafu { + let role_group_service_recommended_labels = build_recommended_labels( + airflow, + AIRFLOW_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + ); + + let role_group_service_selector = Labels::role_group_selector( + airflow, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)?; + + // Only apply headless service for something exposing an HTTP port + if airflow_role.get_http_port().is_some() { + let rg_headless_service = build_rolegroup_headless_service( + airflow, + &rolegroup, + role_group_service_recommended_labels.clone(), + role_group_service_selector.clone().into(), + ) + .context(ServiceConfigurationSnafu)?; + + cluster_resources + .add(client, rg_headless_service) + .await + .context(ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + } + + let rg_metrics_service = build_rolegroup_metrics_service( + airflow, + &rolegroup, + role_group_service_recommended_labels, + role_group_service_selector.into(), + ) + .context(ServiceConfigurationSnafu)?; + cluster_resources + .add(client, rg_metrics_service) + .await + .context(ApplyRoleGroupServiceSnafu { rolegroup: rolegroup.clone(), - }, - )?; + })?; let rg_statefulset = build_server_rolegroup_statefulset( airflow, @@ -767,53 +815,6 @@ fn build_rolegroup_config_map( }) } -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup -/// -/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -fn build_rolegroup_service( - airflow: &v1alpha1::AirflowCluster, - resolved_product_image: &ResolvedProductImage, - rolegroup: &RoleGroupRef, -) -> Result { - let ports = vec![ServicePort { - name: Some(METRICS_PORT_NAME.into()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_string()), - ..Default::default() - }]; - - let prometheus_label = - Label::try_from(("prometheus.io/scrape", "true")).context(BuildLabelSnafu)?; - - let metadata = build_rolegroup_metadata( - airflow, - &resolved_product_image, - &rolegroup, - prometheus_label, - format!("{name}-metrics", name = rolegroup.object_name()), - )?; - - let service_selector_labels = - Labels::role_group_selector(airflow, APP_NAME, &rolegroup.role, &rolegroup.role_group) - .context(BuildLabelSnafu)?; - - let service_spec = ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_string()), - cluster_ip: Some("None".to_string()), - ports: Some(ports), - selector: Some(service_selector_labels.into()), - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }; - - Ok(Service { - metadata, - spec: Some(service_spec), - status: None, - }) -} - fn build_rolegroup_metadata( airflow: &v1alpha1::AirflowCluster, resolved_product_image: &&ResolvedProductImage, @@ -874,8 +875,6 @@ fn listener_ports() -> Vec { } /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. -/// -/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_rolegroup_service`]). #[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( airflow: &v1alpha1::AirflowCluster, @@ -1160,10 +1159,7 @@ fn build_server_rolegroup_statefulset( match_labels: Some(statefulset_match_labels.into()), ..LabelSelector::default() }, - service_name: Some(format!( - "{name}-metrics", - name = rolegroup_ref.object_name() - )), + service_name: stateful_set_service_name(airflow_role, rolegroup_ref), template: pod_template, volume_claim_templates: pvcs, ..StatefulSetSpec::default() @@ -1349,7 +1345,6 @@ pub fn error_policy( _ => Action::requeue(*Duration::from_secs(10)), } } -// I want to add secret volumes right here fn add_authentication_volumes_and_volume_mounts( authentication_config: &AirflowClientAuthenticationDetailsResolved, diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index e448f891..d36213fc 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -37,6 +37,7 @@ mod crd; mod env_vars; mod operations; mod product_logging; +mod service; mod util; mod built_info { diff --git a/rust/operator-binary/src/service.rs b/rust/operator-binary/src/service.rs new file mode 100644 index 00000000..52ae4d26 --- /dev/null +++ b/rust/operator-binary/src/service.rs @@ -0,0 +1,153 @@ +use std::collections::BTreeMap; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, + k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}, + kvp::{Label, ObjectLabels}, + role_utils::RoleGroupRef, +}; + +use crate::crd::{ + AirflowRole, HTTP_PORT, HTTP_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1, +}; + +pub const METRICS_SERVICE_SUFFIX: &str = "metrics"; +pub const HEADLESS_SERVICE_SUFFIX: &str = "headless"; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build Metadata"))] + MetadataBuild { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build Labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, +} + +/// The rolegroup headless [`Service`] is a service that allows direct access to the instances of a certain rolegroup +/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. +pub fn build_rolegroup_headless_service( + airflow: &v1alpha1::AirflowCluster, + rolegroup_ref: &RoleGroupRef, + object_labels: ObjectLabels, + selector: BTreeMap, +) -> Result { + let ports = headless_service_ports(); + + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(airflow) + .name(rolegroup_headless_service_name( + &rolegroup_ref.object_name(), + )) + .ownerreference_from_resource(airflow, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(object_labels) + .context(MetadataBuildSnafu)? + .build(); + + let service_spec = ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(ports), + selector: Some(selector), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }; + + Ok(Service { + metadata, + spec: Some(service_spec), + status: None, + }) +} + +/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label. +pub fn build_rolegroup_metrics_service( + airflow: &v1alpha1::AirflowCluster, + rolegroup_ref: &RoleGroupRef, + object_labels: ObjectLabels, + selector: BTreeMap, +) -> Result { + let ports = metrics_service_ports(); + + let prometheus_label = + Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?; + + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(airflow) + .name(rolegroup_metrics_service_name(&rolegroup_ref.object_name())) + .ownerreference_from_resource(airflow, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(object_labels) + .context(MetadataBuildSnafu)? + .with_label(prometheus_label) + .build(); + + let service_spec = ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(ports), + selector: Some(selector), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }; + + Ok(Service { + metadata, + spec: Some(service_spec), + status: None, + }) +} + +pub fn stateful_set_service_name( + airflow_role: &AirflowRole, + rolegroup_ref: &RoleGroupRef, +) -> Option { + match airflow_role { + AirflowRole::Webserver => Some(rolegroup_headless_service_name( + &rolegroup_ref.object_name(), + )), + AirflowRole::Scheduler | AirflowRole::Worker => None, + } +} + +/// Returns the metrics rolegroup service name `---`. +// TODO: Replace by operator.rs functions +fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> String { + format!("{role_group_ref_object_name}-{METRICS_SERVICE_SUFFIX}") +} + +/// Returns the headless rolegroup service name `---`. +// TODO: Replace by operator.rs functions +fn rolegroup_headless_service_name(role_group_ref_object_name: &str) -> String { + format!("{role_group_ref_object_name}-{HEADLESS_SERVICE_SUFFIX}") +} + +fn headless_service_ports() -> Vec { + vec![ServicePort { + name: Some(HTTP_PORT_NAME.to_string()), + port: HTTP_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +fn metrics_service_ports() -> Vec { + vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +}