Skip to content

Commit f23ff3d

Browse files
committed
imports, formatting
1 parent 07878ca commit f23ff3d

File tree

3 files changed

+19
-9
lines changed

3 files changed

+19
-9
lines changed

rust/operator-binary/src/crd/listener.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,17 @@ pub fn pod_fqdn(
258258
))
259259
}
260260

261+
pub fn pod_kcat(
262+
kafka: &v1alpha1::KafkaCluster,
263+
cluster_info: &KubernetesClusterInfo,
264+
) -> Result<String, KafkaListenerError> {
265+
Ok(format!(
266+
"$POD_NAME-listener-broker.{namespace}.svc.{cluster_domain}",
267+
namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
268+
cluster_domain = cluster_info.cluster_domain
269+
))
270+
}
271+
261272
#[cfg(test)]
262273
mod tests {
263274
use stackable_operator::{

rust/operator-binary/src/crd/security.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ impl KafkaTlsSecurity {
284284
}
285285

286286
/// Returns the commands for the kcat readiness probe.
287-
pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec<String> {
287+
pub fn kcat_prober_container_commands(&self, pod_kcat: &String) -> Vec<String> {
288288
let mut args = vec![];
289289
let port = self.client_port();
290290

@@ -314,11 +314,11 @@ impl KafkaTlsSecurity {
314314
);
315315
bash_args.push("/stackable/kcat".to_string());
316316
bash_args.push("-b".to_string());
317-
bash_args.push(format!("{pod_fqdn}:{port}"));
317+
bash_args.push(format!("{pod_kcat}:{port}"));
318318
bash_args.extend(Self::kcat_client_sasl_ssl(
319319
Self::STACKABLE_TLS_KCAT_DIR,
320320
service_name,
321-
pod_fqdn,
321+
pod_kcat,
322322
));
323323
bash_args.push("-L".to_string());
324324

@@ -665,7 +665,7 @@ impl KafkaTlsSecurity {
665665
fn kcat_client_sasl_ssl(
666666
cert_directory: &str,
667667
service_name: &str,
668-
pod_fqdn: &String,
668+
pod_kcat: &String,
669669
) -> Vec<String> {
670670
vec![
671671
"-X".to_string(),
@@ -679,7 +679,7 @@ impl KafkaTlsSecurity {
679679
"-X".to_string(),
680680
format!("sasl.kerberos.service.name={service_name}"),
681681
"-X".to_string(),
682-
format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"),
682+
format!("sasl.kerberos.principal={service_name}/{pod_kcat}@$KERBEROS_REALM"),
683683
]
684684
}
685685
}

rust/operator-binary/src/kafka_controller.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
7676
use crate::{
7777
config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args},
7878
crd::{
79-
listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError},
79+
listener::{get_kafka_listener_config, pod_kcat, KafkaListenerError},
8080
security::KafkaTlsSecurity,
8181
v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME,
8282
DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS,
@@ -999,8 +999,7 @@ fn build_broker_rolegroup_statefulset(
999999
.context(AddVolumeMountSnafu)?
10001000
.resources(merged_config.resources.clone().into());
10011001

1002-
let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info)
1003-
.context(ResolveNamespaceSnafu)?;
1002+
let pod_kcat = pod_kcat(kafka, cluster_info).context(ResolveNamespaceSnafu)?;
10041003
// Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of
10051004
// unacceptable perf overhead
10061005
cb_kcat_prober
@@ -1030,7 +1029,7 @@ fn build_broker_rolegroup_statefulset(
10301029
.readiness_probe(Probe {
10311030
exec: Some(ExecAction {
10321031
// If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point
1033-
command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)),
1032+
command: Some(kafka_security.kcat_prober_container_commands(&pod_kcat)),
10341033
}),
10351034
timeout_seconds: Some(5),
10361035
period_seconds: Some(2),

0 commit comments

Comments
 (0)