From 07878ca40720c4f3779f62cc4fa44512aa140b4c Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Tue, 4 Mar 2025 10:32:22 +0100 Subject: [PATCH 01/13] wip --- CHANGELOG.md | 4 ++++ rust/operator-binary/src/crd/security.rs | 19 +++++++++++++------ rust/operator-binary/src/kerberos.rs | 2 ++ tests/test-definition.yaml | 6 ------ 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ada6c5b0..172bbb0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,10 @@ All notable changes to this project will be documented in this file. - Default to OCI for image metadata and product image selection ([#810]). - Bump Kafka 3.7.1 to 3.7.2 in tests and getting_started, and bump upgrade testing from 3.7.1->3.8.0 to 3.8.0->3.9.0 ([#822]). +### Fixed + +- Readiness probe fixed if Kerberos is enabled + [#796]: https://github.com/stackabletech/kafka-operator/pull/796 [#803]: https://github.com/stackabletech/kafka-operator/pull/803 [#809]: https://github.com/stackabletech/kafka-operator/pull/809 diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index bf98b082..dcb80bf6 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -293,6 +293,7 @@ impl KafkaTlsSecurity { args.push("-b".to_string()); args.push(format!("localhost:{}", port)); args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR)); + args.push("-L".to_string()); } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); // here we need to specify a shell so that variable substitution will work @@ -302,33 +303,39 @@ impl KafkaTlsSecurity { args.push("-euo".to_string()); args.push("pipefail".to_string()); args.push("-c".to_string()); - args.push( + + let mut bash_args = vec![]; + bash_args.push( format!( "export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});", STACKABLE_KERBEROS_KRB5_PATH ) .to_string(), ); - args.push("/stackable/kcat".to_string()); - args.push("-b".to_string()); - args.push(format!("{pod_fqdn}:{port}")); - args.extend(Self::kcat_client_sasl_ssl( + bash_args.push("/stackable/kcat".to_string()); + bash_args.push("-b".to_string()); + bash_args.push(format!("{pod_fqdn}:{port}")); + bash_args.extend(Self::kcat_client_sasl_ssl( Self::STACKABLE_TLS_KCAT_DIR, service_name, pod_fqdn, )); + bash_args.push("-L".to_string()); + + args.push(bash_args.join(" ")); } else if self.tls_server_secret_class().is_some() { args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR)); + args.push("-L".to_string()); } else { args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); + args.push("-L".to_string()); } - args.push("-L".to_string()); args } diff --git a/rust/operator-binary/src/kerberos.rs b/rust/operator-binary/src/kerberos.rs index fb18b80b..dd395303 100644 --- a/rust/operator-binary/src/kerberos.rs +++ b/rust/operator-binary/src/kerberos.rs @@ -45,6 +45,8 @@ pub fn add_kerberos_pod_config( SecretOperatorVolumeSourceBuilder::new(kerberos_secret_class) .with_listener_volume_scope(LISTENER_BROKER_VOLUME_NAME) .with_listener_volume_scope(LISTENER_BOOTSTRAP_VOLUME_NAME) + // The pod scope is required for the kcat-prober. + .with_pod_scope() .with_kerberos_service_name(role.kerberos_service_name()) .build() .context(KerberosSecretVolumeSnafu)?; diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f3462fa5..8b2269ae 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -6,9 +6,6 @@ dimensions: - name: kafka values: - - 3.7.1 - - 3.7.2 - - 3.8.1 - 3.9.0 # Alternatively, if you want to use a custom image, append a comma and the full image name to the product version # as in the example below. @@ -34,11 +31,9 @@ dimensions: - name: use-client-tls values: - "true" - - "false" - name: use-client-auth-tls values: - "true" - - "false" - name: openshift values: - "false" @@ -47,7 +42,6 @@ dimensions: - 1.21.1 - name: kerberos-realm values: - - "CLUSTER.LOCAL" - "PROD.MYCORP" - name: kerberos-backend values: From f23ff3dece139cc1e46b8922c4d07c9cb130e722 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 5 Mar 2025 09:42:49 +0100 Subject: [PATCH 02/13] imports, formatting --- rust/operator-binary/src/crd/listener.rs | 11 +++++++++++ rust/operator-binary/src/crd/security.rs | 10 +++++----- rust/operator-binary/src/kafka_controller.rs | 7 +++---- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 38908da9..a2abb935 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -258,6 +258,17 @@ pub fn pod_fqdn( )) } +pub fn pod_kcat( + kafka: &v1alpha1::KafkaCluster, + cluster_info: &KubernetesClusterInfo, +) -> Result { + Ok(format!( + "$POD_NAME-listener-broker.{namespace}.svc.{cluster_domain}", + namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, + cluster_domain = cluster_info.cluster_domain + )) +} + #[cfg(test)] mod tests { use stackable_operator::{ diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index dcb80bf6..7451b6c7 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -284,7 +284,7 @@ impl KafkaTlsSecurity { } /// Returns the commands for the kcat readiness probe. - pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec { + pub fn kcat_prober_container_commands(&self, pod_kcat: &String) -> Vec { let mut args = vec![]; let port = self.client_port(); @@ -314,11 +314,11 @@ impl KafkaTlsSecurity { ); bash_args.push("/stackable/kcat".to_string()); bash_args.push("-b".to_string()); - bash_args.push(format!("{pod_fqdn}:{port}")); + bash_args.push(format!("{pod_kcat}:{port}")); bash_args.extend(Self::kcat_client_sasl_ssl( Self::STACKABLE_TLS_KCAT_DIR, service_name, - pod_fqdn, + pod_kcat, )); bash_args.push("-L".to_string()); @@ -665,7 +665,7 @@ impl KafkaTlsSecurity { fn kcat_client_sasl_ssl( cert_directory: &str, service_name: &str, - pod_fqdn: &String, + pod_kcat: &String, ) -> Vec { vec![ "-X".to_string(), @@ -679,7 +679,7 @@ impl KafkaTlsSecurity { "-X".to_string(), format!("sasl.kerberos.service.name={service_name}"), "-X".to_string(), - format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"), + format!("sasl.kerberos.principal={service_name}/{pod_kcat}@$KERBEROS_REALM"), ] } } diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 0374e572..4c670123 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args}, crd::{ - listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError}, + listener::{get_kafka_listener_config, pod_kcat, KafkaListenerError}, security::KafkaTlsSecurity, v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, @@ -999,8 +999,7 @@ fn build_broker_rolegroup_statefulset( .context(AddVolumeMountSnafu)? .resources(merged_config.resources.clone().into()); - let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info) - .context(ResolveNamespaceSnafu)?; + let pod_kcat = pod_kcat(kafka, cluster_info).context(ResolveNamespaceSnafu)?; // Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of // unacceptable perf overhead cb_kcat_prober @@ -1030,7 +1029,7 @@ fn build_broker_rolegroup_statefulset( .readiness_probe(Probe { exec: Some(ExecAction { // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point - command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)), + command: Some(kafka_security.kcat_prober_container_commands(&pod_kcat)), }), timeout_seconds: Some(5), period_seconds: Some(2), From 72c71b4a7944b8228bb6a5c785b364e97d198c71 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 5 Mar 2025 10:30:05 +0100 Subject: [PATCH 03/13] wip: provisional fix with pod_kcat instead of pod_fqdn --- rust/operator-binary/src/crd/listener.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index a2abb935..f3a831e2 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -263,6 +263,7 @@ pub fn pod_kcat( cluster_info: &KubernetesClusterInfo, ) -> Result { Ok(format!( + // TODO where does this prefix come from? Is it safe to hard-code it? "$POD_NAME-listener-broker.{namespace}.svc.{cluster_domain}", namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, cluster_domain = cluster_info.cluster_domain From a4e80e58c4c12af7af3c2a57909c9c647d74ab34 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 5 Mar 2025 11:42:29 +0100 Subject: [PATCH 04/13] wip: use correct suffix --- rust/operator-binary/src/crd/listener.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index f3a831e2..4efe469f 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -7,7 +7,10 @@ use snafu::{OptionExt, Snafu}; use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo}; use strum::{EnumDiscriminants, EnumString}; -use crate::crd::{security::KafkaTlsSecurity, v1alpha1, STACKABLE_LISTENER_BROKER_DIR}; +use crate::crd::{ + security::KafkaTlsSecurity, v1alpha1, LISTENER_BROKER_VOLUME_NAME, + STACKABLE_LISTENER_BROKER_DIR, +}; const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0"; @@ -263,8 +266,7 @@ pub fn pod_kcat( cluster_info: &KubernetesClusterInfo, ) -> Result { Ok(format!( - // TODO where does this prefix come from? Is it safe to hard-code it? - "$POD_NAME-listener-broker.{namespace}.svc.{cluster_domain}", + "$POD_NAME-{LISTENER_BROKER_VOLUME_NAME}.{namespace}.svc.{cluster_domain}", namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, cluster_domain = cluster_info.cluster_domain )) From 6c4b93e3d51d569521ada6a5d7778b69cbb784c3 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 12 Mar 2025 11:12:49 +0100 Subject: [PATCH 05/13] changelog, restored test defs, added comment --- CHANGELOG.md | 3 ++- rust/operator-binary/src/crd/security.rs | 2 ++ tests/test-definition.yaml | 5 +++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 172bbb0b..b5657f03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ All notable changes to this project will be documented in this file. ### Fixed -- Readiness probe fixed if Kerberos is enabled +- Readiness probe fixed if Kerberos is enabled ([#833]). [#796]: https://github.com/stackabletech/kafka-operator/pull/796 [#803]: https://github.com/stackabletech/kafka-operator/pull/803 @@ -34,6 +34,7 @@ All notable changes to this project will be documented in this file. [#813]: https://github.com/stackabletech/kafka-operator/pull/813 [#819]: https://github.com/stackabletech/kafka-operator/pull/819 [#822]: https://github.com/stackabletech/kafka-operator/pull/822 +[#833]: https://github.com/stackabletech/kafka-operator/pull/833 ## [24.11.1] - 2025-01-10 diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 7451b6c7..8be968fe 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -304,6 +304,8 @@ impl KafkaTlsSecurity { args.push("pipefail".to_string()); args.push("-c".to_string()); + // the entire command needs to be subject to the -euo directive + // to prevent short-circuiting let mut bash_args = vec![]; bash_args.push( format!( diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 8b2269ae..8101d6ac 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -6,6 +6,9 @@ dimensions: - name: kafka values: + - 3.7.1 + - 3.7.2 + - 3.8.1 - 3.9.0 # Alternatively, if you want to use a custom image, append a comma and the full image name to the product version # as in the example below. @@ -31,6 +34,7 @@ dimensions: - name: use-client-tls values: - "true" + - "false" - name: use-client-auth-tls values: - "true" @@ -42,6 +46,7 @@ dimensions: - 1.21.1 - name: kerberos-realm values: + - "CLUSTER.LOCAL" - "PROD.MYCORP" - name: kerberos-backend values: From 67d180e125d58b49a21352b6523a038da504af42 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 12 Mar 2025 11:13:59 +0100 Subject: [PATCH 06/13] restored test defs II --- tests/test-definition.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 8101d6ac..f3462fa5 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -38,6 +38,7 @@ dimensions: - name: use-client-auth-tls values: - "true" + - "false" - name: openshift values: - "false" From f25958f19c10132888238511ae85a0a6a3de8879 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 12 Mar 2025 13:04:46 +0100 Subject: [PATCH 07/13] removed pod_scope --- Cargo.nix | 6 +++--- rust/operator-binary/src/kerberos.rs | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index cc300ed4..343be2e7 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -6030,10 +6030,10 @@ rec { }; "ring" = rec { crateName = "ring"; - version = "0.17.11"; + version = "0.17.13"; edition = "2021"; - links = "ring_core_0_17_11_"; - sha256 = "0wzyhdbf71ndd14kkpyj2a6nvczvli2mndzv2al7r26k4yp4jlys"; + links = "ring_core_0_17_13_"; + sha256 = "1vjhhlmpqqd9lc53ffjj1yk203188n2km27g3myvssm15a1mvb3h"; dependencies = [ { name = "cfg-if"; diff --git a/rust/operator-binary/src/kerberos.rs b/rust/operator-binary/src/kerberos.rs index dd395303..fb18b80b 100644 --- a/rust/operator-binary/src/kerberos.rs +++ b/rust/operator-binary/src/kerberos.rs @@ -45,8 +45,6 @@ pub fn add_kerberos_pod_config( SecretOperatorVolumeSourceBuilder::new(kerberos_secret_class) .with_listener_volume_scope(LISTENER_BROKER_VOLUME_NAME) .with_listener_volume_scope(LISTENER_BOOTSTRAP_VOLUME_NAME) - // The pod scope is required for the kcat-prober. - .with_pod_scope() .with_kerberos_service_name(role.kerberos_service_name()) .build() .context(KerberosSecretVolumeSnafu)?; From 26db30c1f6fdb96de7eb4253bd5113bf3e1c97ab Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 13 Mar 2025 16:17:14 +0100 Subject: [PATCH 08/13] corrected comment --- rust/operator-binary/src/crd/security.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 8be968fe..82befc64 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -304,7 +304,7 @@ impl KafkaTlsSecurity { args.push("pipefail".to_string()); args.push("-c".to_string()); - // the entire command needs to be subject to the -euo directive + // the entire command needs to be subject to the -c directive // to prevent short-circuiting let mut bash_args = vec![]; bash_args.push( From 30cca43d919d4fde957edc1be44e4158bc7dd450 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 17 Mar 2025 17:26:33 +0100 Subject: [PATCH 09/13] port to be be dependent on broker listener class --- rust/operator-binary/src/crd/listener.rs | 18 ++----------- rust/operator-binary/src/crd/security.rs | 28 +++++++++++++------- rust/operator-binary/src/kafka_controller.rs | 15 ++++++++--- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 4efe469f..c27c5d57 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -7,10 +7,7 @@ use snafu::{OptionExt, Snafu}; use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo}; use strum::{EnumDiscriminants, EnumString}; -use crate::crd::{ - security::KafkaTlsSecurity, v1alpha1, LISTENER_BROKER_VOLUME_NAME, - STACKABLE_LISTENER_BROKER_DIR, -}; +use crate::crd::{security::KafkaTlsSecurity, v1alpha1, STACKABLE_LISTENER_BROKER_DIR}; const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0"; @@ -244,7 +241,7 @@ pub fn node_address_cmd(directory: &str) -> String { format!("$(cat {directory}/default-address/address)") } -fn node_port_cmd(directory: &str, port_name: &str) -> String { +pub fn node_port_cmd(directory: &str, port_name: &str) -> String { format!("$(cat {directory}/default-address/ports/{port_name})") } @@ -261,17 +258,6 @@ pub fn pod_fqdn( )) } -pub fn pod_kcat( - kafka: &v1alpha1::KafkaCluster, - cluster_info: &KubernetesClusterInfo, -) -> Result { - Ok(format!( - "$POD_NAME-{LISTENER_BROKER_VOLUME_NAME}.{namespace}.svc.{cluster_domain}", - namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, - cluster_domain = cluster_info.cluster_domain - )) -} - #[cfg(test)] mod tests { use stackable_operator::{ diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 82befc64..9fc8083d 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -27,6 +27,7 @@ use stackable_operator::{ utils::COMMON_BASH_TRAP_FUNCTIONS, }; +use super::listener::node_port_cmd; use crate::crd::{ authentication::{self, ResolvedAuthenticationClasses}, listener::{self, node_address_cmd, KafkaListenerConfig}, @@ -284,7 +285,7 @@ impl KafkaTlsSecurity { } /// Returns the commands for the kcat readiness probe. - pub fn kcat_prober_container_commands(&self, pod_kcat: &String) -> Vec { + pub fn kcat_prober_container_commands(&self, broker_listener_class: &String) -> Vec { let mut args = vec![]; let port = self.client_port(); @@ -296,6 +297,13 @@ impl KafkaTlsSecurity { args.push("-L".to_string()); } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); + let broker_port = match broker_listener_class.as_str() { + // for cluster-internal, kcat will connect using the broker name and the internal port + "cluster-internal" => port.to_string(), + // for other cases we will use the IP address and the externally-mapped TLS port + _ => node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()), + }; + tracing::info!("Port {broker_port}: listener {broker_listener_class}"); // here we need to specify a shell so that variable substitution will work // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md args.push("/bin/bash".to_string()); @@ -314,13 +322,19 @@ impl KafkaTlsSecurity { ) .to_string(), ); + bash_args.push( + format!( + "export POD_BROKER_LISTENER_ADDRESS={};", + node_address_cmd(STACKABLE_LISTENER_BROKER_DIR) + ) + .to_string(), + ); bash_args.push("/stackable/kcat".to_string()); bash_args.push("-b".to_string()); - bash_args.push(format!("{pod_kcat}:{port}")); + bash_args.push(format!("$POD_BROKER_LISTENER_ADDRESS:{broker_port}")); bash_args.extend(Self::kcat_client_sasl_ssl( Self::STACKABLE_TLS_KCAT_DIR, service_name, - pod_kcat, )); bash_args.push("-L".to_string()); @@ -664,11 +678,7 @@ impl KafkaTlsSecurity { ] } - fn kcat_client_sasl_ssl( - cert_directory: &str, - service_name: &str, - pod_kcat: &String, - ) -> Vec { + fn kcat_client_sasl_ssl(cert_directory: &str, service_name: &str) -> Vec { vec![ "-X".to_string(), "security.protocol=SASL_SSL".to_string(), @@ -681,7 +691,7 @@ impl KafkaTlsSecurity { "-X".to_string(), format!("sasl.kerberos.service.name={service_name}"), "-X".to_string(), - format!("sasl.kerberos.principal={service_name}/{pod_kcat}@$KERBEROS_REALM"), + format!("sasl.kerberos.principal={service_name}/$POD_BROKER_LISTENER_ADDRESS@$KERBEROS_REALM"), ] } } diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 4c670123..008e28fe 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args}, crd::{ - listener::{get_kafka_listener_config, pod_kcat, KafkaListenerError}, + listener::{get_kafka_listener_config, KafkaListenerError}, security::KafkaTlsSecurity, v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, @@ -999,7 +999,6 @@ fn build_broker_rolegroup_statefulset( .context(AddVolumeMountSnafu)? .resources(merged_config.resources.clone().into()); - let pod_kcat = pod_kcat(kafka, cluster_info).context(ResolveNamespaceSnafu)?; // Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of // unacceptable perf overhead cb_kcat_prober @@ -1024,12 +1023,22 @@ fn build_broker_rolegroup_statefulset( .with_memory_limit("128Mi") .build(), ) + .add_volume_mount( + LISTENER_BOOTSTRAP_VOLUME_NAME, + STACKABLE_LISTENER_BOOTSTRAP_DIR, + ) + .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_BROKER_VOLUME_NAME, STACKABLE_LISTENER_BROKER_DIR) + .context(AddVolumeMountSnafu)? // Only allow the global load balancing service to send traffic to pods that are members of the quorum // This also acts as a hint to the StatefulSet controller to wait for each pod to enter quorum before taking down the next .readiness_probe(Probe { exec: Some(ExecAction { // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point - command: Some(kafka_security.kcat_prober_container_commands(&pod_kcat)), + command: Some( + kafka_security + .kcat_prober_container_commands(&merged_config.broker_listener_class), + ), }), timeout_seconds: Some(5), period_seconds: Some(2), From 857ac5ab5edded2c3287b315602cea7685c98c13 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 17 Mar 2025 17:49:20 +0100 Subject: [PATCH 10/13] extend test to check all 3 bootstrap listener possibilities --- .../templates/kuttl/kerberos/20-install-kafka.yaml.j2 | 2 ++ tests/test-definition.yaml | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 index fc3a958a..8781de96 100644 --- a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 @@ -49,6 +49,8 @@ commands: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + brokerListenerClass: {{ test_scenario['values']['listener-class'] }} + bootstrapListenerClass: {{ test_scenario['values']['listener-class'] }} roleGroups: default: replicas: 3 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 3da10894..c80e09db 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -56,6 +56,14 @@ dimensions: # Requires manual setup, see create-kerberos-secretclass.yaml # This will *not* respect the kerberos-realm test attribute, but instead use a hard-coded realm # - activeDirectory + - name: broker-listener-class + values: + - "cluster-internal" + - name: bootstrap-listener-class + values: + - "cluster-internal" + - "external-stable" + - "external-unstable" tests: - name: smoke dimensions: @@ -106,6 +114,8 @@ tests: - kerberos-realm - kerberos-backend - openshift + - broker-listener-class + - bootstrap-listener-class suites: - name: nightly From 2cbe4a75f5e286ac7b4d71b11f7145471edad7c4 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy <1712947+adwk67@users.noreply.github.com> Date: Thu, 20 Mar 2025 13:26:31 +0100 Subject: [PATCH 11/13] Update tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 Co-authored-by: Siegfried Weber --- tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 index 8781de96..a03d03ed 100644 --- a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 @@ -49,8 +49,8 @@ commands: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} - brokerListenerClass: {{ test_scenario['values']['listener-class'] }} - bootstrapListenerClass: {{ test_scenario['values']['listener-class'] }} + brokerListenerClass: {{ test_scenario['values']['broker-listener-class'] }} + bootstrapListenerClass: {{ test_scenario['values']['bootstrap-listener-class'] }} roleGroups: default: replicas: 3 From 0cd08a1befbc79f40c59fb3f63ed70b7243e5524 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 20 Mar 2025 13:50:26 +0100 Subject: [PATCH 12/13] review feedback --- rust/operator-binary/src/crd/security.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 9fc8083d..7f6fccec 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -297,13 +297,10 @@ impl KafkaTlsSecurity { args.push("-L".to_string()); } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); - let broker_port = match broker_listener_class.as_str() { - // for cluster-internal, kcat will connect using the broker name and the internal port - "cluster-internal" => port.to_string(), - // for other cases we will use the IP address and the externally-mapped TLS port - _ => node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()), - }; - tracing::info!("Port {broker_port}: listener {broker_listener_class}"); + let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()); + tracing::debug!( + "Port for listener class {broker_listener_class} is specified by {broker_port}" + ); // here we need to specify a shell so that variable substitution will work // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md args.push("/bin/bash".to_string()); From a7568ec49dc40ef7ba806aa43eb7a043e269fe91 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 20 Mar 2025 14:21:35 +0100 Subject: [PATCH 13/13] removed unecessary log statement --- rust/operator-binary/src/crd/security.rs | 5 +---- rust/operator-binary/src/kafka_controller.rs | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 7f6fccec..bff15f18 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -285,7 +285,7 @@ impl KafkaTlsSecurity { } /// Returns the commands for the kcat readiness probe. - pub fn kcat_prober_container_commands(&self, broker_listener_class: &String) -> Vec { + pub fn kcat_prober_container_commands(&self) -> Vec { let mut args = vec![]; let port = self.client_port(); @@ -298,9 +298,6 @@ impl KafkaTlsSecurity { } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()); - tracing::debug!( - "Port for listener class {broker_listener_class} is specified by {broker_port}" - ); // here we need to specify a shell so that variable substitution will work // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md args.push("/bin/bash".to_string()); diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 008e28fe..bdd94137 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -1035,10 +1035,7 @@ fn build_broker_rolegroup_statefulset( .readiness_probe(Probe { exec: Some(ExecAction { // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point - command: Some( - kafka_security - .kcat_prober_container_commands(&merged_config.broker_listener_class), - ), + command: Some(kafka_security.kcat_prober_container_commands()), }), timeout_seconds: Some(5), period_seconds: Some(2),