Skip to content

Commit 28e17da

Browse files
committed
chore: Version KafkaCluster
1 parent 3a43ee6 commit 28e17da

File tree

14 files changed

+397
-131
lines changed

14 files changed

+397
-131
lines changed

Cargo.lock

Lines changed: 272 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,22 @@ edition = "2021"
1010
repository = "https://github.com/stackabletech/kafka-operator"
1111

1212
[workspace.dependencies]
13+
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
15+
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
16+
1317
anyhow = "1.0"
1418
built = { version = "0.7", features = ["chrono", "git2"] }
1519
clap = "4.5"
1620
const_format = "0.2"
17-
futures = { version = "0.3" }
21+
futures = "0.3"
1822
indoc = "2.0"
19-
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
2023
rstest = "0.24"
2124
semver = "1.0"
2225
serde = { version = "1.0", features = ["derive"] }
2326
serde_json = "1.0"
2427
serde_yaml = "0.9"
2528
snafu = "0.8"
26-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
2729
strum = { version = "0.26", features = ["derive"] }
2830
tokio = { version = "1.40", features = ["full"] }
2931
tracing = "0.1"

rust/operator-binary/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,18 @@ repository.workspace = true
99
publish = false
1010

1111
[dependencies]
12+
stackable-versioned.workspace = true
13+
stackable-operator.workspace = true
14+
product-config.workspace = true
15+
1216
indoc.workspace = true
1317
anyhow.workspace = true
1418
clap.workspace = true
1519
const_format.workspace = true
1620
futures.workspace = true
17-
product-config.workspace = true
1821
serde_json.workspace = true
1922
serde.workspace = true
2023
snafu.workspace = true
21-
stackable-operator.workspace = true
2224
strum.workspace = true
2325
tokio.workspace = true
2426
tracing.workspace = true

rust/operator-binary/src/config/jvm.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ fn is_heap_jvm_argument(jvm_argument: &str) -> bool {
9999
#[cfg(test)]
100100
mod tests {
101101
use super::*;
102-
use crate::crd::{KafkaCluster, KafkaRole};
102+
use crate::crd::{v1alpha1, KafkaRole};
103103

104104
#[test]
105105
fn test_construct_jvm_arguments_defaults() {
@@ -188,7 +188,8 @@ mod tests {
188188
Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
189189
String,
190190
) {
191-
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
191+
let kafka: v1alpha1::KafkaCluster =
192+
serde_yaml::from_str(kafka_cluster).expect("illegal test input");
192193

193194
let kafka_role = KafkaRole::Broker;
194195
let rolegroup_ref = kafka.broker_rolegroup_ref("default");

rust/operator-binary/src/crd/affinity.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ mod tests {
3333
};
3434

3535
use super::*;
36-
use crate::KafkaCluster;
36+
use crate::crd::v1alpha1;
3737

3838
#[rstest]
3939
#[case(KafkaRole::Broker)]
@@ -54,7 +54,8 @@ mod tests {
5454
replicas: 1
5555
"#;
5656

57-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
57+
let kafka: v1alpha1::KafkaCluster =
58+
serde_yaml::from_str(input).expect("illegal test input");
5859
let merged_config = kafka
5960
.merged_config(&role, &role.rolegroup_ref(&kafka, "default"))
6061
.unwrap();

rust/operator-binary/src/crd/listener.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use snafu::{OptionExt, Snafu};
77
use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo};
88
use strum::{EnumDiscriminants, EnumString};
99

10-
use crate::crd::{security::KafkaTlsSecurity, KafkaCluster, STACKABLE_LISTENER_BROKER_DIR};
10+
use crate::crd::{security::KafkaTlsSecurity, v1alpha1, STACKABLE_LISTENER_BROKER_DIR};
1111

1212
const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0";
1313

@@ -96,7 +96,7 @@ impl Display for KafkaListener {
9696
}
9797

9898
pub fn get_kafka_listener_config(
99-
kafka: &KafkaCluster,
99+
kafka: &v1alpha1::KafkaCluster,
100100
kafka_security: &KafkaTlsSecurity,
101101
object_name: &str,
102102
cluster_info: &KubernetesClusterInfo,
@@ -246,7 +246,7 @@ fn node_port_cmd(directory: &str, port_name: &str) -> String {
246246
}
247247

248248
pub fn pod_fqdn(
249-
kafka: &KafkaCluster,
249+
kafka: &v1alpha1::KafkaCluster,
250250
object_name: &str,
251251
cluster_info: &KubernetesClusterInfo,
252252
) -> Result<String, KafkaListenerError> {
@@ -303,7 +303,8 @@ mod tests {
303303
serverSecretClass: tls
304304
zookeeperConfigMapName: xyz
305305
"#;
306-
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
306+
let kafka: v1alpha1::KafkaCluster =
307+
serde_yaml::from_str(kafka_cluster).expect("illegal test input");
307308
let kafka_security = KafkaTlsSecurity::new(
308309
ResolvedAuthenticationClasses::new(vec![AuthenticationClass {
309310
metadata: ObjectMetaBuilder::new().name("auth-class").build(),
@@ -479,7 +480,8 @@ mod tests {
479480
serverSecretClass: tls
480481
zookeeperConfigMapName: xyz
481482
"#;
482-
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
483+
let kafka: v1alpha1::KafkaCluster =
484+
serde_yaml::from_str(kafka_cluster).expect("illegal test input");
483485
let kafka_security = KafkaTlsSecurity::new(
484486
ResolvedAuthenticationClasses::new(vec![AuthenticationClass {
485487
metadata: ObjectMetaBuilder::new().name("auth-class").build(),

rust/operator-binary/src/crd/mod.rs

Lines changed: 60 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use stackable_operator::{
3737
time::Duration,
3838
utils::cluster_info::KubernetesClusterInfo,
3939
};
40+
use stackable_versioned::versioned;
4041
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
4142

4243
use crate::crd::{authorization::KafkaAuthorization, tls::KafkaTls};
@@ -76,7 +77,7 @@ pub enum Error {
7677

7778
#[snafu(display("failed to validate config of rolegroup {rolegroup}"))]
7879
RoleGroupValidation {
79-
rolegroup: RoleGroupRef<KafkaCluster>,
80+
rolegroup: RoleGroupRef<v1alpha1::KafkaCluster>,
8081
source: ValidationError,
8182
},
8283

@@ -103,39 +104,41 @@ pub enum Error {
103104
FragmentValidationFailure { source: ValidationError },
104105
}
105106

106-
/// A Kafka cluster stacklet. This resource is managed by the Stackable operator for Apache Kafka.
107-
/// Find more information on how to use it and the resources that the operator generates in the
108-
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/kafka/).
109-
#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
110-
#[kube(
111-
group = "kafka.stackable.tech",
112-
version = "v1alpha1",
113-
kind = "KafkaCluster",
114-
plural = "kafkaclusters",
115-
status = "KafkaClusterStatus",
116-
shortname = "kafka",
117-
namespaced,
118-
crates(
119-
kube_core = "stackable_operator::kube::core",
120-
k8s_openapi = "stackable_operator::k8s_openapi",
121-
schemars = "stackable_operator::schemars"
122-
)
123-
)]
124-
#[serde(rename_all = "camelCase")]
125-
pub struct KafkaClusterSpec {
126-
// no doc - docs in ProductImage struct.
127-
pub image: ProductImage,
128-
129-
// no doc - docs in Role struct.
130-
pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,
131-
132-
/// Kafka settings that affect all roles and role groups.
133-
/// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
134-
pub cluster_config: KafkaClusterConfig,
135-
136-
// no doc - docs in ClusterOperation struct.
137-
#[serde(default)]
138-
pub cluster_operation: ClusterOperation,
107+
#[versioned(version(name = "v1alpha1"))]
108+
pub mod versioned {
109+
/// A Kafka cluster stacklet. This resource is managed by the Stackable operator for Apache Kafka.
110+
/// Find more information on how to use it and the resources that the operator generates in the
111+
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/kafka/).
112+
#[versioned(k8s(
113+
group = "kafka.stackable.tech",
114+
kind = "KafkaCluster",
115+
plural = "kafkaclusters",
116+
status = "KafkaClusterStatus",
117+
shortname = "kafka",
118+
namespaced,
119+
crates(
120+
kube_core = "stackable_operator::kube::core",
121+
k8s_openapi = "stackable_operator::k8s_openapi",
122+
schemars = "stackable_operator::schemars"
123+
)
124+
))]
125+
#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
126+
#[serde(rename_all = "camelCase")]
127+
pub struct KafkaClusterSpec {
128+
// no doc - docs in ProductImage struct.
129+
pub image: ProductImage,
130+
131+
// no doc - docs in Role struct.
132+
pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,
133+
134+
/// Kafka settings that affect all roles and role groups.
135+
/// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
136+
pub cluster_config: KafkaClusterConfig,
137+
138+
// no doc - docs in ClusterOperation struct.
139+
#[serde(default)]
140+
pub cluster_operation: ClusterOperation,
141+
}
139142
}
140143

141144
#[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)]
@@ -170,18 +173,15 @@ pub struct KafkaClusterConfig {
170173
pub zookeeper_config_map_name: String,
171174
}
172175

173-
impl KafkaCluster {
176+
impl v1alpha1::KafkaCluster {
174177
/// The name of the load-balanced Kubernetes Service providing the bootstrap address. Kafka clients will use this
175178
/// to get a list of broker addresses and will use those to transmit data to the correct broker.
176179
pub fn bootstrap_service_name(&self, rolegroup: &RoleGroupRef<Self>) -> String {
177180
format!("{}-bootstrap", rolegroup.object_name())
178181
}
179182

180183
/// Metadata about a broker rolegroup
181-
pub fn broker_rolegroup_ref(
182-
&self,
183-
group_name: impl Into<String>,
184-
) -> RoleGroupRef<KafkaCluster> {
184+
pub fn broker_rolegroup_ref(&self, group_name: impl Into<String>) -> RoleGroupRef<Self> {
185185
RoleGroupRef {
186186
cluster: ObjectRef::from_obj(self),
187187
role: KafkaRole::Broker.to_string(),
@@ -203,7 +203,7 @@ impl KafkaCluster {
203203

204204
pub fn rolegroup(
205205
&self,
206-
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
206+
rolegroup_ref: &RoleGroupRef<Self>,
207207
) -> Result<&RoleGroup<KafkaConfigFragment, JavaCommonConfig>, Error> {
208208
let role_variant =
209209
KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu {
@@ -324,9 +324,9 @@ impl KafkaRole {
324324
/// Metadata about a rolegroup
325325
pub fn rolegroup_ref(
326326
&self,
327-
kafka: &KafkaCluster,
327+
kafka: &v1alpha1::KafkaCluster,
328328
group_name: impl Into<String>,
329-
) -> RoleGroupRef<KafkaCluster> {
329+
) -> RoleGroupRef<v1alpha1::KafkaCluster> {
330330
RoleGroupRef {
331331
cluster: ObjectRef::from_obj(kafka),
332332
role: self.to_string(),
@@ -474,7 +474,7 @@ impl KafkaConfig {
474474
}
475475

476476
impl Configuration for KafkaConfigFragment {
477-
type Configurable = KafkaCluster;
477+
type Configurable = v1alpha1::KafkaCluster;
478478

479479
fn compute_env(
480480
&self,
@@ -528,7 +528,7 @@ pub struct KafkaClusterStatus {
528528
pub conditions: Vec<ClusterCondition>,
529529
}
530530

531-
impl HasStatusCondition for KafkaCluster {
531+
impl HasStatusCondition for v1alpha1::KafkaCluster {
532532
fn conditions(&self) -> Vec<ClusterCondition> {
533533
match &self.status {
534534
Some(status) => status.conditions.clone(),
@@ -541,7 +541,7 @@ impl HasStatusCondition for KafkaCluster {
541541
mod tests {
542542
use super::*;
543543

544-
fn get_server_secret_class(kafka: &KafkaCluster) -> Option<String> {
544+
fn get_server_secret_class(kafka: &v1alpha1::KafkaCluster) -> Option<String> {
545545
kafka
546546
.spec
547547
.cluster_config
@@ -550,7 +550,7 @@ mod tests {
550550
.and_then(|tls| tls.server_secret_class.clone())
551551
}
552552

553-
fn get_internal_secret_class(kafka: &KafkaCluster) -> String {
553+
fn get_internal_secret_class(kafka: &v1alpha1::KafkaCluster) -> String {
554554
kafka
555555
.spec
556556
.cluster_config
@@ -574,7 +574,8 @@ mod tests {
574574
clusterConfig:
575575
zookeeperConfigMapName: xyz
576576
"#;
577-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
577+
let kafka: v1alpha1::KafkaCluster =
578+
serde_yaml::from_str(input).expect("illegal test input");
578579
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
579580
assert_eq!(
580581
get_internal_secret_class(&kafka),
@@ -595,7 +596,8 @@ mod tests {
595596
zookeeperConfigMapName: xyz
596597
597598
"#;
598-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
599+
let kafka: v1alpha1::KafkaCluster =
600+
serde_yaml::from_str(input).expect("illegal test input");
599601
assert_eq!(
600602
get_server_secret_class(&kafka).unwrap(),
601603
"simple-kafka-server-tls".to_string()
@@ -618,7 +620,8 @@ mod tests {
618620
serverSecretClass: null
619621
zookeeperConfigMapName: xyz
620622
"#;
621-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
623+
let kafka: v1alpha1::KafkaCluster =
624+
serde_yaml::from_str(input).expect("illegal test input");
622625
assert_eq!(get_server_secret_class(&kafka), None);
623626
assert_eq!(
624627
get_internal_secret_class(&kafka),
@@ -639,7 +642,8 @@ mod tests {
639642
internalSecretClass: simple-kafka-internal-tls
640643
zookeeperConfigMapName: xyz
641644
"#;
642-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
645+
let kafka: v1alpha1::KafkaCluster =
646+
serde_yaml::from_str(input).expect("illegal test input");
643647
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
644648
assert_eq!(
645649
get_internal_secret_class(&kafka),
@@ -660,7 +664,8 @@ mod tests {
660664
clusterConfig:
661665
zookeeperConfigMapName: xyz
662666
"#;
663-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
667+
let kafka: v1alpha1::KafkaCluster =
668+
serde_yaml::from_str(input).expect("illegal test input");
664669
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
665670
assert_eq!(
666671
get_internal_secret_class(&kafka),
@@ -680,7 +685,8 @@ mod tests {
680685
internalSecretClass: simple-kafka-internal-tls
681686
zookeeperConfigMapName: xyz
682687
"#;
683-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
688+
let kafka: v1alpha1::KafkaCluster =
689+
serde_yaml::from_str(input).expect("illegal test input");
684690
assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default());
685691
assert_eq!(
686692
get_internal_secret_class(&kafka),
@@ -700,7 +706,8 @@ mod tests {
700706
serverSecretClass: simple-kafka-server-tls
701707
zookeeperConfigMapName: xyz
702708
"#;
703-
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
709+
let kafka: v1alpha1::KafkaCluster =
710+
serde_yaml::from_str(input).expect("illegal test input");
704711
assert_eq!(
705712
get_server_secret_class(&kafka),
706713
Some("simple-kafka-server-tls".to_string())

rust/operator-binary/src/crd/security.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use stackable_operator::{
3030
use crate::crd::{
3131
authentication::{self, ResolvedAuthenticationClasses},
3232
listener::{self, node_address_cmd, KafkaListenerConfig},
33-
tls, KafkaCluster, KafkaRole, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME,
33+
tls, v1alpha1, KafkaRole, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME,
3434
SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH,
3535
STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, STACKABLE_LOG_DIR,
3636
};
@@ -155,7 +155,7 @@ impl KafkaTlsSecurity {
155155
/// all provided `AuthenticationClass` references.
156156
pub async fn new_from_kafka_cluster(
157157
client: &Client,
158-
kafka: &KafkaCluster,
158+
kafka: &v1alpha1::KafkaCluster,
159159
) -> Result<Self, Error> {
160160
Ok(KafkaTlsSecurity {
161161
resolved_authentication_classes: ResolvedAuthenticationClasses::from_references(

0 commit comments

Comments
 (0)