Skip to content

fix: Readiness probe to use broker listener #833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ All notable changes to this project will be documented in this file.
- Bump Kafka 3.7.1 to 3.7.2 in tests and getting_started, and bump upgrade testing from 3.7.1->3.8.0 to 3.8.0->3.9.0 ([#822]).
- docs: Update supported versions list ([#835]).

### Fixed

- Readiness probe fixed if Kerberos is enabled ([#833]).

[#796]: https://github.com/stackabletech/kafka-operator/pull/796
[#803]: https://github.com/stackabletech/kafka-operator/pull/803
[#809]: https://github.com/stackabletech/kafka-operator/pull/809
Expand All @@ -33,6 +37,7 @@ All notable changes to this project will be documented in this file.
[#819]: https://github.com/stackabletech/kafka-operator/pull/819
[#822]: https://github.com/stackabletech/kafka-operator/pull/822
[#830]: https://github.com/stackabletech/kafka-operator/pull/830
[#833]: https://github.com/stackabletech/kafka-operator/pull/833
[#835]: https://github.com/stackabletech/kafka-operator/pull/835

## [24.11.1] - 2025-01-10
Expand Down
6 changes: 3 additions & 3 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use snafu::{OptionExt, Snafu};
use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo};
use strum::{EnumDiscriminants, EnumString};

use crate::crd::{security::KafkaTlsSecurity, v1alpha1, STACKABLE_LISTENER_BROKER_DIR};
use crate::crd::{
security::KafkaTlsSecurity, v1alpha1, LISTENER_BROKER_VOLUME_NAME,
STACKABLE_LISTENER_BROKER_DIR,
};

const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0";

Expand Down Expand Up @@ -258,6 +261,17 @@ pub fn pod_fqdn(
))
}

pub fn pod_kcat(
kafka: &v1alpha1::KafkaCluster,
cluster_info: &KubernetesClusterInfo,
) -> Result<String, KafkaListenerError> {
Ok(format!(
"$POD_NAME-{LISTENER_BROKER_VOLUME_NAME}.{namespace}.svc.{cluster_domain}",
namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
cluster_domain = cluster_info.cluster_domain
))
}

#[cfg(test)]
mod tests {
use stackable_operator::{
Expand Down
29 changes: 19 additions & 10 deletions rust/operator-binary/src/crd/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl KafkaTlsSecurity {
}

/// Returns the commands for the kcat readiness probe.
pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec<String> {
pub fn kcat_prober_container_commands(&self, pod_kcat: &String) -> Vec<String> {
let mut args = vec![];
let port = self.client_port();

Expand All @@ -293,6 +293,7 @@ impl KafkaTlsSecurity {
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR));
args.push("-L".to_string());
} else if self.has_kerberos_enabled() {
let service_name = KafkaRole::Broker.kerberos_service_name();
// here we need to specify a shell so that variable substitution will work
Expand All @@ -302,33 +303,41 @@ impl KafkaTlsSecurity {
args.push("-euo".to_string());
args.push("pipefail".to_string());
args.push("-c".to_string());
args.push(

// the entire command needs to be subject to the -c directive
// to prevent short-circuiting
let mut bash_args = vec![];
bash_args.push(
format!(
"export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});",
STACKABLE_KERBEROS_KRB5_PATH
)
.to_string(),
);
args.push("/stackable/kcat".to_string());
args.push("-b".to_string());
args.push(format!("{pod_fqdn}:{port}"));
args.extend(Self::kcat_client_sasl_ssl(
bash_args.push("/stackable/kcat".to_string());
bash_args.push("-b".to_string());
bash_args.push(format!("{pod_kcat}:{port}"));
bash_args.extend(Self::kcat_client_sasl_ssl(
Self::STACKABLE_TLS_KCAT_DIR,
service_name,
pod_fqdn,
pod_kcat,
));
bash_args.push("-L".to_string());

args.push(bash_args.join(" "));
} else if self.tls_server_secret_class().is_some() {
args.push("/stackable/kcat".to_string());
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR));
args.push("-L".to_string());
} else {
args.push("/stackable/kcat".to_string());
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.push("-L".to_string());
}

args.push("-L".to_string());
args
}

Expand Down Expand Up @@ -658,7 +667,7 @@ impl KafkaTlsSecurity {
fn kcat_client_sasl_ssl(
cert_directory: &str,
service_name: &str,
pod_fqdn: &String,
pod_kcat: &String,
) -> Vec<String> {
vec![
"-X".to_string(),
Expand All @@ -672,7 +681,7 @@ impl KafkaTlsSecurity {
"-X".to_string(),
format!("sasl.kerberos.service.name={service_name}"),
"-X".to_string(),
format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"),
format!("sasl.kerberos.principal={service_name}/{pod_kcat}@$KERBEROS_REALM"),
]
}
}
7 changes: 3 additions & 4 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use crate::{
config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args},
crd::{
listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError},
listener::{get_kafka_listener_config, pod_kcat, KafkaListenerError},
security::KafkaTlsSecurity,
v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME,
DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS,
Expand Down Expand Up @@ -999,8 +999,7 @@ fn build_broker_rolegroup_statefulset(
.context(AddVolumeMountSnafu)?
.resources(merged_config.resources.clone().into());

let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info)
.context(ResolveNamespaceSnafu)?;
let pod_kcat = pod_kcat(kafka, cluster_info).context(ResolveNamespaceSnafu)?;
// Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of
// unacceptable perf overhead
cb_kcat_prober
Expand Down Expand Up @@ -1030,7 +1029,7 @@ fn build_broker_rolegroup_statefulset(
.readiness_probe(Probe {
exec: Some(ExecAction {
// If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point
command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)),
command: Some(kafka_security.kcat_prober_container_commands(&pod_kcat)),
}),
timeout_seconds: Some(5),
period_seconds: Some(2),
Expand Down