Skip to content

Listener: Use headless and metrics service #651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Add support for airflow `2.10.5` ([#625]).
- Add experimental support for airflow `3.0.1` ([#630]).
- "airflow.task" logger defaults to log level 'INFO' instead of 'NOTSET' ([#649]).
- Add internal headless service in addition to the metrics service ([#651]).

### Changed

Expand Down Expand Up @@ -52,6 +53,7 @@
[#636]: https://github.com/stackabletech/airflow-operator/pull/636
[#645]: https://github.com/stackabletech/airflow-operator/pull/645
[#649]: https://github.com/stackabletech/airflow-operator/pull/649
[#651]: https://github.com/stackabletech/airflow-operator/pull/651

## [25.3.0] - 2025-03-21

Expand Down
117 changes: 56 additions & 61 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use stackable_operator::{
api::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, PersistentVolumeClaim, PodTemplateSpec, Probe, Service, ServiceAccount,
ServicePort, ServiceSpec, TCPSocketAction,
ConfigMap, PersistentVolumeClaim, PodTemplateSpec, Probe, ServiceAccount,
TCPSocketAction,
},
},
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
Expand Down Expand Up @@ -101,6 +101,10 @@ use crate::{
pdb::add_pdbs,
},
product_logging::extend_config_map_with_log_config,
service::{
build_rolegroup_headless_service, build_rolegroup_metrics_service,
stateful_set_service_name,
},
};

pub const AIRFLOW_CONTROLLER_NAME: &str = "airflowcluster";
Expand Down Expand Up @@ -339,6 +343,9 @@ pub enum Error {
ApplyGroupListener {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("failed to configure service"))]
ServiceConfiguration { source: crate::service::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -491,12 +498,53 @@ pub async fn reconcile_airflow(
)
.context(InvalidGitSyncSpecSnafu)?;

let rg_service = build_rolegroup_service(airflow, &resolved_product_image, &rolegroup)?;
cluster_resources.add(client, rg_service).await.context(
ApplyRoleGroupServiceSnafu {
let role_group_service_recommended_labels = build_recommended_labels(
airflow,
AIRFLOW_CONTROLLER_NAME,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
);

let role_group_service_selector = Labels::role_group_selector(
airflow,
APP_NAME,
&rolegroup.role,
&rolegroup.role_group,
)
.context(LabelBuildSnafu)?;

// Only apply headless service for something exposing an HTTP port
if airflow_role.get_http_port().is_some() {
let rg_headless_service = build_rolegroup_headless_service(
airflow,
&rolegroup,
role_group_service_recommended_labels.clone(),
role_group_service_selector.clone().into(),
)
.context(ServiceConfigurationSnafu)?;

cluster_resources
.add(client, rg_headless_service)
.await
.context(ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
})?;
}

let rg_metrics_service = build_rolegroup_metrics_service(
airflow,
&rolegroup,
role_group_service_recommended_labels,
role_group_service_selector.into(),
)
.context(ServiceConfigurationSnafu)?;
cluster_resources
.add(client, rg_metrics_service)
.await
.context(ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
},
)?;
})?;

let rg_statefulset = build_server_rolegroup_statefulset(
airflow,
Expand Down Expand Up @@ -767,53 +815,6 @@ fn build_rolegroup_config_map(
})
}

/// Builds the headless metrics [`Service`] of a rolegroup.
///
/// The service is named `<rolegroup object name>-metrics`, carries the
/// `prometheus.io/scrape: "true"` label and exposes only the metrics port. It selects the
/// pods of the given rolegroup and also publishes addresses of pods that are not ready yet.
fn build_rolegroup_service(
    airflow: &v1alpha1::AirflowCluster,
    resolved_product_image: &ResolvedProductImage,
    rolegroup: &RoleGroupRef<v1alpha1::AirflowCluster>,
) -> Result<Service> {
    let scrape_label =
        Label::try_from(("prometheus.io/scrape", "true")).context(BuildLabelSnafu)?;

    let service_name = format!("{}-metrics", rolegroup.object_name());
    let metadata = build_rolegroup_metadata(
        airflow,
        &resolved_product_image,
        &rolegroup,
        scrape_label,
        service_name,
    )?;

    let selector =
        Labels::role_group_selector(airflow, APP_NAME, &rolegroup.role, &rolegroup.role_group)
            .context(BuildLabelSnafu)?;

    let metrics_port = ServicePort {
        name: Some(METRICS_PORT_NAME.into()),
        port: METRICS_PORT.into(),
        protocol: Some("TCP".to_string()),
        ..Default::default()
    };

    Ok(Service {
        metadata,
        spec: Some(ServiceSpec {
            // Internal communication does not need to be exposed
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            ports: Some(vec![metrics_port]),
            selector: Some(selector.into()),
            publish_not_ready_addresses: Some(true),
            ..ServiceSpec::default()
        }),
        status: None,
    })
}

fn build_rolegroup_metadata(
airflow: &v1alpha1::AirflowCluster,
resolved_product_image: &&ResolvedProductImage,
Expand Down Expand Up @@ -874,8 +875,6 @@ fn listener_ports() -> Vec<listener::v1alpha1::ListenerPort> {
}

/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
///
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_rolegroup_service`]).
#[allow(clippy::too_many_arguments)]
fn build_server_rolegroup_statefulset(
airflow: &v1alpha1::AirflowCluster,
Expand Down Expand Up @@ -1160,10 +1159,7 @@ fn build_server_rolegroup_statefulset(
match_labels: Some(statefulset_match_labels.into()),
..LabelSelector::default()
},
service_name: Some(format!(
"{name}-metrics",
name = rolegroup_ref.object_name()
)),
service_name: stateful_set_service_name(airflow_role, rolegroup_ref),
template: pod_template,
volume_claim_templates: pvcs,
..StatefulSetSpec::default()
Expand Down Expand Up @@ -1349,7 +1345,6 @@ pub fn error_policy(
_ => Action::requeue(*Duration::from_secs(10)),
}
}
// I want to add secret volumes right here

fn add_authentication_volumes_and_volume_mounts(
authentication_config: &AirflowClientAuthenticationDetailsResolved,
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod crd;
mod env_vars;
mod operations;
mod product_logging;
mod service;
mod util;

mod built_info {
Expand Down
153 changes: 153 additions & 0 deletions rust/operator-binary/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::collections::BTreeMap;

use snafu::{ResultExt, Snafu};
use stackable_operator::{
builder::meta::ObjectMetaBuilder,
k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
kvp::{Label, ObjectLabels},
role_utils::RoleGroupRef,
};

use crate::crd::{
AirflowRole, HTTP_PORT, HTTP_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1,
};

/// Suffix appended to the rolegroup object name to form the metrics service name.
pub const METRICS_SERVICE_SUFFIX: &str = "metrics";
/// Suffix appended to the rolegroup object name to form the headless service name.
pub const HEADLESS_SERVICE_SUFFIX: &str = "headless";

/// Errors that can occur while building the rolegroup services in this module.
#[derive(Snafu, Debug)]
pub enum Error {
/// Attaching the owner reference to the service metadata failed.
#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},

/// Adding the recommended labels to the service metadata failed.
#[snafu(display("failed to build Metadata"))]
MetadataBuild {
source: stackable_operator::builder::meta::Error,
},

/// Constructing a label (e.g. the Prometheus scrape label) failed.
#[snafu(display("failed to build Labels"))]
LabelBuild {
source: stackable_operator::kvp::LabelError,
},
}

/// Builds the headless [`Service`] of a rolegroup.
///
/// The service is named `<rolegroup object name>-headless`, gets an owner reference to
/// `airflow`, carries the given recommended `object_labels` and exposes the HTTP port for the
/// pods matched by `selector`. Addresses of pods that are not ready yet are published as well.
///
/// This is mostly useful for internal communication between peers, or for clients that
/// perform client-side load balancing.
///
/// # Errors
///
/// Fails if the owner reference or the recommended labels cannot be added to the metadata.
pub fn build_rolegroup_headless_service(
    airflow: &v1alpha1::AirflowCluster,
    rolegroup_ref: &RoleGroupRef<v1alpha1::AirflowCluster>,
    object_labels: ObjectLabels<v1alpha1::AirflowCluster>,
    selector: BTreeMap<String, String>,
) -> Result<Service, Error> {
    let service_name = rolegroup_headless_service_name(&rolegroup_ref.object_name());

    let metadata = ObjectMetaBuilder::new()
        .name_and_namespace(airflow)
        .name(service_name)
        .ownerreference_from_resource(airflow, None, Some(true))
        .context(ObjectMissingMetadataForOwnerRefSnafu)?
        .with_recommended_labels(object_labels)
        .context(MetadataBuildSnafu)?
        .build();

    Ok(Service {
        metadata,
        spec: Some(ServiceSpec {
            // Internal communication does not need to be exposed
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            ports: Some(headless_service_ports()),
            selector: Some(selector),
            publish_not_ready_addresses: Some(true),
            ..ServiceSpec::default()
        }),
        status: None,
    })
}

/// Builds the metrics [`Service`] of a rolegroup.
///
/// The service is named `<rolegroup object name>-metrics`, gets an owner reference to
/// `airflow`, carries the given recommended `object_labels` plus the
/// `prometheus.io/scrape: "true"` label, and exposes the metrics port for the pods matched by
/// `selector`.
///
/// # Errors
///
/// Fails if the scrape label cannot be built, or if the owner reference or the recommended
/// labels cannot be added to the metadata.
pub fn build_rolegroup_metrics_service(
    airflow: &v1alpha1::AirflowCluster,
    rolegroup_ref: &RoleGroupRef<v1alpha1::AirflowCluster>,
    object_labels: ObjectLabels<v1alpha1::AirflowCluster>,
    selector: BTreeMap<String, String>,
) -> Result<Service, Error> {
    let scrape_label =
        Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?;

    let service_name = rolegroup_metrics_service_name(&rolegroup_ref.object_name());

    let metadata = ObjectMetaBuilder::new()
        .name_and_namespace(airflow)
        .name(service_name)
        .ownerreference_from_resource(airflow, None, Some(true))
        .context(ObjectMissingMetadataForOwnerRefSnafu)?
        .with_recommended_labels(object_labels)
        .context(MetadataBuildSnafu)?
        .with_label(scrape_label)
        .build();

    Ok(Service {
        metadata,
        spec: Some(ServiceSpec {
            // Internal communication does not need to be exposed
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            ports: Some(metrics_service_ports()),
            selector: Some(selector),
            publish_not_ready_addresses: Some(true),
            ..ServiceSpec::default()
        }),
        status: None,
    })
}

/// Returns the service name to set on the rolegroup `StatefulSet`, if any.
///
/// Webserver rolegroups use their headless service name
/// (`<rolegroup object name>-headless`); scheduler and worker rolegroups get `None`.
pub fn stateful_set_service_name(
    airflow_role: &AirflowRole,
    rolegroup_ref: &RoleGroupRef<v1alpha1::AirflowCluster>,
) -> Option<String> {
    // Kept as an exhaustive match so that adding a role forces a decision here.
    let service_name = match airflow_role {
        AirflowRole::Scheduler | AirflowRole::Worker => return None,
        AirflowRole::Webserver => rolegroup_headless_service_name(&rolegroup_ref.object_name()),
    };

    Some(service_name)
}

/// Builds the metrics service name for a rolegroup by joining the rolegroup object name and
/// [`METRICS_SERVICE_SUFFIX`] with a dash: `<cluster>-<role>-<rolegroup>-<METRICS_SERVICE_SUFFIX>`.
// TODO: Replace by operator.rs functions
fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> String {
    [role_group_ref_object_name, METRICS_SERVICE_SUFFIX].join("-")
}

/// Builds the headless service name for a rolegroup by joining the rolegroup object name and
/// [`HEADLESS_SERVICE_SUFFIX`] with a dash: `<cluster>-<role>-<rolegroup>-<HEADLESS_SERVICE_SUFFIX>`.
// TODO: Replace by operator.rs functions
fn rolegroup_headless_service_name(role_group_ref_object_name: &str) -> String {
    [role_group_ref_object_name, HEADLESS_SERVICE_SUFFIX].join("-")
}

/// Returns the ports exposed by the headless service: a single TCP port named
/// `HTTP_PORT_NAME` with number `HTTP_PORT`.
fn headless_service_ports() -> Vec<ServicePort> {
    let http_port = ServicePort {
        name: Some(HTTP_PORT_NAME.to_string()),
        port: HTTP_PORT.into(),
        protocol: Some("TCP".to_owned()),
        ..ServicePort::default()
    };

    vec![http_port]
}

/// Returns the ports exposed by the metrics service: a single TCP port named
/// `METRICS_PORT_NAME` with number `METRICS_PORT`.
fn metrics_service_ports() -> Vec<ServicePort> {
    let metrics_port = ServicePort {
        name: Some(METRICS_PORT_NAME.to_string()),
        port: METRICS_PORT.into(),
        protocol: Some("TCP".to_owned()),
        ..ServicePort::default()
    };

    vec![metrics_port]
}