Skip to content

Commit 6a2d2cc

Browse files
authored
Listener: Use headless and metrics service (#651)
* split headless and metrics service * docs * adapted changelog * add todo for op-rs replacement
1 parent 8690c3a commit 6a2d2cc

File tree

4 files changed

+212
-61
lines changed

4 files changed

+212
-61
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- Add support for airflow `2.10.5` ([#625]).
1313
- Add experimental support for airflow `3.0.1` ([#630]).
1414
- "airflow.task" logger defaults to log level 'INFO' instead of 'NOTSET' ([#649]).
15+
- Add internal headless service in addition to the metrics service ([#651]).
1516

1617
### Changed
1718

@@ -52,6 +53,7 @@
5253
[#636]: https://github.com/stackabletech/airflow-operator/pull/636
5354
[#645]: https://github.com/stackabletech/airflow-operator/pull/645
5455
[#649]: https://github.com/stackabletech/airflow-operator/pull/649
56+
[#651]: https://github.com/stackabletech/airflow-operator/pull/651
5557

5658
## [25.3.0] - 2025-03-21
5759

rust/operator-binary/src/airflow_controller.rs

Lines changed: 56 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ use stackable_operator::{
4141
api::{
4242
apps::v1::{StatefulSet, StatefulSetSpec},
4343
core::v1::{
44-
ConfigMap, PersistentVolumeClaim, PodTemplateSpec, Probe, Service, ServiceAccount,
45-
ServicePort, ServiceSpec, TCPSocketAction,
44+
ConfigMap, PersistentVolumeClaim, PodTemplateSpec, Probe, ServiceAccount,
45+
TCPSocketAction,
4646
},
4747
},
4848
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
@@ -101,6 +101,10 @@ use crate::{
101101
pdb::add_pdbs,
102102
},
103103
product_logging::extend_config_map_with_log_config,
104+
service::{
105+
build_rolegroup_headless_service, build_rolegroup_metrics_service,
106+
stateful_set_service_name,
107+
},
104108
};
105109

106110
pub const AIRFLOW_CONTROLLER_NAME: &str = "airflowcluster";
@@ -339,6 +343,9 @@ pub enum Error {
339343
ApplyGroupListener {
340344
source: stackable_operator::cluster_resources::Error,
341345
},
346+
347+
#[snafu(display("failed to configure service"))]
348+
ServiceConfiguration { source: crate::service::Error },
342349
}
343350

344351
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -491,12 +498,53 @@ pub async fn reconcile_airflow(
491498
)
492499
.context(InvalidGitSyncSpecSnafu)?;
493500

494-
let rg_service = build_rolegroup_service(airflow, &resolved_product_image, &rolegroup)?;
495-
cluster_resources.add(client, rg_service).await.context(
496-
ApplyRoleGroupServiceSnafu {
501+
let role_group_service_recommended_labels = build_recommended_labels(
502+
airflow,
503+
AIRFLOW_CONTROLLER_NAME,
504+
&resolved_product_image.app_version_label,
505+
&rolegroup.role,
506+
&rolegroup.role_group,
507+
);
508+
509+
let role_group_service_selector = Labels::role_group_selector(
510+
airflow,
511+
APP_NAME,
512+
&rolegroup.role,
513+
&rolegroup.role_group,
514+
)
515+
.context(LabelBuildSnafu)?;
516+
517+
// Only apply headless service for something exposing an HTTP port
518+
if airflow_role.get_http_port().is_some() {
519+
let rg_headless_service = build_rolegroup_headless_service(
520+
airflow,
521+
&rolegroup,
522+
role_group_service_recommended_labels.clone(),
523+
role_group_service_selector.clone().into(),
524+
)
525+
.context(ServiceConfigurationSnafu)?;
526+
527+
cluster_resources
528+
.add(client, rg_headless_service)
529+
.await
530+
.context(ApplyRoleGroupServiceSnafu {
531+
rolegroup: rolegroup.clone(),
532+
})?;
533+
}
534+
535+
let rg_metrics_service = build_rolegroup_metrics_service(
536+
airflow,
537+
&rolegroup,
538+
role_group_service_recommended_labels,
539+
role_group_service_selector.into(),
540+
)
541+
.context(ServiceConfigurationSnafu)?;
542+
cluster_resources
543+
.add(client, rg_metrics_service)
544+
.await
545+
.context(ApplyRoleGroupServiceSnafu {
497546
rolegroup: rolegroup.clone(),
498-
},
499-
)?;
547+
})?;
500548

501549
let rg_statefulset = build_server_rolegroup_statefulset(
502550
airflow,
@@ -767,53 +815,6 @@ fn build_rolegroup_config_map(
767815
})
768816
}
769817

770-
/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
771-
///
772-
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
773-
fn build_rolegroup_service(
774-
airflow: &v1alpha1::AirflowCluster,
775-
resolved_product_image: &ResolvedProductImage,
776-
rolegroup: &RoleGroupRef<v1alpha1::AirflowCluster>,
777-
) -> Result<Service> {
778-
let ports = vec![ServicePort {
779-
name: Some(METRICS_PORT_NAME.into()),
780-
port: METRICS_PORT.into(),
781-
protocol: Some("TCP".to_string()),
782-
..Default::default()
783-
}];
784-
785-
let prometheus_label =
786-
Label::try_from(("prometheus.io/scrape", "true")).context(BuildLabelSnafu)?;
787-
788-
let metadata = build_rolegroup_metadata(
789-
airflow,
790-
&resolved_product_image,
791-
&rolegroup,
792-
prometheus_label,
793-
format!("{name}-metrics", name = rolegroup.object_name()),
794-
)?;
795-
796-
let service_selector_labels =
797-
Labels::role_group_selector(airflow, APP_NAME, &rolegroup.role, &rolegroup.role_group)
798-
.context(BuildLabelSnafu)?;
799-
800-
let service_spec = ServiceSpec {
801-
// Internal communication does not need to be exposed
802-
type_: Some("ClusterIP".to_string()),
803-
cluster_ip: Some("None".to_string()),
804-
ports: Some(ports),
805-
selector: Some(service_selector_labels.into()),
806-
publish_not_ready_addresses: Some(true),
807-
..ServiceSpec::default()
808-
};
809-
810-
Ok(Service {
811-
metadata,
812-
spec: Some(service_spec),
813-
status: None,
814-
})
815-
}
816-
817818
fn build_rolegroup_metadata(
818819
airflow: &v1alpha1::AirflowCluster,
819820
resolved_product_image: &&ResolvedProductImage,
@@ -874,8 +875,6 @@ fn listener_ports() -> Vec<listener::v1alpha1::ListenerPort> {
874875
}
875876

876877
/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
877-
///
878-
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_rolegroup_service`]).
879878
#[allow(clippy::too_many_arguments)]
880879
fn build_server_rolegroup_statefulset(
881880
airflow: &v1alpha1::AirflowCluster,
@@ -1160,10 +1159,7 @@ fn build_server_rolegroup_statefulset(
11601159
match_labels: Some(statefulset_match_labels.into()),
11611160
..LabelSelector::default()
11621161
},
1163-
service_name: Some(format!(
1164-
"{name}-metrics",
1165-
name = rolegroup_ref.object_name()
1166-
)),
1162+
service_name: stateful_set_service_name(airflow_role, rolegroup_ref),
11671163
template: pod_template,
11681164
volume_claim_templates: pvcs,
11691165
..StatefulSetSpec::default()
@@ -1349,7 +1345,6 @@ pub fn error_policy(
13491345
_ => Action::requeue(*Duration::from_secs(10)),
13501346
}
13511347
}
1352-
// I want to add secret volumes right here
13531348

13541349
fn add_authentication_volumes_and_volume_mounts(
13551350
authentication_config: &AirflowClientAuthenticationDetailsResolved,

rust/operator-binary/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mod crd;
3737
mod env_vars;
3838
mod operations;
3939
mod product_logging;
40+
mod service;
4041
mod util;
4142

4243
mod built_info {

rust/operator-binary/src/service.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use std::collections::BTreeMap;
2+
3+
use snafu::{ResultExt, Snafu};
4+
use stackable_operator::{
5+
builder::meta::ObjectMetaBuilder,
6+
k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
7+
kvp::{Label, ObjectLabels},
8+
role_utils::RoleGroupRef,
9+
};
10+
11+
use crate::crd::{
12+
AirflowRole, HTTP_PORT, HTTP_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1,
13+
};
14+
15+
pub const METRICS_SERVICE_SUFFIX: &str = "metrics";
16+
pub const HEADLESS_SERVICE_SUFFIX: &str = "headless";
17+
18+
#[derive(Snafu, Debug)]
19+
pub enum Error {
20+
#[snafu(display("object is missing metadata to build owner reference"))]
21+
ObjectMissingMetadataForOwnerRef {
22+
source: stackable_operator::builder::meta::Error,
23+
},
24+
25+
#[snafu(display("failed to build Metadata"))]
26+
MetadataBuild {
27+
source: stackable_operator::builder::meta::Error,
28+
},
29+
30+
#[snafu(display("failed to build Labels"))]
31+
LabelBuild {
32+
source: stackable_operator::kvp::LabelError,
33+
},
34+
}
35+
36+
/// The rolegroup headless [`Service`] is a service that allows direct access to the instances of a certain rolegroup
37+
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
38+
pub fn build_rolegroup_headless_service(
39+
airflow: &v1alpha1::AirflowCluster,
40+
rolegroup_ref: &RoleGroupRef<v1alpha1::AirflowCluster>,
41+
object_labels: ObjectLabels<v1alpha1::AirflowCluster>,
42+
selector: BTreeMap<String, String>,
43+
) -> Result<Service, Error> {
44+
let ports = headless_service_ports();
45+
46+
let metadata = ObjectMetaBuilder::new()
47+
.name_and_namespace(airflow)
48+
.name(rolegroup_headless_service_name(
49+
&rolegroup_ref.object_name(),
50+
))
51+
.ownerreference_from_resource(airflow, None, Some(true))
52+
.context(ObjectMissingMetadataForOwnerRefSnafu)?
53+
.with_recommended_labels(object_labels)
54+
.context(MetadataBuildSnafu)?
55+
.build();
56+
57+
let service_spec = ServiceSpec {
58+
// Internal communication does not need to be exposed
59+
type_: Some("ClusterIP".to_string()),
60+
cluster_ip: Some("None".to_string()),
61+
ports: Some(ports),
62+
selector: Some(selector),
63+
publish_not_ready_addresses: Some(true),
64+
..ServiceSpec::default()
65+
};
66+
67+
Ok(Service {
68+
metadata,
69+
spec: Some(service_spec),
70+
status: None,
71+
})
72+
}
73+
74+
/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label.
75+
pub fn build_rolegroup_metrics_service(
76+
airflow: &v1alpha1::AirflowCluster,
77+
rolegroup_ref: &RoleGroupRef<v1alpha1::AirflowCluster>,
78+
object_labels: ObjectLabels<v1alpha1::AirflowCluster>,
79+
selector: BTreeMap<String, String>,
80+
) -> Result<Service, Error> {
81+
let ports = metrics_service_ports();
82+
83+
let prometheus_label =
84+
Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?;
85+
86+
let metadata = ObjectMetaBuilder::new()
87+
.name_and_namespace(airflow)
88+
.name(rolegroup_metrics_service_name(&rolegroup_ref.object_name()))
89+
.ownerreference_from_resource(airflow, None, Some(true))
90+
.context(ObjectMissingMetadataForOwnerRefSnafu)?
91+
.with_recommended_labels(object_labels)
92+
.context(MetadataBuildSnafu)?
93+
.with_label(prometheus_label)
94+
.build();
95+
96+
let service_spec = ServiceSpec {
97+
// Internal communication does not need to be exposed
98+
type_: Some("ClusterIP".to_string()),
99+
cluster_ip: Some("None".to_string()),
100+
ports: Some(ports),
101+
selector: Some(selector),
102+
publish_not_ready_addresses: Some(true),
103+
..ServiceSpec::default()
104+
};
105+
106+
Ok(Service {
107+
metadata,
108+
spec: Some(service_spec),
109+
status: None,
110+
})
111+
}
112+
113+
pub fn stateful_set_service_name(
114+
airflow_role: &AirflowRole,
115+
rolegroup_ref: &RoleGroupRef<v1alpha1::AirflowCluster>,
116+
) -> Option<String> {
117+
match airflow_role {
118+
AirflowRole::Webserver => Some(rolegroup_headless_service_name(
119+
&rolegroup_ref.object_name(),
120+
)),
121+
AirflowRole::Scheduler | AirflowRole::Worker => None,
122+
}
123+
}
124+
125+
/// Returns the metrics rolegroup service name `<cluster>-<role>-<rolegroup>-<METRICS_SERVICE_SUFFIX>`.
126+
// TODO: Replace by operator.rs functions
127+
fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> String {
128+
format!("{role_group_ref_object_name}-{METRICS_SERVICE_SUFFIX}")
129+
}
130+
131+
/// Returns the headless rolegroup service name `<cluster>-<role>-<rolegroup>-<HEADLESS_SERVICE_SUFFIX>`.
132+
// TODO: Replace by operator.rs functions
133+
fn rolegroup_headless_service_name(role_group_ref_object_name: &str) -> String {
134+
format!("{role_group_ref_object_name}-{HEADLESS_SERVICE_SUFFIX}")
135+
}
136+
137+
fn headless_service_ports() -> Vec<ServicePort> {
138+
vec![ServicePort {
139+
name: Some(HTTP_PORT_NAME.to_string()),
140+
port: HTTP_PORT.into(),
141+
protocol: Some("TCP".to_string()),
142+
..ServicePort::default()
143+
}]
144+
}
145+
146+
fn metrics_service_ports() -> Vec<ServicePort> {
147+
vec![ServicePort {
148+
name: Some(METRICS_PORT_NAME.to_string()),
149+
port: METRICS_PORT.into(),
150+
protocol: Some("TCP".to_string()),
151+
..ServicePort::default()
152+
}]
153+
}

0 commit comments

Comments
 (0)