Skip to content

Commit 2bf2ad8

Browse files
authored
fix: Include the bootstrap Kafka service in certificate SANs (#741)
* Include the global Kafka service in certificate SANs * fix clippy lints * changelog * charts * Rename kafka_name to kafka_bootstrap_service_name * changelog * changelog * Use bootstrap_service_name instead of name_any * Fix clippy lint
1 parent cb9d7c6 commit 2bf2ad8

File tree

9 files changed

+57
-25
lines changed

9 files changed

+57
-25
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
44

55
## [Unreleased]
66

7+
### Fixed
8+
9+
- Include the global Kafka bootstrap service (not the rolegroup-specific) DNS record as SAN entry in the generated
10+
certificates used by Kafka. This allows you to access Kafka brokers secured using TLS via the global bootstrap
11+
service ([#741]).
12+
13+
[#741]: https://github.com/stackabletech/kafka-operator/pull/741
14+
715
## [24.7.0] - 2024-07-24
816

917
### Added

deploy/helm/kafka-operator/crds/crds.yaml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7296,7 +7296,9 @@ spec:
72967296
72977297
## TLS provider
72987298
7299-
Only affects client connections. This setting controls: - If clients need to authenticate themselves against the broker via TLS - Which ca.crt to use when validating the provided client certs This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`.
7299+
Only affects client connections. This setting controls: - If clients need to authenticate themselves against the broker via TLS - Which ca.crt to use when validating the provided client certs
7300+
7301+
This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`.
73007302
type: string
73017303
required:
73027304
- authenticationClass
@@ -7331,11 +7333,17 @@ spec:
73317333
properties:
73327334
internalSecretClass:
73337335
default: tls
7334-
description: 'The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for internal broker communication. Use mutual verification between brokers (mandatory). This setting controls: - Which cert the brokers should use to authenticate themselves against other brokers - Which ca.crt to use when validating the other brokers Defaults to `tls`'
7336+
description: |-
7337+
The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for internal broker communication. Use mutual verification between brokers (mandatory). This setting controls: - Which cert the brokers should use to authenticate themselves against other brokers - Which ca.crt to use when validating the other brokers
7338+
7339+
Defaults to `tls`
73357340
type: string
73367341
serverSecretClass:
73377342
default: tls
7338-
description: 'The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client Defaults to `tls`.'
7343+
description: |-
7344+
The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client
7345+
7346+
Defaults to `tls`.
73397347
nullable: true
73407348
type: string
73417349
type: object

rust/crd/src/authentication.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct KafkaAuthentication {
4040
/// Only affects client connections. This setting controls:
4141
/// - If clients need to authenticate themselves against the broker via TLS
4242
/// - Which ca.crt to use when validating the provided client certs
43+
///
4344
/// This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`.
4445
pub authentication_class: String,
4546
}

rust/crd/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ pub struct KafkaClusterConfig {
162162
}
163163

164164
impl KafkaCluster {
165-
/// The name of the role-level load-balanced Kubernetes `Service`
166-
pub fn broker_role_service_name(&self) -> Option<String> {
167-
self.metadata.name.clone()
165+
/// The name of the load-balanced Kubernetes Service providing the bootstrap address. Kafka clients will use this
166+
/// to get a list of broker addresses and will use those to transmit data to the correct broker.
167+
pub fn bootstrap_service_name(&self) -> String {
168+
self.name_any()
168169
}
169170

170171
/// Metadata about a broker rolegroup

rust/crd/src/listener.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ mod tests {
228228
"#;
229229
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
230230
let kafka_security = KafkaTlsSecurity::new(
231+
&kafka,
231232
ResolvedAuthenticationClasses::new(vec![AuthenticationClass {
232233
metadata: ObjectMetaBuilder::new().name("auth-class").build(),
233234
spec: AuthenticationClassSpec {
@@ -294,6 +295,7 @@ mod tests {
294295
"#;
295296
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
296297
let kafka_security = KafkaTlsSecurity::new(
298+
&kafka,
297299
ResolvedAuthenticationClasses::new(vec![]),
298300
"tls".to_string(),
299301
Some("tls".to_string()),
@@ -355,6 +357,7 @@ mod tests {
355357
"#;
356358
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
357359
let kafka_security = KafkaTlsSecurity::new(
360+
&kafka,
358361
ResolvedAuthenticationClasses::new(vec![]),
359362
"".to_string(),
360363
None,

rust/crd/src/security.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ pub enum Error {
4242
}
4343

4444
/// Helper struct combining TLS settings for server and internal with the resolved AuthenticationClasses
45-
pub struct KafkaTlsSecurity {
45+
pub struct KafkaTlsSecurity<'a> {
46+
kafka: &'a KafkaCluster,
4647
resolved_authentication_classes: ResolvedAuthenticationClasses,
4748
internal_secret_class: String,
4849
server_secret_class: Option<String>,
4950
}
5051

51-
impl KafkaTlsSecurity {
52+
impl<'a> KafkaTlsSecurity<'a> {
5253
// ports
5354
pub const CLIENT_PORT_NAME: &'static str = "kafka";
5455
pub const CLIENT_PORT: u16 = 9092;
@@ -104,12 +105,15 @@ impl KafkaTlsSecurity {
104105
const STACKABLE_TLS_KEYSTORE_INTERNAL_DIR: &'static str = "/stackable/tls_keystore_internal";
105106
const STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME: &'static str = "tls-keystore-internal";
106107

108+
#[cfg(test)]
107109
pub fn new(
110+
kafka: &'a KafkaCluster,
108111
resolved_authentication_classes: ResolvedAuthenticationClasses,
109112
internal_secret_class: String,
110113
server_secret_class: Option<String>,
111114
) -> Self {
112115
Self {
116+
kafka,
113117
resolved_authentication_classes,
114118
internal_secret_class,
115119
server_secret_class,
@@ -120,9 +124,10 @@ impl KafkaTlsSecurity {
120124
/// all provided `AuthenticationClass` references.
121125
pub async fn new_from_kafka_cluster(
122126
client: &Client,
123-
kafka: &KafkaCluster,
127+
kafka: &'a KafkaCluster,
124128
) -> Result<Self, Error> {
125129
Ok(KafkaTlsSecurity {
130+
kafka,
126131
resolved_authentication_classes: ResolvedAuthenticationClasses::from_references(
127132
client,
128133
&kafka.spec.cluster_config.authentication,
@@ -148,6 +153,7 @@ impl KafkaTlsSecurity {
148153
/// Check if TLS encryption is enabled. This could be due to:
149154
/// - A provided server `SecretClass`
150155
/// - A provided client `AuthenticationClass`
156+
///
151157
/// This affects init container commands, Kafka configuration, volume mounts and
152158
/// the Kafka client port
153159
pub fn tls_enabled(&self) -> bool {
@@ -273,6 +279,7 @@ impl KafkaTlsSecurity {
273279
if let Some(tls_server_secret_class) = self.get_tls_secret_class() {
274280
// We have to mount tls pem files for kcat (the mount can be used directly)
275281
pod_builder.add_volume(Self::create_tls_volume(
282+
&self.kafka.bootstrap_service_name(),
276283
Self::STACKABLE_TLS_CERT_SERVER_DIR_NAME,
277284
tls_server_secret_class,
278285
)?);
@@ -282,6 +289,7 @@ impl KafkaTlsSecurity {
282289
);
283290
// Keystores fore the kafka container
284291
pod_builder.add_volume(Self::create_tls_keystore_volume(
292+
&self.kafka.bootstrap_service_name(),
285293
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR_NAME,
286294
tls_server_secret_class,
287295
)?);
@@ -293,6 +301,7 @@ impl KafkaTlsSecurity {
293301

294302
if let Some(tls_internal_secret_class) = self.tls_internal_secret_class() {
295303
pod_builder.add_volume(Self::create_tls_keystore_volume(
304+
&self.kafka.bootstrap_service_name(),
296305
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME,
297306
tls_internal_secret_class,
298307
)?);
@@ -426,12 +435,17 @@ impl KafkaTlsSecurity {
426435
}
427436

428437
/// Creates ephemeral volumes to mount the `SecretClass` into the Pods
429-
fn create_tls_volume(volume_name: &str, secret_class_name: &str) -> Result<Volume, Error> {
438+
fn create_tls_volume(
439+
kafka_bootstrap_service_name: &str,
440+
volume_name: &str,
441+
secret_class_name: &str,
442+
) -> Result<Volume, Error> {
430443
Ok(VolumeBuilder::new(volume_name)
431444
.ephemeral(
432445
SecretOperatorVolumeSourceBuilder::new(secret_class_name)
433446
.with_pod_scope()
434447
.with_node_scope()
448+
.with_service_scope(kafka_bootstrap_service_name)
435449
.build()
436450
.context(SecretVolumeBuildSnafu)?,
437451
)
@@ -440,6 +454,7 @@ impl KafkaTlsSecurity {
440454

441455
/// Creates ephemeral volumes to mount the `SecretClass` into the Pods as keystores
442456
fn create_tls_keystore_volume(
457+
kafka_bootstrap_service_name: &str,
443458
volume_name: &str,
444459
secret_class_name: &str,
445460
) -> Result<Volume, Error> {
@@ -448,6 +463,7 @@ impl KafkaTlsSecurity {
448463
SecretOperatorVolumeSourceBuilder::new(secret_class_name)
449464
.with_pod_scope()
450465
.with_node_scope()
466+
.with_service_scope(kafka_bootstrap_service_name)
451467
.with_format(SecretFormat::TlsPkcs12)
452468
.build()
453469
.context(SecretVolumeBuildSnafu)?,

rust/crd/src/tls.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ pub struct KafkaTls {
1111
/// This setting controls:
1212
/// - Which cert the brokers should use to authenticate themselves against other brokers
1313
/// - Which ca.crt to use when validating the other brokers
14+
///
1415
/// Defaults to `tls`
1516
#[serde(default = "internal_tls_default")]
1617
pub internal_secret_class: String,
1718
/// The [SecretClass](DOCS_BASE_URL_PLACEHOLDER/secret-operator/secretclass.html) to use for
1819
/// client connections. This setting controls:
1920
/// - If TLS encryption is used at all
2021
/// - Which cert the servers should use to authenticate themselves against the client
22+
///
2123
/// Defaults to `tls`.
2224
#[serde(
2325
default = "server_tls_default",

rust/operator-binary/src/discovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub async fn build_discovery_configmaps(
5757
owner: &impl Resource<DynamicType = ()>,
5858
resolved_product_image: &ResolvedProductImage,
5959
client: &stackable_operator::client::Client,
60-
kafka_security: &KafkaTlsSecurity,
60+
kafka_security: &KafkaTlsSecurity<'_>,
6161
svc: &Service,
6262
) -> Result<Vec<ConfigMap>, Error> {
6363
let name = owner.name_unchecked();

rust/operator-binary/src/kafka_controller.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,6 @@ pub enum Error {
106106
#[snafu(display("object defines no broker role"))]
107107
NoBrokerRole,
108108

109-
#[snafu(display("failed to calculate global service name"))]
110-
GlobalServiceNameNotFound,
111-
112109
#[snafu(display("failed to apply role Service"))]
113110
ApplyRoleService {
114111
source: stackable_operator::cluster_resources::Error,
@@ -324,7 +321,6 @@ impl ReconcilerError for Error {
324321
Error::ObjectHasNoName => None,
325322
Error::ObjectHasNoNamespace => None,
326323
Error::NoBrokerRole => None,
327-
Error::GlobalServiceNameNotFound => None,
328324
Error::ApplyRoleService { .. } => None,
329325
Error::ApplyRoleServiceAccount { .. } => None,
330326
Error::ApplyRoleRoleBinding { .. } => None,
@@ -443,7 +439,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
443439
};
444440

445441
let broker_role_service =
446-
build_broker_role_service(&kafka, &resolved_product_image, &kafka_security)?;
442+
build_bootstrap_service(&kafka, &resolved_product_image, &kafka_security)?;
447443

448444
let broker_role_service = cluster_resources
449445
.add(client, broker_role_service)
@@ -580,21 +576,18 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
580576
Ok(Action::await_change())
581577
}
582578

583-
/// The broker-role service is the primary endpoint that should be used by clients that do not perform internal load balancing,
584-
/// including targets outside of the cluster.
585-
pub fn build_broker_role_service(
579+
/// Kafka clients will use the load-balanced bootstrap service to get a list of broker addresses and will use those to
580+
/// transmit data to the correct broker.
581+
pub fn build_bootstrap_service(
586582
kafka: &KafkaCluster,
587583
resolved_product_image: &ResolvedProductImage,
588584
kafka_security: &KafkaTlsSecurity,
589585
) -> Result<Service> {
590586
let role_name = KafkaRole::Broker.to_string();
591-
let role_svc_name = kafka
592-
.broker_role_service_name()
593-
.context(GlobalServiceNameNotFoundSnafu)?;
594587
Ok(Service {
595588
metadata: ObjectMetaBuilder::new()
596589
.name_and_namespace(kafka)
597-
.name(&role_svc_name)
590+
.name(kafka.bootstrap_service_name())
598591
.ownerreference_from_resource(kafka, None, Some(true))
599592
.context(ObjectMissingMetadataForOwnerRefSnafu)?
600593
.with_recommended_labels(build_recommended_labels(
@@ -717,7 +710,7 @@ fn build_broker_rolegroup_service(
717710
Ok(Service {
718711
metadata: ObjectMetaBuilder::new()
719712
.name_and_namespace(kafka)
720-
.name(&rolegroup.object_name())
713+
.name(rolegroup.object_name())
721714
.ownerreference_from_resource(kafka, None, Some(true))
722715
.context(ObjectMissingMetadataForOwnerRefSnafu)?
723716
.with_recommended_labels(build_recommended_labels(
@@ -1049,7 +1042,7 @@ fn build_broker_rolegroup_statefulset(
10491042
Ok(StatefulSet {
10501043
metadata: ObjectMetaBuilder::new()
10511044
.name_and_namespace(kafka)
1052-
.name(&rolegroup_ref.object_name())
1045+
.name(rolegroup_ref.object_name())
10531046
.ownerreference_from_resource(kafka, None, Some(true))
10541047
.context(ObjectMissingMetadataForOwnerRefSnafu)?
10551048
.with_recommended_labels(build_recommended_labels(

0 commit comments

Comments
 (0)