diff --git a/Cargo.lock b/Cargo.lock index ff58a6d3..10a515e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,6 +220,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.8.0" @@ -368,6 +374,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -624,6 +639,15 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "fluent-uri" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -780,7 +804,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags", + "bitflags 2.8.0", "libc", "libgit2-sys", "log", @@ -793,6 +817,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.2" @@ -1171,7 +1205,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1201,6 +1235,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -1237,18 +1280,45 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-patch" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" +dependencies = [ + "jsonptr 0.4.7", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json-patch" version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "863726d7afb6bc2590eeff7135d923545e5e964f004c2ccf8716c25e70a86f08" dependencies = [ - "jsonptr", + "jsonptr 0.6.3", "serde", "serde_json", "thiserror 1.0.69", ] +[[package]] +name = "jsonpath-rust" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d8fe85bd70ff715f31ce8c739194b423d79811a19602115d611a3ec85d6200" +dependencies = [ + "lazy_static", + "once_cell", + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "jsonpath-rust" version = "0.7.5" @@ -1262,6 +1332,17 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "jsonptr" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +dependencies = [ + "fluent-uri", + "serde", + "serde_json", +] + [[package]] name = "jsonptr" version = "0.6.3" @@ -1272,6 +1353,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "k8s-openapi" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8847402328d8301354c94d605481f25a6bdc1ed65471fd96af8eca71141b13" +dependencies = [ + "base64 0.22.1", + "chrono", + "schemars", + "serde", + "serde-value", + "serde_json", +] + [[package]] name = "k8s-openapi" version = "0.24.0" @@ -1286,17 +1381,78 @@ dependencies = [ "serde_json", ] +[[package]] +name = "k8s-version" +version = "0.1.2" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.5.0#048c7d8befddc2f2c6414444006871c95412d67c" +dependencies = [ + "darling", + "regex", + "snafu 0.8.5", +] + +[[package]] +name = "kube" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efffeb3df0bd4ef3e5d65044573499c0e4889b988070b08c50b25b1329289a1f" +dependencies = [ + "k8s-openapi 0.23.0", + "kube-client 0.96.0", + "kube-core 0.96.0", + "kube-derive 0.96.0", + "kube-runtime 0.96.0", +] + [[package]] name = "kube" version = "0.98.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32053dc495efad4d188c7b33cc7c02ef4a6e43038115348348876efd39a53cba" dependencies = [ - "k8s-openapi", - "kube-client", - "kube-core", - "kube-derive", - "kube-runtime", + "k8s-openapi 0.24.0", + "kube-client 0.98.0", + "kube-core 0.98.0", + "kube-derive 0.98.0", + "kube-runtime 0.98.0", +] + +[[package]] +name = "kube-client" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bf471ece8ff8d24735ce78dac4d091e9fcb8d74811aeb6b75de4d1c3f5de0f1" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures 0.3.31", + "home", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-http-proxy", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust 0.5.1", + "k8s-openapi 0.23.0", + "kube-core 0.96.0", + "pem", + "rustls", + "rustls-pemfile", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", ] [[package]] @@ -1319,9 +1475,9 @@ dependencies = [ "hyper-rustls", "hyper-timeout", "hyper-util", - "jsonpath-rust", - "k8s-openapi", - "kube-core", + "jsonpath-rust 0.7.5", + "k8s-openapi 0.24.0", + "kube-core 0.98.0", "pem", "rustls", "rustls-pemfile", @@ -1337,6 +1493,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "kube-core" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f42346d30bb34d1d7adc5c549b691bce7aa3a1e60254e68fab7e2d7b26fe3d77" +dependencies = [ + "chrono", + "form_urlencoded", + "http", + "json-patch 2.0.0", + "k8s-openapi 0.23.0", + "schemars", + "serde", + "serde-value", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "kube-core" version = "0.98.0" @@ -1346,8 +1520,8 @@ dependencies = [ "chrono", "form_urlencoded", "http", - "json-patch", - "k8s-openapi", + "json-patch 3.0.1", + "k8s-openapi 0.24.0", "schemars", "serde", "serde-value", @@ -1355,6 +1529,19 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "kube-derive" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9364e04cc5e0482136c6ee8b7fb7551812da25802249f35b3def7aaa31e82ad" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.96", +] + [[package]] name = "kube-derive" version = "0.98.0" @@ -1368,6 +1555,34 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "kube-runtime" +version = "0.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fbf1f6ffa98e65f1d2a9a69338bb60605d46be7edf00237784b89e62c9bd44" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "async-trait", + "backoff", + "educe", + "futures 0.3.31", + "hashbrown 0.14.5", + "json-patch 2.0.0", + "jsonptr 0.4.7", + "k8s-openapi 0.23.0", + "kube-client 0.96.0", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "kube-runtime" version = "0.98.0" @@ -1381,12 +1596,12 @@ dependencies = [ "backoff", "educe", "futures 0.3.31", - "hashbrown", + "hashbrown 0.15.2", "hostname", - "json-patch", - "jsonptr", - "k8s-openapi", - "kube-client", + "json-patch 3.0.1", + "jsonptr 0.6.3", + "k8s-openapi 0.24.0", + "kube-client 0.98.0", "parking_lot", "pin-project", "serde", @@ -1856,7 +2071,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags", + "bitflags 2.8.0", ] [[package]] @@ -2102,7 +2317,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -2115,7 +2330,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -2329,23 +2544,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "stackable-hdfs-crd" -version = "0.0.0-dev" -dependencies = [ - "futures 0.3.31", - "product-config", - "rstest", - "semver", - "serde", - "serde_json", - "serde_yaml", - "snafu 0.8.5", - "stackable-operator", - "strum", - "tracing", -] - [[package]] name = "stackable-hdfs-operator" version = "0.0.0-dev" @@ -2362,8 +2560,8 @@ dependencies = [ "serde_json", "serde_yaml", "snafu 0.8.5", - "stackable-hdfs-crd", "stackable-operator", + "stackable-versioned", "strum", "tokio", "tracing", @@ -2384,9 +2582,9 @@ dependencies = [ "either", "futures 0.3.31", "indexmap", - "json-patch", - "k8s-openapi", - "kube", + "json-patch 3.0.1", + "k8s-openapi 0.24.0", + "kube 0.98.0", "opentelemetry-jaeger", "opentelemetry_sdk", "product-config", @@ -2424,13 +2622,37 @@ name = "stackable-shared" version = "0.0.1" source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.85.0#59506c6202778889a27b6ae8153457e60a49c68d" dependencies = [ - "kube", + "kube 0.98.0", "semver", "serde", "serde_yaml", "snafu 0.8.5", ] +[[package]] +name = "stackable-versioned" +version = "0.5.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.5.0#048c7d8befddc2f2c6414444006871c95412d67c" +dependencies = [ + "stackable-versioned-macros", +] + +[[package]] +name = "stackable-versioned-macros" +version = "0.5.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.5.0#048c7d8befddc2f2c6414444006871c95412d67c" +dependencies = [ + "convert_case", + "darling", + "itertools", + "k8s-openapi 0.23.0", + "k8s-version", + "kube 0.96.0", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2722,7 +2944,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ "base64 0.22.1", - "bitflags", + "bitflags 2.8.0", "bytes", "http", "http-body", @@ -2886,6 +3108,12 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index 84c76fd0..8f8cfb29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["rust/crd", "rust/operator-binary"] +members = ["rust/operator-binary"] resolver = "2" [workspace.package] @@ -10,6 +10,10 @@ edition = "2021" repository = "https://github.com/stackabletech/hdfs-operator" [workspace.dependencies] +stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" } +product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } + anyhow = "1.0" built = { version = "0.7", features = ["chrono", "git2"] } clap = "4.5" @@ -22,8 +26,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" snafu = "0.8" -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" } -product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } strum = { version = "0.26", features = ["derive"] } tokio = { version = "1.40", features = ["full"] } tracing = "0.1" diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml deleted file mode 100644 index 618ff59e..00000000 --- a/rust/crd/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "stackable-hdfs-crd" -description = "Contains the Apache Hadoop HDFS CRD structs and utilities" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true -repository.workspace = true -publish = false - -[dependencies] -semver.workspace = true -serde.workspace = true -serde_json.workspace = true -snafu.workspace = true -stackable-operator.workspace = true -product-config.workspace = true -strum.workspace = true -tracing.workspace = true -futures.workspace = true - -[dev-dependencies] -serde_yaml.workspace = true -rstest.workspace = true diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 574641b2..1a22b13f 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -9,18 +9,18 @@ repository.workspace = true publish = false [dependencies] -stackable-hdfs-crd = { path = "../crd" } +stackable-versioned.workspace = true +stackable-operator.workspace = true +product-config.workspace = true anyhow.workspace = true clap.workspace = true const_format.workspace = true futures.workspace = true indoc.workspace = true -product-config.workspace = true serde_json.workspace = true serde.workspace = true snafu.workspace = true -stackable-operator.workspace = true strum.workspace = true tokio.workspace = true tracing-futures.workspace = true diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs index 6fa36603..e66206ae 100644 --- a/rust/operator-binary/src/config/jvm.rs +++ b/rust/operator-binary/src/config/jvm.rs @@ -1,12 +1,14 @@ use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::{constants::JVM_SECURITY_PROPERTIES_FILE, HdfsCluster, HdfsRole}; use stackable_operator::{ k8s_openapi::api::core::v1::ResourceRequirements, memory::{BinaryMultiple, MemoryQuantity}, role_utils::JvmArgumentOverrides, }; -use crate::security::kerberos::KERBEROS_CONTAINER_PATH; +use crate::{ + crd::{constants::JVM_SECURITY_PROPERTIES_FILE, v1alpha1, HdfsNodeRole}, + security::kerberos::KERBEROS_CONTAINER_PATH, +}; const JVM_HEAP_FACTOR: f32 = 0.8; @@ -19,7 +21,7 @@ pub enum Error { }, #[snafu(display("failed to merge jvm argument overrides"))] - MergeJvmArgumentOverrides { source: stackable_hdfs_crd::Error }, + MergeJvmArgumentOverrides { source: crate::crd::Error }, } // All init or sidecar containers must have access to the following settings. @@ -48,8 +50,8 @@ pub fn construct_global_jvm_args(kerberos_enabled: bool) -> String { } pub fn construct_role_specific_jvm_args( - hdfs: &HdfsCluster, - hdfs_role: &HdfsRole, + hdfs: &v1alpha1::HdfsCluster, + hdfs_role: &HdfsNodeRole, role_group: &str, kerberos_enabled: bool, resources: Option<&ResourceRequirements>, @@ -97,10 +99,9 @@ pub fn construct_role_specific_jvm_args( #[cfg(test)] mod tests { - use stackable_hdfs_crd::{constants::DEFAULT_NAME_NODE_METRICS_PORT, HdfsCluster}; use super::*; - use crate::container::ContainerConfig; + use crate::{container::ContainerConfig, crd::constants::DEFAULT_NAME_NODE_METRICS_PORT}; #[test] fn test_global_jvm_args() { @@ -190,9 +191,10 @@ mod tests { } fn construct_test_role_specific_jvm_args(hdfs_cluster: &str, kerberos_enabled: bool) -> String { - let hdfs: HdfsCluster = serde_yaml::from_str(hdfs_cluster).expect("illegal test input"); + let hdfs: v1alpha1::HdfsCluster = + serde_yaml::from_str(hdfs_cluster).expect("illegal test input"); - let role = HdfsRole::NameNode; + let role = HdfsNodeRole::Name; let merged_config = role.merged_config(&hdfs, "default").unwrap(); let container_config = ContainerConfig::from(role); let resources = container_config.resources(&merged_config); diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 2d59d60e..8d06dfef 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use product_config::writer::to_hadoop_xml; -use stackable_hdfs_crd::{ +use stackable_operator::utils::cluster_info::KubernetesClusterInfo; + +use crate::crd::{ constants::{ DEFAULT_JOURNAL_NODE_RPC_PORT, DEFAULT_NAME_NODE_HTTPS_PORT, DEFAULT_NAME_NODE_HTTP_PORT, DEFAULT_NAME_NODE_RPC_PORT, DFS_DATANODE_DATA_DIR, DFS_HA_NAMENODES, @@ -12,9 +14,8 @@ use stackable_hdfs_crd::{ SERVICE_PORT_NAME_HTTP, SERVICE_PORT_NAME_HTTPS, SERVICE_PORT_NAME_RPC, }, storage::{DataNodeStorageConfig, DataNodeStorageConfigInnerType}, - HdfsCluster, HdfsPodRef, + v1alpha1, HdfsPodRef, }; -use stackable_operator::utils::cluster_info::KubernetesClusterInfo; pub mod jvm; @@ -162,7 +163,7 @@ impl HdfsSiteConfigBuilder { pub fn dfs_namenode_http_address_ha( &mut self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, namenode_podrefs: &[HdfsPodRef], ) -> &mut Self { diff --git a/rust/operator-binary/src/container.rs b/rust/operator-binary/src/container.rs index 929ef118..d0aff95c 100644 --- a/rust/operator-binary/src/container.rs +++ b/rust/operator-binary/src/container.rs @@ -13,20 +13,6 @@ use std::{collections::BTreeMap, str::FromStr}; use indoc::formatdoc; use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_hdfs_crd::{ - constants::{ - DEFAULT_DATA_NODE_METRICS_PORT, DEFAULT_JOURNAL_NODE_METRICS_PORT, - DEFAULT_NAME_NODE_METRICS_PORT, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, - LIVENESS_PROBE_FAILURE_THRESHOLD, LIVENESS_PROBE_INITIAL_DELAY_SECONDS, - LIVENESS_PROBE_PERIOD_SECONDS, NAMENODE_ROOT_DATA_DIR, READINESS_PROBE_FAILURE_THRESHOLD, - READINESS_PROBE_INITIAL_DELAY_SECONDS, READINESS_PROBE_PERIOD_SECONDS, - SERVICE_PORT_NAME_HTTP, SERVICE_PORT_NAME_HTTPS, SERVICE_PORT_NAME_IPC, - SERVICE_PORT_NAME_RPC, STACKABLE_ROOT_DATA_DIR, - }, - storage::DataNodeStorageConfig, - AnyNodeConfig, DataNodeContainer, HdfsCluster, HdfsPodRef, HdfsRole, NameNodeContainer, - UpgradeState, -}; use stackable_operator::{ builder::{ self, @@ -71,6 +57,21 @@ use crate::{ self, jvm::{construct_global_jvm_args, construct_role_specific_jvm_args}, }, + crd::{ + constants::{ + DATANODE_ROOT_DATA_DIR_PREFIX, DEFAULT_DATA_NODE_METRICS_PORT, + DEFAULT_JOURNAL_NODE_METRICS_PORT, DEFAULT_NAME_NODE_METRICS_PORT, LISTENER_VOLUME_DIR, + LISTENER_VOLUME_NAME, LIVENESS_PROBE_FAILURE_THRESHOLD, + LIVENESS_PROBE_INITIAL_DELAY_SECONDS, LIVENESS_PROBE_PERIOD_SECONDS, LOG4J_PROPERTIES, + NAMENODE_ROOT_DATA_DIR, READINESS_PROBE_FAILURE_THRESHOLD, + READINESS_PROBE_INITIAL_DELAY_SECONDS, READINESS_PROBE_PERIOD_SECONDS, + SERVICE_PORT_NAME_HTTP, SERVICE_PORT_NAME_HTTPS, SERVICE_PORT_NAME_IPC, + SERVICE_PORT_NAME_RPC, STACKABLE_ROOT_DATA_DIR, + }, + storage::DataNodeStorageConfig, + v1alpha1, AnyNodeConfig, DataNodeContainer, HdfsNodeRole, HdfsPodRef, NameNodeContainer, + UpgradeState, + }, product_logging::{ FORMAT_NAMENODES_LOG4J_CONFIG_FILE, FORMAT_ZOOKEEPER_LOG4J_CONFIG_FILE, HDFS_LOG4J_CONFIG_FILE, MAX_FORMAT_NAMENODE_LOG_FILE_SIZE, @@ -79,7 +80,6 @@ use crate::{ WAIT_FOR_NAMENODES_LOG4J_CONFIG_FILE, ZKFC_LOG4J_CONFIG_FILE, }, security::kerberos::KERBEROS_CONTAINER_PATH, - DATANODE_ROOT_DATA_DIR_PREFIX, LOG4J_PROPERTIES, }; pub(crate) const TLS_STORE_DIR: &str = "/stackable/tls"; @@ -150,7 +150,7 @@ pub enum Error { pub enum ContainerConfig { Hdfs { /// HDFS role (name-, data-, journal-node) which will be the container_name. - role: HdfsRole, + role: HdfsNodeRole, /// The container name derived from the provided role. container_name: String, /// Volume mounts for config and logging. @@ -212,9 +212,9 @@ impl ContainerConfig { #[allow(clippy::too_many_arguments)] pub fn add_containers_and_volumes( pb: &mut PodBuilder, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, - role: &HdfsRole, + role: &HdfsNodeRole, role_group: &str, resolved_product_image: &ResolvedProductImage, merged_config: &AnyNodeConfig, @@ -305,7 +305,7 @@ impl ContainerConfig { // role specific pod settings configured here match role { - HdfsRole::NameNode => { + HdfsNodeRole::Name => { // Zookeeper fail over container let zkfc_container_config = Self::try_from(NameNodeContainer::Zkfc.to_string())?; pb.add_volumes(zkfc_container_config.volumes( @@ -370,7 +370,7 @@ impl ContainerConfig { labels, )?); } - HdfsRole::DataNode => { + HdfsNodeRole::Data => { // Wait for namenode init container let wait_for_namenodes_container_config = Self::try_from(DataNodeContainer::WaitForNameNodes.to_string())?; @@ -393,7 +393,7 @@ impl ContainerConfig { labels, )?); } - HdfsRole::JournalNode => {} + HdfsNodeRole::Journal => {} } Ok(()) @@ -404,7 +404,7 @@ impl ContainerConfig { labels: &Labels, ) -> Result> { match merged_config { - AnyNodeConfig::NameNode(node) => { + AnyNodeConfig::Name(node) => { let listener = ListenerOperatorVolumeSourceBuilder::new( &ListenerReference::ListenerClass(node.listener_class.to_string()), labels, @@ -432,11 +432,11 @@ impl ContainerConfig { Ok(pvcs) } - AnyNodeConfig::JournalNode(node) => Ok(vec![node.resources.storage.data.build_pvc( + AnyNodeConfig::Journal(node) => Ok(vec![node.resources.storage.data.build_pvc( ContainerConfig::DATA_VOLUME_MOUNT_NAME, Some(vec!["ReadWriteOnce"]), )]), - AnyNodeConfig::DataNode(node) => Ok(DataNodeStorageConfig { + AnyNodeConfig::Data(node) => Ok(DataNodeStorageConfig { pvcs: node.resources.storage.clone(), } .build_pvcs()), @@ -451,9 +451,9 @@ impl ContainerConfig { #[allow(clippy::too_many_arguments)] fn main_container( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, - role: &HdfsRole, + role: &HdfsNodeRole, role_group: &str, resolved_product_image: &ResolvedProductImage, zookeeper_config_map_name: &str, @@ -512,9 +512,9 @@ impl ContainerConfig { #[allow(clippy::too_many_arguments)] fn init_container( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, - role: &HdfsRole, + role: &HdfsNodeRole, role_group: &str, resolved_product_image: &ResolvedProductImage, zookeeper_config_map_name: &str, @@ -585,9 +585,9 @@ impl ContainerConfig { /// Returns the container command arguments. fn args( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, - role: &HdfsRole, + role: &HdfsNodeRole, merged_config: &AnyNodeConfig, namenode_podrefs: &[HdfsPodRef], ) -> Result, Error> { @@ -601,7 +601,7 @@ impl ContainerConfig { } let upgrade_args = if hdfs.upgrade_state().ok() == Some(Some(UpgradeState::Upgrading)) - && *role == HdfsRole::NameNode + && *role == HdfsNodeRole::Name { "-rollingUpgrade started" } else { @@ -809,8 +809,8 @@ wait_for_termination $! /// Needs the KERBEROS_REALM env var, which will be written with `export_kerberos_real_env_var_command` /// Needs the POD_NAME env var to be present, which will be provided by the PodSpec fn get_kerberos_ticket( - hdfs: &HdfsCluster, - role: &HdfsRole, + hdfs: &v1alpha1::HdfsCluster, + role: &HdfsNodeRole, cluster_info: &KubernetesClusterInfo, ) -> Result { let principal = format!( @@ -839,7 +839,7 @@ wait_for_termination $! /// Returns the container env variables. fn env( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, role_group: &str, zookeeper_config_map_name: &str, env_overrides: Option<&BTreeMap>, @@ -945,9 +945,9 @@ wait_for_termination $! | ContainerConfig::FormatNameNodes { .. } | ContainerConfig::FormatZooKeeper { .. } | ContainerConfig::WaitForNameNodes { .. } => match merged_config { - AnyNodeConfig::NameNode(node) => Some(node.resources.clone().into()), - AnyNodeConfig::DataNode(node) => Some(node.resources.clone().into()), - AnyNodeConfig::JournalNode(node) => Some(node.resources.clone().into()), + AnyNodeConfig::Name(node) => Some(node.resources.clone().into()), + AnyNodeConfig::Data(node) => Some(node.resources.clone().into()), + AnyNodeConfig::Journal(node) => Some(node.resources.clone().into()), }, } } @@ -955,7 +955,7 @@ wait_for_termination $! /// Creates a probe for the web UI port fn web_ui_port_probe( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, period_seconds: i32, initial_delay_seconds: i32, failure_threshold: i32, @@ -1021,7 +1021,7 @@ wait_for_termination $! let mut volumes = vec![]; if let ContainerConfig::Hdfs { .. } = self { - if let AnyNodeConfig::DataNode(node) = merged_config { + if let AnyNodeConfig::Data(node) = merged_config { volumes.push( VolumeBuilder::new(LISTENER_VOLUME_NAME) .ephemeral( @@ -1086,7 +1086,7 @@ wait_for_termination $! /// Returns the container volume mounts. fn volume_mounts( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, merged_config: &AnyNodeConfig, labels: &Labels, ) -> Result> { @@ -1126,7 +1126,7 @@ wait_for_termination $! } ContainerConfig::Hdfs { role, .. } => { // JournalNode doesn't use listeners, since it's only used internally by the namenodes - if let HdfsRole::NameNode | HdfsRole::DataNode = role { + if let HdfsNodeRole::Name | HdfsNodeRole::Data = role { volume_mounts.push( VolumeMountBuilder::new(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR).build(), ); @@ -1134,7 +1134,7 @@ wait_for_termination $! // Add data volume match role { - HdfsRole::NameNode | HdfsRole::JournalNode => { + HdfsNodeRole::Name | HdfsNodeRole::Journal => { volume_mounts.push( VolumeMountBuilder::new( Self::DATA_VOLUME_MOUNT_NAME, @@ -1143,7 +1143,7 @@ wait_for_termination $! .build(), ); } - HdfsRole::DataNode => { + HdfsNodeRole::Data => { for pvc in Self::volume_claim_templates(merged_config, labels)? { let pvc_name = pvc.name_any(); volume_mounts.push(VolumeMount { @@ -1210,7 +1210,7 @@ wait_for_termination $! /// Build HADOOP_{*node}_OPTS for each namenode, datanodes and journalnodes. fn build_hadoop_opts( &self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, role_group: &str, resources: Option<&ResourceRequirements>, ) -> Result { @@ -1238,7 +1238,7 @@ wait_for_termination $! } /// Container ports for the main containers namenode, datanode and journalnode. - fn container_ports(&self, hdfs: &HdfsCluster) -> Vec { + fn container_ports(&self, hdfs: &v1alpha1::HdfsCluster) -> Vec { match self { ContainerConfig::Hdfs { role, .. } => hdfs .ports(role) @@ -1358,10 +1358,10 @@ wait_for_termination $! } } -impl From for ContainerConfig { - fn from(role: HdfsRole) -> Self { +impl From for ContainerConfig { + fn from(role: HdfsNodeRole) -> Self { match role { - HdfsRole::NameNode => Self::Hdfs { + HdfsNodeRole::Name => Self::Hdfs { role, container_name: role.to_string(), volume_mounts: ContainerVolumeDirs::from(role), @@ -1370,7 +1370,7 @@ impl From for ContainerConfig { web_ui_https_port_name: SERVICE_PORT_NAME_HTTPS, metrics_port: DEFAULT_NAME_NODE_METRICS_PORT, }, - HdfsRole::DataNode => Self::Hdfs { + HdfsNodeRole::Data => Self::Hdfs { role, container_name: role.to_string(), volume_mounts: ContainerVolumeDirs::from(role), @@ -1379,7 +1379,7 @@ impl From for ContainerConfig { web_ui_https_port_name: SERVICE_PORT_NAME_HTTPS, metrics_port: DEFAULT_DATA_NODE_METRICS_PORT, }, - HdfsRole::JournalNode => Self::Hdfs { + HdfsNodeRole::Journal => Self::Hdfs { role, container_name: role.to_string(), volume_mounts: ContainerVolumeDirs::from(role), @@ -1396,7 +1396,7 @@ impl TryFrom for ContainerConfig { type Error = Error; fn try_from(container_name: String) -> Result { - match HdfsRole::from_str(container_name.as_str()) { + match HdfsNodeRole::from_str(container_name.as_str()) { Ok(role) => Ok(ContainerConfig::from(role)), // No hadoop main process container Err(_) => match container_name { @@ -1469,8 +1469,8 @@ impl ContainerVolumeDirs { } } -impl From for ContainerVolumeDirs { - fn from(role: HdfsRole) -> Self { +impl From for ContainerVolumeDirs { + fn from(role: HdfsNodeRole) -> Self { ContainerVolumeDirs { final_config_dir: format!("{base}/{role}", base = Self::NODE_BASE_CONFIG_DIR), config_mount: format!("{base}/{role}", base = Self::NODE_BASE_CONFIG_DIR_MOUNT), @@ -1481,8 +1481,8 @@ impl From for ContainerVolumeDirs { } } -impl From<&HdfsRole> for ContainerVolumeDirs { - fn from(role: &HdfsRole) -> Self { +impl From<&HdfsNodeRole> for ContainerVolumeDirs { + fn from(role: &HdfsNodeRole) -> Self { ContainerVolumeDirs { final_config_dir: format!("{base}/{role}", base = Self::NODE_BASE_CONFIG_DIR), config_mount: format!("{base}/{role}", base = Self::NODE_BASE_CONFIG_DIR_MOUNT), @@ -1497,7 +1497,7 @@ impl TryFrom<&str> for ContainerVolumeDirs { type Error = Error; fn try_from(container_name: &str) -> Result { - if let Ok(role) = HdfsRole::from_str(container_name) { + if let Ok(role) = HdfsNodeRole::from_str(container_name) { return Ok(ContainerVolumeDirs::from(role)); } diff --git a/rust/crd/src/affinity.rs b/rust/operator-binary/src/crd/affinity.rs similarity index 92% rename from rust/crd/src/affinity.rs rename to rust/operator-binary/src/crd/affinity.rs index a96c0816..917a1482 100644 --- a/rust/crd/src/affinity.rs +++ b/rust/operator-binary/src/crd/affinity.rs @@ -5,9 +5,9 @@ use stackable_operator::{ k8s_openapi::api::core::v1::{PodAffinity, PodAntiAffinity}, }; -use crate::{HdfsRole, APP_NAME}; +use crate::crd::{constants::APP_NAME, HdfsNodeRole}; -pub fn get_affinity(cluster_name: &str, role: &HdfsRole) -> StackableAffinityFragment { +pub fn get_affinity(cluster_name: &str, role: &HdfsNodeRole) -> StackableAffinityFragment { StackableAffinityFragment { pod_affinity: Some(PodAffinity { preferred_during_scheduling_ignored_during_execution: Some(vec![ @@ -41,13 +41,13 @@ mod test { }, }; - use crate::{HdfsCluster, HdfsRole}; + use crate::crd::{v1alpha1, HdfsNodeRole}; #[rstest] - #[case(HdfsRole::JournalNode)] - #[case(HdfsRole::NameNode)] - #[case(HdfsRole::DataNode)] - fn test_affinity_defaults(#[case] role: HdfsRole) { + #[case(HdfsNodeRole::Journal)] + #[case(HdfsNodeRole::Name)] + #[case(HdfsNodeRole::Data)] + fn test_affinity_defaults(#[case] role: HdfsNodeRole) { let input = r#" apiVersion: hdfs.stackable.tech/v1alpha1 kind: HdfsCluster @@ -71,7 +71,7 @@ spec: default: replicas: 1 "#; - let hdfs: HdfsCluster = serde_yaml::from_str(input).unwrap(); + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(input).unwrap(); let merged_config = role.merged_config(&hdfs, "default").unwrap(); assert_eq!( diff --git a/rust/crd/src/constants.rs b/rust/operator-binary/src/crd/constants.rs similarity index 96% rename from rust/crd/src/constants.rs rename to rust/operator-binary/src/crd/constants.rs index 0a70807c..7dea151e 100644 --- a/rust/crd/src/constants.rs +++ b/rust/operator-binary/src/crd/constants.rs @@ -2,10 +2,7 @@ use stackable_operator::time::Duration; pub const DEFAULT_DFS_REPLICATION_FACTOR: u8 = 3; -pub const CONTROLLER_NAME: &str = "hdfsclusters.hdfs.stackable.tech"; - pub const FIELD_MANAGER_SCOPE: &str = "hdfscluster"; -pub const FIELD_MANAGER_SCOPE_POD: &str = "pod-service"; pub const APP_NAME: &str = "hdfs"; diff --git a/rust/crd/src/lib.rs b/rust/operator-binary/src/crd/mod.rs similarity index 80% rename from rust/crd/src/lib.rs rename to rust/operator-binary/src/crd/mod.rs index ed743480..d721fa62 100644 --- a/rust/crd/src/lib.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -1,7 +1,6 @@ use std::{ borrow::Cow, collections::{BTreeMap, HashMap}, - fmt::Display, num::TryFromIntError, ops::Deref, }; @@ -48,11 +47,25 @@ use stackable_operator::{ time::Duration, utils::cluster_info::KubernetesClusterInfo, }; +use stackable_versioned::versioned; use strum::{Display, EnumIter, EnumString, IntoStaticStr}; -use crate::{ +use crate::crd::{ affinity::get_affinity, - constants::*, + constants::{ + APP_NAME, CORE_SITE_XML, DEFAULT_DATA_NODE_DATA_PORT, + DEFAULT_DATA_NODE_GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_DATA_NODE_HTTPS_PORT, + DEFAULT_DATA_NODE_HTTP_PORT, DEFAULT_DATA_NODE_IPC_PORT, DEFAULT_DATA_NODE_METRICS_PORT, + DEFAULT_DFS_REPLICATION_FACTOR, DEFAULT_JOURNAL_NODE_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_JOURNAL_NODE_HTTPS_PORT, DEFAULT_JOURNAL_NODE_HTTP_PORT, + DEFAULT_JOURNAL_NODE_METRICS_PORT, DEFAULT_JOURNAL_NODE_RPC_PORT, DEFAULT_LISTENER_CLASS, + DEFAULT_NAME_NODE_GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_NAME_NODE_HTTPS_PORT, + DEFAULT_NAME_NODE_HTTP_PORT, DEFAULT_NAME_NODE_METRICS_PORT, DEFAULT_NAME_NODE_RPC_PORT, + DFS_REPLICATION, HADOOP_POLICY_XML, HDFS_SITE_XML, JVM_SECURITY_PROPERTIES_FILE, + LISTENER_VOLUME_NAME, SERVICE_PORT_NAME_DATA, SERVICE_PORT_NAME_HTTP, + SERVICE_PORT_NAME_HTTPS, SERVICE_PORT_NAME_IPC, SERVICE_PORT_NAME_METRICS, + SERVICE_PORT_NAME_RPC, SSL_CLIENT_XML, SSL_SERVER_XML, + }, security::{AuthenticationConfig, KerberosConfig}, storage::{ DataNodePvcFragment, DataNodeStorageConfigInnerType, HdfsStorageConfig, @@ -108,409 +121,102 @@ pub enum Error { MergeJvmArgumentOverrides { source: role_utils::Error }, } -/// An HDFS cluster stacklet. This resource is managed by the Stackable operator for Apache Hadoop HDFS. -/// Find more information on how to use it and the resources that the operator generates in the -/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/hdfs/). -/// -/// The CRD contains three roles: `nameNodes`, `dataNodes` and `journalNodes`. -#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] -#[kube( - group = "hdfs.stackable.tech", - version = "v1alpha1", - kind = "HdfsCluster", - plural = "hdfsclusters", - shortname = "hdfs", - status = "HdfsClusterStatus", - namespaced, - crates( - kube_core = "stackable_operator::kube::core", - k8s_openapi = "stackable_operator::k8s_openapi", - schemars = "stackable_operator::schemars" - ) -)] -#[serde(rename_all = "camelCase")] -pub struct HdfsClusterSpec { - /// Configuration that applies to all roles and role groups. - /// This includes settings for authentication, logging and the ZooKeeper cluster to use. - pub cluster_config: HdfsClusterConfig, - - // no doc string - See ProductImage struct - pub image: ProductImage, - - // no doc string - See ClusterOperation struct - #[serde(default)] - pub cluster_operation: ClusterOperation, - - // no doc string - See Role struct - #[serde(default, skip_serializing_if = "Option::is_none")] - pub name_nodes: Option>, - - // no doc string - See Role struct - #[serde(default, skip_serializing_if = "Option::is_none")] - pub data_nodes: Option>, - - // no doc string - See Role struct - #[serde(default, skip_serializing_if = "Option::is_none")] - pub journal_nodes: Option>, -} - -#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct HdfsClusterConfig { - /// `dfsReplication` is the factor of how many times a file will be replicated to different data nodes. - /// The default is 3. - /// You need at least the same amount of data nodes so each file can be replicated correctly, otherwise a warning will be printed. - #[serde(default = "default_dfs_replication_factor")] - pub dfs_replication: u8, - - /// Name of the Vector aggregator [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery). - /// It must contain the key `ADDRESS` with the address of the Vector aggregator. - /// Follow the [logging tutorial](DOCS_BASE_URL_PLACEHOLDER/tutorials/logging-vector-aggregator) - /// to learn how to configure log aggregation with Vector. - #[serde(skip_serializing_if = "Option::is_none")] - pub vector_aggregator_config_map_name: Option, - - /// Name of the [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) - /// for a ZooKeeper cluster. - pub zookeeper_config_map_name: String, - - /// Settings related to user [authentication](DOCS_BASE_URL_PLACEHOLDER/usage-guide/security). - pub authentication: Option, - - /// Authorization options for HDFS. - /// Learn more in the [HDFS authorization usage guide](DOCS_BASE_URL_PLACEHOLDER/hdfs/usage-guide/security#authorization). - #[serde(skip_serializing_if = "Option::is_none")] - pub authorization: Option, - - // Scheduled for removal in v1alpha2, see https://github.com/stackabletech/issues/issues/504 - /// Deprecated, please use `.spec.nameNodes.config.listenerClass` and `.spec.dataNodes.config.listenerClass` instead. - #[serde(default)] - pub listener_class: DeprecatedClusterListenerClass, - - /// Configuration to control HDFS topology (rack) awareness feature - pub rack_awareness: Option>, -} - -#[derive(Clone, Debug, Deserialize, Eq, Hash, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub enum TopologyLabel { - /// Name of the label on the Kubernetes Node (where the Pod is placed on) used to resolve a datanode to a topology - /// zone. - NodeLabel(String), - - /// Name of the label on the Kubernetes Pod used to resolve a datanode to a topology zone. - PodLabel(String), -} - -impl TopologyLabel { - pub fn to_config(&self) -> String { - match &self { - TopologyLabel::NodeLabel(l) => format!("Node:{l}"), - TopologyLabel::PodLabel(l) => format!("Pod:{l}"), - } +#[versioned(version(name = "v1alpha1"))] +pub mod versioned { + /// An HDFS cluster stacklet. This resource is managed by the Stackable operator for Apache Hadoop HDFS. + /// Find more information on how to use it and the resources that the operator generates in the + /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/hdfs/). + /// + /// The CRD contains three roles: `nameNodes`, `dataNodes` and `journalNodes`. + #[versioned(k8s( + group = "hdfs.stackable.tech", + kind = "HdfsCluster", + plural = "hdfsclusters", + shortname = "hdfs", + status = "HdfsClusterStatus", + namespaced, + crates( + kube_core = "stackable_operator::kube::core", + k8s_openapi = "stackable_operator::k8s_openapi", + schemars = "stackable_operator::schemars" + ) + ))] + #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct HdfsClusterSpec { + /// Configuration that applies to all roles and role groups. + /// This includes settings for authentication, logging and the ZooKeeper cluster to use. + pub cluster_config: v1alpha1::HdfsClusterConfig, + + // no doc string - See ProductImage struct + pub image: ProductImage, + + // no doc string - See ClusterOperation struct + #[serde(default)] + pub cluster_operation: ClusterOperation, + + // no doc string - See Role struct + #[serde(default, skip_serializing_if = "Option::is_none")] + pub name_nodes: Option>, + + // no doc string - See Role struct + #[serde(default, skip_serializing_if = "Option::is_none")] + pub data_nodes: Option>, + + // no doc string - See Role struct + #[serde(default, skip_serializing_if = "Option::is_none")] + pub journal_nodes: + Option>, } -} - -fn default_dfs_replication_factor() -> u8 { - DEFAULT_DFS_REPLICATION_FACTOR -} - -#[derive(Clone, Debug, Deserialize, Eq, Hash, JsonSchema, PartialEq, Serialize, Default)] -#[serde(rename_all = "kebab-case")] -pub enum DeprecatedClusterListenerClass { - #[default] - ClusterInternal, -} -/// Configuration options that are available for all roles. -#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] -#[fragment_attrs( - derive( - Clone, - Debug, - Default, - Deserialize, - Merge, - JsonSchema, - PartialEq, - Serialize - ), - serde(rename_all = "camelCase") -)] -pub struct CommonNodeConfig { - #[fragment_attrs(serde(default))] - pub affinity: StackableAffinity, - /// Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. - #[fragment_attrs(serde(default))] - pub graceful_shutdown_timeout: Option, - - /// Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. - /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. - #[fragment_attrs(serde(default))] - pub requested_secret_lifetime: Option, -} - -/// Configuration for a rolegroup of an unknown type. -#[derive(Debug)] -pub enum AnyNodeConfig { - NameNode(NameNodeConfig), - DataNode(DataNodeConfig), - JournalNode(JournalNodeConfig), -} - -impl Deref for AnyNodeConfig { - type Target = CommonNodeConfig; - fn deref(&self) -> &Self::Target { - match self { - AnyNodeConfig::NameNode(node) => &node.common, - AnyNodeConfig::DataNode(node) => &node.common, - AnyNodeConfig::JournalNode(node) => &node.common, - } + #[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct HdfsClusterConfig { + /// `dfsReplication` is the factor of how many times a file will be replicated to different data nodes. + /// The default is 3. + /// You need at least the same amount of data nodes so each file can be replicated correctly, otherwise a warning will be printed. + #[serde(default = "default_dfs_replication_factor")] + pub dfs_replication: u8, + + /// Name of the Vector aggregator [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery). + /// It must contain the key `ADDRESS` with the address of the Vector aggregator. + /// Follow the [logging tutorial](DOCS_BASE_URL_PLACEHOLDER/tutorials/logging-vector-aggregator) + /// to learn how to configure log aggregation with Vector. + #[serde(skip_serializing_if = "Option::is_none")] + pub vector_aggregator_config_map_name: Option, + + /// Name of the [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) + /// for a ZooKeeper cluster. + pub zookeeper_config_map_name: String, + + /// Settings related to user [authentication](DOCS_BASE_URL_PLACEHOLDER/usage-guide/security). + pub authentication: Option, + + /// Authorization options for HDFS. + /// Learn more in the [HDFS authorization usage guide](DOCS_BASE_URL_PLACEHOLDER/hdfs/usage-guide/security#authorization). + #[serde(skip_serializing_if = "Option::is_none")] + pub authorization: Option, + + // Scheduled for removal in v1alpha2, see https://github.com/stackabletech/issues/issues/504 + /// Deprecated, please use `.spec.nameNodes.config.listenerClass` and `.spec.dataNodes.config.listenerClass` instead. + #[serde(default)] + pub listener_class: DeprecatedClusterListenerClass, + + /// Configuration to control HDFS topology (rack) awareness feature + pub rack_awareness: Option>, } } -impl AnyNodeConfig { - // Downcasting helpers for each variant - pub fn as_namenode(&self) -> Option<&NameNodeConfig> { - if let Self::NameNode(node) = self { - Some(node) - } else { - None - } - } - pub fn as_datanode(&self) -> Option<&DataNodeConfig> { - if let Self::DataNode(node) = self { - Some(node) - } else { - None - } - } - pub fn as_journalnode(&self) -> Option<&JournalNodeConfig> { - if let Self::JournalNode(node) = self { - Some(node) - } else { - None - } - } - - // Logging config is distinct between each role, due to the different enum types, - // so provide helpers for containers that are common between all roles. - pub fn hdfs_logging(&self) -> Cow { - match self { - AnyNodeConfig::NameNode(node) => node.logging.for_container(&NameNodeContainer::Hdfs), - AnyNodeConfig::DataNode(node) => node.logging.for_container(&DataNodeContainer::Hdfs), - AnyNodeConfig::JournalNode(node) => { - node.logging.for_container(&JournalNodeContainer::Hdfs) - } - } - } - pub fn vector_logging(&self) -> Cow { - match &self { - AnyNodeConfig::NameNode(node) => node.logging.for_container(&NameNodeContainer::Vector), - AnyNodeConfig::DataNode(node) => node.logging.for_container(&DataNodeContainer::Vector), - AnyNodeConfig::JournalNode(node) => { - node.logging.for_container(&JournalNodeContainer::Vector) - } - } - } - pub fn vector_logging_enabled(&self) -> bool { - match self { - AnyNodeConfig::NameNode(node) => node.logging.enable_vector_agent, - AnyNodeConfig::DataNode(node) => node.logging.enable_vector_agent, - AnyNodeConfig::JournalNode(node) => node.logging.enable_vector_agent, - } - } - pub fn requested_secret_lifetime(&self) -> Option { - match self { - AnyNodeConfig::NameNode(node) => node.common.requested_secret_lifetime, - AnyNodeConfig::DataNode(node) => node.common.requested_secret_lifetime, - AnyNodeConfig::JournalNode(node) => node.common.requested_secret_lifetime, - } - } -} - -#[derive( - Clone, - Copy, - Debug, - Deserialize, - Display, - EnumIter, - EnumString, - IntoStaticStr, - Eq, - Hash, - JsonSchema, - PartialEq, - Serialize, -)] -pub enum HdfsRole { - #[serde(rename = "journalnode")] - #[strum(serialize = "journalnode")] - JournalNode, - #[serde(rename = "namenode")] - #[strum(serialize = "namenode")] - NameNode, - #[serde(rename = "datanode")] - #[strum(serialize = "datanode")] - DataNode, -} - -impl HdfsRole { - pub fn min_replicas(&self) -> u16 { - match self { - HdfsRole::NameNode => 2, - HdfsRole::DataNode => 1, - HdfsRole::JournalNode => 3, - } - } - - pub fn replicas_can_be_even(&self) -> bool { - match self { - HdfsRole::NameNode => true, - HdfsRole::DataNode => true, - HdfsRole::JournalNode => false, - } - } - - pub fn check_valid_dfs_replication(&self) -> bool { - match self { - HdfsRole::NameNode => false, - HdfsRole::DataNode => true, - HdfsRole::JournalNode => false, - } - } - - /// Merge the [Name|Data|Journal]NodeConfigFragment defaults, role and role group settings. - /// The priority is: default < role config < role_group config - pub fn merged_config( - &self, - hdfs: &HdfsCluster, - role_group: &str, - ) -> Result { - match self { - HdfsRole::NameNode => { - let default_config = NameNodeConfigFragment::default_config(&hdfs.name_any(), self); - let role = hdfs - .spec - .name_nodes - .as_ref() - .with_context(|| MissingRoleSnafu { - role: HdfsRole::NameNode.to_string(), - })?; - - let mut role_config = role.config.config.clone(); - let mut role_group_config = hdfs - .namenode_rolegroup(role_group) - .with_context(|| MissingRoleGroupSnafu { - role: HdfsRole::NameNode.to_string(), - role_group: role_group.to_string(), - })? - .config - .config - .clone(); - - role_config.merge(&default_config); - role_group_config.merge(&role_config); - Ok(AnyNodeConfig::NameNode( - fragment::validate::(role_group_config) - .context(FragmentValidationFailureSnafu)?, - )) - } - HdfsRole::DataNode => { - let default_config = DataNodeConfigFragment::default_config(&hdfs.name_any(), self); - let role = hdfs - .spec - .data_nodes - .as_ref() - .with_context(|| MissingRoleSnafu { - role: HdfsRole::DataNode.to_string(), - })?; - - let mut role_config = role.config.config.clone(); - let mut role_group_config = hdfs - .datanode_rolegroup(role_group) - .with_context(|| MissingRoleGroupSnafu { - role: HdfsRole::DataNode.to_string(), - role_group: role_group.to_string(), - })? - .config - .config - .clone(); - - role_config.merge(&default_config); - role_group_config.merge(&role_config); - Ok(AnyNodeConfig::DataNode( - fragment::validate::(role_group_config) - .context(FragmentValidationFailureSnafu)?, - )) - } - HdfsRole::JournalNode => { - let default_config = - JournalNodeConfigFragment::default_config(&hdfs.name_any(), self); - let role = hdfs - .spec - .journal_nodes - .as_ref() - .with_context(|| MissingRoleSnafu { - role: HdfsRole::JournalNode.to_string(), - })?; - - let mut role_config = role.config.config.clone(); - let mut role_group_config = hdfs - .journalnode_rolegroup(role_group) - .with_context(|| MissingRoleGroupSnafu { - role: HdfsRole::JournalNode.to_string(), - role_group: role_group.to_string(), - })? - .config - .config - .clone(); - - role_config.merge(&default_config); - role_group_config.merge(&role_config); - Ok(AnyNodeConfig::JournalNode( - fragment::validate::(role_group_config) - .context(FragmentValidationFailureSnafu)?, - )) - } - } - } - - /// Name of the Hadoop process HADOOP_OPTS. - pub fn hadoop_opts_env_var_for_role(&self) -> &'static str { - match self { - HdfsRole::NameNode => "HDFS_NAMENODE_OPTS", - HdfsRole::DataNode => "HDFS_DATANODE_OPTS", - HdfsRole::JournalNode => "HDFS_JOURNALNODE_OPTS", - } - } - - pub fn kerberos_service_name(&self) -> &'static str { - match self { - HdfsRole::NameNode => "nn", - HdfsRole::DataNode => "dn", - HdfsRole::JournalNode => "jn", - } - } - - /// Return replicas for a certain rolegroup. - pub fn role_group_replicas(&self, hdfs: &HdfsCluster, role_group: &str) -> Option { - match self { - HdfsRole::NameNode => hdfs - .namenode_rolegroup(role_group) - .and_then(|rg| rg.replicas), - HdfsRole::DataNode => hdfs - .datanode_rolegroup(role_group) - .and_then(|rg| rg.replicas), - HdfsRole::JournalNode => hdfs - .journalnode_rolegroup(role_group) - .and_then(|rg| rg.replicas), +impl HasStatusCondition for v1alpha1::HdfsCluster { + fn conditions(&self) -> Vec { + match &self.status { + Some(status) => status.conditions.clone(), + None => vec![], } } } -impl HdfsCluster { +impl v1alpha1::HdfsCluster { /// Return the namespace of the cluster or an error in case it is not set. pub fn namespace_or_error(&self) -> Result { self.namespace().context(NoNamespaceSnafu) @@ -521,7 +227,7 @@ impl HdfsCluster { /// The same labels are also used as selectors for Services and StatefulSets. pub fn rolegroup_selector_labels( &self, - rolegroup_ref: &RoleGroupRef, + rolegroup_ref: &RoleGroupRef, ) -> Result { let mut group_labels = Labels::role_group_selector( self, @@ -568,62 +274,62 @@ impl HdfsCluster { .get(role_group) } - pub fn role_config(&self, hdfs_role: &HdfsRole) -> Option<&GenericRoleConfig> { + pub fn role_config(&self, hdfs_role: &HdfsNodeRole) -> Option<&GenericRoleConfig> { match hdfs_role { - HdfsRole::NameNode => self.spec.name_nodes.as_ref().map(|nn| &nn.role_config), - HdfsRole::DataNode => self.spec.data_nodes.as_ref().map(|dn| &dn.role_config), - HdfsRole::JournalNode => self.spec.journal_nodes.as_ref().map(|jn| &jn.role_config), + HdfsNodeRole::Name => self.spec.name_nodes.as_ref().map(|nn| &nn.role_config), + HdfsNodeRole::Data => self.spec.data_nodes.as_ref().map(|dn| &dn.role_config), + HdfsNodeRole::Journal => self.spec.journal_nodes.as_ref().map(|jn| &jn.role_config), } } pub fn get_merged_jvm_argument_overrides( &self, - hdfs_role: &HdfsRole, + hdfs_role: &HdfsNodeRole, role_group: &str, operator_generated: &JvmArgumentOverrides, ) -> Result { match hdfs_role { - HdfsRole::JournalNode => self + HdfsNodeRole::Journal => self .spec .journal_nodes .as_ref() .with_context(|| MissingRoleSnafu { - role: HdfsRole::JournalNode.to_string(), + role: HdfsNodeRole::Journal.to_string(), })? .get_merged_jvm_argument_overrides(role_group, operator_generated), - HdfsRole::NameNode => self + HdfsNodeRole::Name => self .spec .name_nodes .as_ref() .with_context(|| MissingRoleSnafu { - role: HdfsRole::NameNode.to_string(), + role: HdfsNodeRole::Name.to_string(), })? .get_merged_jvm_argument_overrides(role_group, operator_generated), - HdfsRole::DataNode => self + HdfsNodeRole::Data => self .spec .data_nodes .as_ref() .with_context(|| MissingRoleSnafu { - role: HdfsRole::DataNode.to_string(), + role: HdfsNodeRole::Data.to_string(), })? .get_merged_jvm_argument_overrides(role_group, operator_generated), } .context(MergeJvmArgumentOverridesSnafu) } - pub fn pod_overrides_for_role(&self, role: &HdfsRole) -> Option<&PodTemplateSpec> { + pub fn pod_overrides_for_role(&self, role: &HdfsNodeRole) -> Option<&PodTemplateSpec> { match role { - HdfsRole::NameNode => self + HdfsNodeRole::Name => self .spec .name_nodes .as_ref() .map(|n| &n.config.pod_overrides), - HdfsRole::DataNode => self + HdfsNodeRole::Data => self .spec .data_nodes .as_ref() .map(|n| &n.config.pod_overrides), - HdfsRole::JournalNode => self + HdfsNodeRole::Journal => self .spec .journal_nodes .as_ref() @@ -633,17 +339,17 @@ impl HdfsCluster { pub fn pod_overrides_for_role_group( &self, - role: &HdfsRole, + role: &HdfsNodeRole, role_group: &str, ) -> Option<&PodTemplateSpec> { match role { - HdfsRole::NameNode => self + HdfsNodeRole::Name => self .namenode_rolegroup(role_group) .map(|r| &r.config.pod_overrides), - HdfsRole::DataNode => self + HdfsNodeRole::Data => self .datanode_rolegroup(role_group) .map(|r| &r.config.pod_overrides), - HdfsRole::JournalNode => self + HdfsNodeRole::Journal => self .journalnode_rolegroup(role_group) .map(|r| &r.config.pod_overrides), } @@ -653,7 +359,7 @@ impl HdfsCluster { &self, role_name: impl Into, group_name: impl Into, - ) -> RoleGroupRef { + ) -> RoleGroupRef { RoleGroupRef { cluster: ObjectRef::from_obj(self), role: role_name.into(), @@ -661,14 +367,14 @@ impl HdfsCluster { } } - /// List all [`HdfsPodRef`]s expected for the given [`role`](HdfsRole). + /// List all [`HdfsPodRef`]s expected for the given [`role`](HdfsNodeRole). /// /// The `validated_config` is used to extract the ports exposed by the pods. /// /// The pod refs returned by `pod_refs` will only be able to able to access HDFS /// from inside the Kubernetes cluster. For configuring downstream clients, /// consider using [`Self::namenode_listener_refs`] instead. - pub fn pod_refs(&self, role: &HdfsRole) -> Result, Error> { + pub fn pod_refs(&self, role: &HdfsNodeRole) -> Result, Error> { let ns = self.metadata.namespace.clone().context(NoNamespaceSnafu)?; let rolegroup_ref_and_replicas = self.rolegroup_ref_and_replicas(role); @@ -706,7 +412,7 @@ impl HdfsCluster { &self, client: &stackable_operator::client::Client, ) -> Result, Error> { - let pod_refs = self.pod_refs(&HdfsRole::NameNode)?; + let pod_refs = self.pod_refs(&HdfsNodeRole::Name)?; try_join_all(pod_refs.into_iter().map(|pod_ref| async { let listener_name = format!("{LISTENER_VOLUME_NAME}-{}", pod_ref.pod_name); let listener_ref = @@ -748,10 +454,10 @@ impl HdfsCluster { pub fn rolegroup_ref_and_replicas( &self, - role: &HdfsRole, - ) -> Vec<(RoleGroupRef, u16)> { + role: &HdfsNodeRole, + ) -> Vec<(RoleGroupRef, u16)> { match role { - HdfsRole::NameNode => self + HdfsNodeRole::Name => self .spec .name_nodes .iter() @@ -761,12 +467,12 @@ impl HdfsCluster { .into_iter() .map(|(rolegroup_name, role_group)| { ( - self.rolegroup_ref(HdfsRole::NameNode.to_string(), rolegroup_name), + self.rolegroup_ref(HdfsNodeRole::Name.to_string(), rolegroup_name), role_group.replicas.unwrap_or_default(), ) }) .collect(), - HdfsRole::DataNode => self + HdfsNodeRole::Data => self .spec .data_nodes .iter() @@ -776,12 +482,12 @@ impl HdfsCluster { .into_iter() .map(|(rolegroup_name, role_group)| { ( - self.rolegroup_ref(HdfsRole::DataNode.to_string(), rolegroup_name), + self.rolegroup_ref(HdfsNodeRole::Data.to_string(), rolegroup_name), role_group.replicas.unwrap_or_default(), ) }) .collect(), - HdfsRole::JournalNode => self + HdfsNodeRole::Journal => self .spec .journal_nodes .iter() @@ -791,7 +497,7 @@ impl HdfsCluster { .into_iter() .map(|(rolegroup_name, role_group)| { ( - self.rolegroup_ref(HdfsRole::JournalNode.to_string(), rolegroup_name), + self.rolegroup_ref(HdfsNodeRole::Journal.to_string(), rolegroup_name), role_group.replicas.unwrap_or_default(), ) }) @@ -808,7 +514,7 @@ impl HdfsCluster { ( Vec, Role< - impl Configuration, + impl Configuration, GenericRoleConfig, JavaCommonConfig, >, @@ -829,34 +535,34 @@ impl HdfsCluster { if let Some(name_nodes) = &self.spec.name_nodes { result.insert( - HdfsRole::NameNode.to_string(), + HdfsNodeRole::Name.to_string(), (pnk.clone(), name_nodes.clone().erase()), ); } else { return Err(Error::MissingRole { - role: HdfsRole::NameNode.to_string(), + role: HdfsNodeRole::Name.to_string(), }); } if let Some(data_nodes) = &self.spec.data_nodes { result.insert( - HdfsRole::DataNode.to_string(), + HdfsNodeRole::Data.to_string(), (pnk.clone(), data_nodes.clone().erase()), ); } else { return Err(Error::MissingRole { - role: HdfsRole::DataNode.to_string(), + role: HdfsNodeRole::Data.to_string(), }); } if let Some(journal_nodes) = &self.spec.journal_nodes { result.insert( - HdfsRole::JournalNode.to_string(), + HdfsNodeRole::Journal.to_string(), (pnk, journal_nodes.clone().erase()), ); } else { return Err(Error::MissingRole { - role: HdfsRole::JournalNode.to_string(), + role: HdfsNodeRole::Journal.to_string(), }); } @@ -899,135 +605,457 @@ impl HdfsCluster { Ok(None) } } - - pub fn authentication_config(&self) -> Option<&AuthenticationConfig> { - self.spec.cluster_config.authentication.as_ref() + + pub fn authentication_config(&self) -> Option<&AuthenticationConfig> { + self.spec.cluster_config.authentication.as_ref() + } + + pub fn has_kerberos_enabled(&self) -> bool { + self.kerberos_config().is_some() + } + + pub fn kerberos_config(&self) -> Option<&KerberosConfig> { + self.spec + .cluster_config + .authentication + .as_ref() + .map(|s| &s.kerberos) + } + + pub fn has_https_enabled(&self) -> bool { + self.https_secret_class().is_some() + } + + pub fn rackawareness_config(&self) -> Option { + self.spec + .cluster_config + .rack_awareness + .as_ref() + .map(|label_list| { + label_list + .iter() + .map(TopologyLabel::to_config) + .collect::>() + .join(";") + }) + } + + pub fn https_secret_class(&self) -> Option<&str> { + self.spec + .cluster_config + .authentication + .as_ref() + .map(|k| k.tls_secret_class.as_str()) + } + + pub fn has_authorization_enabled(&self) -> bool { + self.spec.cluster_config.authorization.is_some() + } + + pub fn num_datanodes(&self) -> u16 { + self.spec + .data_nodes + .iter() + .flat_map(|dn| dn.role_groups.values()) + .map(|rg| rg.replicas.unwrap_or(1)) + .sum() + } + + /// Returns required port name and port number tuples depending on the role. + pub fn ports(&self, role: &HdfsNodeRole) -> Vec<(String, u16)> { + match role { + HdfsNodeRole::Name => vec![ + ( + String::from(SERVICE_PORT_NAME_METRICS), + DEFAULT_NAME_NODE_METRICS_PORT, + ), + ( + String::from(SERVICE_PORT_NAME_RPC), + DEFAULT_NAME_NODE_RPC_PORT, + ), + if self.has_https_enabled() { + ( + String::from(SERVICE_PORT_NAME_HTTPS), + DEFAULT_NAME_NODE_HTTPS_PORT, + ) + } else { + ( + String::from(SERVICE_PORT_NAME_HTTP), + DEFAULT_NAME_NODE_HTTP_PORT, + ) + }, + ], + HdfsNodeRole::Data => vec![ + ( + String::from(SERVICE_PORT_NAME_METRICS), + DEFAULT_DATA_NODE_METRICS_PORT, + ), + ( + String::from(SERVICE_PORT_NAME_DATA), + DEFAULT_DATA_NODE_DATA_PORT, + ), + ( + String::from(SERVICE_PORT_NAME_IPC), + DEFAULT_DATA_NODE_IPC_PORT, + ), + if self.has_https_enabled() { + ( + String::from(SERVICE_PORT_NAME_HTTPS), + DEFAULT_DATA_NODE_HTTPS_PORT, + ) + } else { + ( + String::from(SERVICE_PORT_NAME_HTTP), + DEFAULT_DATA_NODE_HTTP_PORT, + ) + }, + ], + HdfsNodeRole::Journal => vec![ + ( + String::from(SERVICE_PORT_NAME_METRICS), + DEFAULT_JOURNAL_NODE_METRICS_PORT, + ), + ( + String::from(SERVICE_PORT_NAME_RPC), + DEFAULT_JOURNAL_NODE_RPC_PORT, + ), + if self.has_https_enabled() { + ( + String::from(SERVICE_PORT_NAME_HTTPS), + DEFAULT_JOURNAL_NODE_HTTPS_PORT, + ) + } else { + ( + String::from(SERVICE_PORT_NAME_HTTP), + DEFAULT_JOURNAL_NODE_HTTP_PORT, + ) + }, + ], + } + } +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum TopologyLabel { + /// Name of the label on the Kubernetes Node (where the Pod is placed on) used to resolve a datanode to a topology + /// zone. + NodeLabel(String), + + /// Name of the label on the Kubernetes Pod used to resolve a datanode to a topology zone. + PodLabel(String), +} + +impl TopologyLabel { + pub fn to_config(&self) -> String { + match &self { + TopologyLabel::NodeLabel(l) => format!("Node:{l}"), + TopologyLabel::PodLabel(l) => format!("Pod:{l}"), + } + } +} + +fn default_dfs_replication_factor() -> u8 { + DEFAULT_DFS_REPLICATION_FACTOR +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, JsonSchema, PartialEq, Serialize, Default)] +#[serde(rename_all = "kebab-case")] +pub enum DeprecatedClusterListenerClass { + #[default] + ClusterInternal, +} + +/// Configuration options that are available for all roles. +#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] +pub struct CommonNodeConfig { + #[fragment_attrs(serde(default))] + pub affinity: StackableAffinity, + /// Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + #[fragment_attrs(serde(default))] + pub graceful_shutdown_timeout: Option, + + /// Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. + /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + #[fragment_attrs(serde(default))] + pub requested_secret_lifetime: Option, +} + +/// Configuration for a rolegroup of an unknown type. +#[derive(Debug)] +pub enum AnyNodeConfig { + Name(NameNodeConfig), + Data(DataNodeConfig), + Journal(JournalNodeConfig), +} + +impl Deref for AnyNodeConfig { + type Target = CommonNodeConfig; + fn deref(&self) -> &Self::Target { + match self { + AnyNodeConfig::Name(node) => &node.common, + AnyNodeConfig::Data(node) => &node.common, + AnyNodeConfig::Journal(node) => &node.common, + } + } +} + +impl AnyNodeConfig { + // Downcasting helpers for each variant + pub fn as_namenode(&self) -> Option<&NameNodeConfig> { + if let Self::Name(node) = self { + Some(node) + } else { + None + } + } + pub fn as_datanode(&self) -> Option<&DataNodeConfig> { + if let Self::Data(node) = self { + Some(node) + } else { + None + } + } + pub fn as_journalnode(&self) -> Option<&JournalNodeConfig> { + if let Self::Journal(node) = self { + Some(node) + } else { + None + } + } + + // Logging config is distinct between each role, due to the different enum types, + // so provide helpers for containers that are common between all roles. + pub fn hdfs_logging(&self) -> Cow { + match self { + AnyNodeConfig::Name(node) => node.logging.for_container(&NameNodeContainer::Hdfs), + AnyNodeConfig::Data(node) => node.logging.for_container(&DataNodeContainer::Hdfs), + AnyNodeConfig::Journal(node) => node.logging.for_container(&JournalNodeContainer::Hdfs), + } + } + pub fn vector_logging(&self) -> Cow { + match &self { + AnyNodeConfig::Name(node) => node.logging.for_container(&NameNodeContainer::Vector), + AnyNodeConfig::Data(node) => node.logging.for_container(&DataNodeContainer::Vector), + AnyNodeConfig::Journal(node) => { + node.logging.for_container(&JournalNodeContainer::Vector) + } + } + } + pub fn vector_logging_enabled(&self) -> bool { + match self { + AnyNodeConfig::Name(node) => node.logging.enable_vector_agent, + AnyNodeConfig::Data(node) => node.logging.enable_vector_agent, + AnyNodeConfig::Journal(node) => node.logging.enable_vector_agent, + } + } + pub fn requested_secret_lifetime(&self) -> Option { + match self { + AnyNodeConfig::Name(node) => node.common.requested_secret_lifetime, + AnyNodeConfig::Data(node) => node.common.requested_secret_lifetime, + AnyNodeConfig::Journal(node) => node.common.requested_secret_lifetime, + } } +} - pub fn has_kerberos_enabled(&self) -> bool { - self.kerberos_config().is_some() - } +#[derive( + Clone, + Copy, + Debug, + Deserialize, + Display, + EnumIter, + EnumString, + IntoStaticStr, + Eq, + Hash, + JsonSchema, + PartialEq, + Serialize, +)] +pub enum HdfsNodeRole { + #[serde(rename = "journalnode")] + #[strum(serialize = "journalnode")] + Journal, + #[serde(rename = "namenode")] + #[strum(serialize = "namenode")] + Name, + #[serde(rename = "datanode")] + #[strum(serialize = "datanode")] + Data, +} - pub fn kerberos_config(&self) -> Option<&KerberosConfig> { - self.spec - .cluster_config - .authentication - .as_ref() - .map(|s| &s.kerberos) +impl HdfsNodeRole { + pub fn min_replicas(&self) -> u16 { + match self { + HdfsNodeRole::Name => 2, + HdfsNodeRole::Data => 1, + HdfsNodeRole::Journal => 3, + } } - pub fn has_https_enabled(&self) -> bool { - self.https_secret_class().is_some() + pub fn replicas_can_be_even(&self) -> bool { + match self { + HdfsNodeRole::Name => true, + HdfsNodeRole::Data => true, + HdfsNodeRole::Journal => false, + } } - pub fn rackawareness_config(&self) -> Option { - self.spec - .cluster_config - .rack_awareness - .as_ref() - .map(|label_list| { - label_list - .iter() - .map(TopologyLabel::to_config) - .collect::>() - .join(";") - }) + pub fn check_valid_dfs_replication(&self) -> bool { + match self { + HdfsNodeRole::Name => false, + HdfsNodeRole::Data => true, + HdfsNodeRole::Journal => false, + } } - pub fn https_secret_class(&self) -> Option<&str> { - self.spec - .cluster_config - .authentication - .as_ref() - .map(|k| k.tls_secret_class.as_str()) + /// Merge the [Name|Data|Journal]NodeConfigFragment defaults, role and role group settings. + /// The priority is: default < role config < role_group config + pub fn merged_config( + &self, + hdfs: &v1alpha1::HdfsCluster, + role_group: &str, + ) -> Result { + match self { + HdfsNodeRole::Name => { + let default_config = NameNodeConfigFragment::default_config(&hdfs.name_any(), self); + let role = hdfs + .spec + .name_nodes + .as_ref() + .with_context(|| MissingRoleSnafu { + role: HdfsNodeRole::Name.to_string(), + })?; + + let mut role_config = role.config.config.clone(); + let mut role_group_config = hdfs + .namenode_rolegroup(role_group) + .with_context(|| MissingRoleGroupSnafu { + role: HdfsNodeRole::Name.to_string(), + role_group: role_group.to_string(), + })? + .config + .config + .clone(); + + role_config.merge(&default_config); + role_group_config.merge(&role_config); + Ok(AnyNodeConfig::Name( + fragment::validate::(role_group_config) + .context(FragmentValidationFailureSnafu)?, + )) + } + HdfsNodeRole::Data => { + let default_config = DataNodeConfigFragment::default_config(&hdfs.name_any(), self); + let role = hdfs + .spec + .data_nodes + .as_ref() + .with_context(|| MissingRoleSnafu { + role: HdfsNodeRole::Data.to_string(), + })?; + + let mut role_config = role.config.config.clone(); + let mut role_group_config = hdfs + .datanode_rolegroup(role_group) + .with_context(|| MissingRoleGroupSnafu { + role: HdfsNodeRole::Data.to_string(), + role_group: role_group.to_string(), + })? + .config + .config + .clone(); + + role_config.merge(&default_config); + role_group_config.merge(&role_config); + Ok(AnyNodeConfig::Data( + fragment::validate::(role_group_config) + .context(FragmentValidationFailureSnafu)?, + )) + } + HdfsNodeRole::Journal => { + let default_config = + JournalNodeConfigFragment::default_config(&hdfs.name_any(), self); + let role = hdfs + .spec + .journal_nodes + .as_ref() + .with_context(|| MissingRoleSnafu { + role: HdfsNodeRole::Journal.to_string(), + })?; + + let mut role_config = role.config.config.clone(); + let mut role_group_config = hdfs + .journalnode_rolegroup(role_group) + .with_context(|| MissingRoleGroupSnafu { + role: HdfsNodeRole::Journal.to_string(), + role_group: role_group.to_string(), + })? + .config + .config + .clone(); + + role_config.merge(&default_config); + role_group_config.merge(&role_config); + Ok(AnyNodeConfig::Journal( + fragment::validate::(role_group_config) + .context(FragmentValidationFailureSnafu)?, + )) + } + } } - pub fn has_authorization_enabled(&self) -> bool { - self.spec.cluster_config.authorization.is_some() + /// Name of the Hadoop process HADOOP_OPTS. + pub fn hadoop_opts_env_var_for_role(&self) -> &'static str { + match self { + HdfsNodeRole::Name => "HDFS_NAMENODE_OPTS", + HdfsNodeRole::Data => "HDFS_DATANODE_OPTS", + HdfsNodeRole::Journal => "HDFS_JOURNALNODE_OPTS", + } } - pub fn num_datanodes(&self) -> u16 { - self.spec - .data_nodes - .iter() - .flat_map(|dn| dn.role_groups.values()) - .map(|rg| rg.replicas.unwrap_or(1)) - .sum() + pub fn kerberos_service_name(&self) -> &'static str { + match self { + HdfsNodeRole::Name => "nn", + HdfsNodeRole::Data => "dn", + HdfsNodeRole::Journal => "jn", + } } - /// Returns required port name and port number tuples depending on the role. - pub fn ports(&self, role: &HdfsRole) -> Vec<(String, u16)> { - match role { - HdfsRole::NameNode => vec![ - ( - String::from(SERVICE_PORT_NAME_METRICS), - DEFAULT_NAME_NODE_METRICS_PORT, - ), - ( - String::from(SERVICE_PORT_NAME_RPC), - DEFAULT_NAME_NODE_RPC_PORT, - ), - if self.has_https_enabled() { - ( - String::from(SERVICE_PORT_NAME_HTTPS), - DEFAULT_NAME_NODE_HTTPS_PORT, - ) - } else { - ( - String::from(SERVICE_PORT_NAME_HTTP), - DEFAULT_NAME_NODE_HTTP_PORT, - ) - }, - ], - HdfsRole::DataNode => vec![ - ( - String::from(SERVICE_PORT_NAME_METRICS), - DEFAULT_DATA_NODE_METRICS_PORT, - ), - ( - String::from(SERVICE_PORT_NAME_DATA), - DEFAULT_DATA_NODE_DATA_PORT, - ), - ( - String::from(SERVICE_PORT_NAME_IPC), - DEFAULT_DATA_NODE_IPC_PORT, - ), - if self.has_https_enabled() { - ( - String::from(SERVICE_PORT_NAME_HTTPS), - DEFAULT_DATA_NODE_HTTPS_PORT, - ) - } else { - ( - String::from(SERVICE_PORT_NAME_HTTP), - DEFAULT_DATA_NODE_HTTP_PORT, - ) - }, - ], - HdfsRole::JournalNode => vec![ - ( - String::from(SERVICE_PORT_NAME_METRICS), - DEFAULT_JOURNAL_NODE_METRICS_PORT, - ), - ( - String::from(SERVICE_PORT_NAME_RPC), - DEFAULT_JOURNAL_NODE_RPC_PORT, - ), - if self.has_https_enabled() { - ( - String::from(SERVICE_PORT_NAME_HTTPS), - DEFAULT_JOURNAL_NODE_HTTPS_PORT, - ) - } else { - ( - String::from(SERVICE_PORT_NAME_HTTP), - DEFAULT_JOURNAL_NODE_HTTP_PORT, - ) - }, - ], + /// Return replicas for a certain rolegroup. + pub fn role_group_replicas( + &self, + hdfs: &v1alpha1::HdfsCluster, + role_group: &str, + ) -> Option { + match self { + HdfsNodeRole::Name => hdfs + .namenode_rolegroup(role_group) + .and_then(|rg| rg.replicas), + HdfsNodeRole::Data => hdfs + .datanode_rolegroup(role_group) + .and_then(|rg| rg.replicas), + HdfsNodeRole::Journal => hdfs + .journalnode_rolegroup(role_group) + .and_then(|rg| rg.replicas), } } } + /// Reference to a single `Pod` that is a component of a [`HdfsCluster`] /// /// Used for service discovery. @@ -1133,7 +1161,7 @@ pub struct NameNodeConfig { impl NameNodeConfigFragment { const DEFAULT_NAME_NODE_SECRET_LIFETIME: Duration = Duration::from_days_unchecked(1); - pub fn default_config(cluster_name: &str, role: &HdfsRole) -> Self { + pub fn default_config(cluster_name: &str, role: &HdfsNodeRole) -> Self { Self { resources: ResourcesFragment { cpu: CpuLimitsFragment { @@ -1164,7 +1192,7 @@ impl NameNodeConfigFragment { } impl Configuration for NameNodeConfigFragment { - type Configurable = HdfsCluster; + type Configurable = v1alpha1::HdfsCluster; fn compute_env( &self, @@ -1176,7 +1204,7 @@ impl Configuration for NameNodeConfigFragment { // If rack awareness is configured, insert the labels into an env var to configure // the topology-provider and add the artifact to the classpath. // This is only needed on namenodes. - if role_name == HdfsRole::NameNode.to_string() { + if role_name == HdfsNodeRole::Name.to_string() { if let Some(awareness_config) = resource.rackawareness_config() { result.insert("TOPOLOGY_LABELS".to_string(), Some(awareness_config)); } @@ -1204,7 +1232,7 @@ impl Configuration for NameNodeConfigFragment { DFS_REPLICATION.to_string(), Some(resource.spec.cluster_config.dfs_replication.to_string()), ); - } else if file == CORE_SITE_XML && role_name == HdfsRole::NameNode.to_string() { + } else if file == CORE_SITE_XML && role_name == HdfsNodeRole::Name.to_string() { if let Some(_awareness_config) = resource.rackawareness_config() { config.insert( "net.topology.node.switch.mapping.impl".to_string(), @@ -1270,7 +1298,7 @@ pub struct DataNodeConfig { impl DataNodeConfigFragment { const DEFAULT_DATA_NODE_SECRET_LIFETIME: Duration = Duration::from_days_unchecked(1); - pub fn default_config(cluster_name: &str, role: &HdfsRole) -> Self { + pub fn default_config(cluster_name: &str, role: &HdfsNodeRole) -> Self { Self { resources: ResourcesFragment { cpu: CpuLimitsFragment { @@ -1306,7 +1334,7 @@ impl DataNodeConfigFragment { } impl Configuration for DataNodeConfigFragment { - type Configurable = HdfsCluster; + type Configurable = v1alpha1::HdfsCluster; fn compute_env( &self, @@ -1388,7 +1416,7 @@ pub struct JournalNodeConfig { impl JournalNodeConfigFragment { const DEFAULT_JOURNAL_NODE_SECRET_LIFETIME: Duration = Duration::from_days_unchecked(1); - pub fn default_config(cluster_name: &str, role: &HdfsRole) -> Self { + pub fn default_config(cluster_name: &str, role: &HdfsNodeRole) -> Self { Self { resources: ResourcesFragment { cpu: CpuLimitsFragment { @@ -1418,7 +1446,7 @@ impl JournalNodeConfigFragment { } impl Configuration for JournalNodeConfigFragment { - type Configurable = HdfsCluster; + type Configurable = v1alpha1::HdfsCluster; fn compute_env( &self, @@ -1461,42 +1489,14 @@ pub struct HdfsClusterStatus { pub upgrade_target_product_version: Option, } -impl HasStatusCondition for HdfsCluster { - fn conditions(&self) -> Vec { - match &self.status { - Some(status) => status.conditions.clone(), - None => vec![], - } - } -} - -// TODO: upstream? -pub trait LoggingExt { - type Container; - fn for_container(&self, container: &Self::Container) -> Cow; -} -impl LoggingExt for Logging -where - T: Ord + Clone + Display, -{ - type Container = T; - - fn for_container(&self, container: &Self::Container) -> Cow { - self.containers - .get(container) - .map(Cow::Borrowed) - .unwrap_or_default() - } -} - #[cfg(test)] mod test { use stackable_operator::k8s_openapi::{ api::core::v1::ResourceRequirements, apimachinery::pkg::api::resource::Quantity, }; - use super::{HdfsCluster, HdfsRole}; - use crate::storage::HdfsStorageType; + use super::*; + use crate::crd::storage::HdfsStorageType; #[test] pub fn test_pvc_rolegroup_from_yaml() { @@ -1522,8 +1522,8 @@ spec: replicas: 1 "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); - let role = HdfsRole::DataNode; + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let role = HdfsNodeRole::Data; let config = &role.merged_config(&hdfs, "default").unwrap(); let resources = &config.as_datanode().unwrap().resources; let pvc = resources.storage.get("data").unwrap(); @@ -1557,8 +1557,8 @@ spec: replicas: 1 "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); - let role = HdfsRole::DataNode; + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let role = HdfsNodeRole::Data; let config = &role.merged_config(&hdfs, "default").unwrap(); let resources = &config.as_datanode().unwrap().resources; let pvc = resources.storage.get("data").unwrap(); @@ -1587,8 +1587,8 @@ spec: replicas: 1 "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); - let role = HdfsRole::DataNode; + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let role = HdfsNodeRole::Data; let config = role.merged_config(&hdfs, "default").unwrap(); let resources = &config.as_datanode().unwrap().resources; let pvc = resources.storage.get("data").unwrap(); @@ -1641,9 +1641,9 @@ spec: replicas: 1"; let deserializer = serde_yaml::Deserializer::from_str(cr); - let hdfs: HdfsCluster = + let hdfs: v1alpha1::HdfsCluster = serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); - let role = HdfsRole::DataNode; + let role = HdfsNodeRole::Data; let config = &role.merged_config(&hdfs, "default").unwrap(); let resources = &config.as_datanode().unwrap().resources; @@ -1689,8 +1689,8 @@ spec: replicas: 1 "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); - let role = HdfsRole::DataNode; + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let role = HdfsNodeRole::Data; let rr: ResourceRequirements = role .merged_config(&hdfs, "default") .unwrap() @@ -1744,8 +1744,8 @@ spec: min: '250m' "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); - let role = HdfsRole::DataNode; + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let role = HdfsNodeRole::Data; let rr: ResourceRequirements = role .merged_config(&hdfs, "default") .unwrap() @@ -1797,7 +1797,7 @@ spec: replicas: 42 "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); assert_eq!(hdfs.num_datanodes(), 45); } @@ -1832,7 +1832,7 @@ spec: replicas: 1"; let deserializer = serde_yaml::Deserializer::from_str(cr); - let hdfs: HdfsCluster = + let hdfs: v1alpha1::HdfsCluster = serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); let rack_awareness = hdfs.rackawareness_config(); diff --git a/rust/crd/src/security.rs b/rust/operator-binary/src/crd/security.rs similarity index 100% rename from rust/crd/src/security.rs rename to rust/operator-binary/src/crd/security.rs diff --git a/rust/crd/src/storage.rs b/rust/operator-binary/src/crd/storage.rs similarity index 99% rename from rust/crd/src/storage.rs rename to rust/operator-binary/src/crd/storage.rs index 2abe6788..7353405a 100644 --- a/rust/crd/src/storage.rs +++ b/rust/operator-binary/src/crd/storage.rs @@ -11,7 +11,7 @@ use stackable_operator::{ schemars::{self, JsonSchema}, }; -use crate::constants::*; +use crate::crd::constants::{DATANODE_ROOT_DATA_DIR_PREFIX, DATANODE_ROOT_DATA_DIR_SUFFIX}; #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index c41d8b75..dc09b5e4 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -1,8 +1,4 @@ use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::{ - constants::{CORE_SITE_XML, HDFS_SITE_XML}, - HdfsCluster, HdfsPodRef, HdfsRole, -}; use stackable_operator::{ builder::{configmap::ConfigMapBuilder, meta::ObjectMetaBuilder}, commons::product_image_selection::ResolvedProductImage, @@ -14,6 +10,10 @@ use stackable_operator::{ use crate::{ build_recommended_labels, config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder}, + crd::{ + constants::{CORE_SITE_XML, HDFS_SITE_XML}, + v1alpha1, HdfsNodeRole, HdfsPodRef, + }, security::kerberos, }; @@ -25,7 +25,7 @@ pub enum Error { #[snafu(display("object {hdfs} is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, - hdfs: ObjectRef, + hdfs: ObjectRef, }, #[snafu(display("failed to build ConfigMap"))] @@ -45,7 +45,7 @@ pub enum Error { /// Creates a discovery config map containing the `hdfs-site.xml` and `core-site.xml` /// for clients. pub fn build_discovery_configmap( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, controller: &str, namenode_podrefs: &[HdfsPodRef], @@ -61,7 +61,7 @@ pub fn build_discovery_configmap( hdfs, controller, &resolved_product_image.app_version_label, - &HdfsRole::NameNode.to_string(), + &HdfsNodeRole::Name.to_string(), "discovery", )) .context(ObjectMetaSnafu)? @@ -82,7 +82,7 @@ pub fn build_discovery_configmap( } fn build_discovery_hdfs_site_xml( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, logical_name: String, namenode_podrefs: &[HdfsPodRef], @@ -98,7 +98,7 @@ fn build_discovery_hdfs_site_xml( } fn build_discovery_core_site_xml( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, logical_name: String, ) -> Result { diff --git a/rust/operator-binary/src/event.rs b/rust/operator-binary/src/event.rs index dfc01a70..e0b65f58 100644 --- a/rust/operator-binary/src/event.rs +++ b/rust/operator-binary/src/event.rs @@ -1,12 +1,14 @@ use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::{HdfsCluster, HdfsRole}; use stackable_operator::{ k8s_openapi::api::core::v1::ObjectReference, kube::runtime::events::{Event, EventType}, }; use strum::{EnumDiscriminants, IntoStaticStr}; -use crate::hdfs_controller::Ctx; +use crate::{ + crd::{v1alpha1, HdfsNodeRole}, + hdfs_controller::Ctx, +}; #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] @@ -41,8 +43,8 @@ pub async fn publish_warning_event( } pub fn build_invalid_replica_message( - hdfs: &HdfsCluster, - role: &HdfsRole, + hdfs: &v1alpha1::HdfsCluster, + role: &HdfsNodeRole, dfs_replication: u8, ) -> Option { let replicas: u16 = hdfs diff --git a/rust/operator-binary/src/hdfs_clusterrolebinding_nodes_controller.rs b/rust/operator-binary/src/hdfs_clusterrolebinding_nodes_controller.rs index e15e5c90..7faad530 100644 --- a/rust/operator-binary/src/hdfs_clusterrolebinding_nodes_controller.rs +++ b/rust/operator-binary/src/hdfs_clusterrolebinding_nodes_controller.rs @@ -1,8 +1,4 @@ use serde_json::json; -use stackable_hdfs_crd::{ - constants::{APP_NAME, FIELD_MANAGER_SCOPE}, - HdfsCluster, -}; use stackable_operator::{ commons::rbac::build_rbac_resources, k8s_openapi::api::rbac::v1::{ClusterRoleBinding, Subject}, @@ -19,10 +15,15 @@ use stackable_operator::{ }; use tracing::{error, info}; +use crate::crd::{ + constants::{APP_NAME, FIELD_MANAGER_SCOPE}, + v1alpha1, +}; + pub async fn reconcile( client: Client, - store: &Store>, - ev: watcher::Result>>, + store: &Store>, + ev: watcher::Result>>, ) { match ev { Ok(watcher::Event::Apply(o)) => { diff --git a/rust/operator-binary/src/hdfs_controller.rs b/rust/operator-binary/src/hdfs_controller.rs index 226b7ec2..f124209c 100644 --- a/rust/operator-binary/src/hdfs_controller.rs +++ b/rust/operator-binary/src/hdfs_controller.rs @@ -10,10 +10,6 @@ use product_config::{ ProductConfigManager, }; use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_hdfs_crd::{ - constants::*, AnyNodeConfig, HdfsCluster, HdfsClusterStatus, HdfsPodRef, HdfsRole, - UpgradeState, UpgradeStateError, -}; use stackable_operator::{ builder::{ configmap::ConfigMapBuilder, @@ -58,6 +54,10 @@ use crate::{ build_recommended_labels, config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder}, container::{self, ContainerConfig, TLS_STORE_DIR, TLS_STORE_PASSWORD}, + crd::{ + constants::*, v1alpha1, AnyNodeConfig, HdfsClusterStatus, HdfsNodeRole, HdfsPodRef, + UpgradeState, UpgradeStateError, + }, discovery::{self, build_discovery_configmap}, event::{build_invalid_replica_message, publish_warning_event}, operations::{ @@ -118,7 +118,7 @@ pub enum Error { #[snafu(display("no metadata for {obj_ref:?}"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, - obj_ref: ObjectRef, + obj_ref: ObjectRef, }, #[snafu(display("invalid role {role:?}"))] @@ -128,7 +128,9 @@ pub enum Error { }, #[snafu(display("object has no name"))] - ObjectHasNoName { obj_ref: ObjectRef }, + ObjectHasNoName { + obj_ref: ObjectRef, + }, #[snafu(display("cannot build config map for role {role:?} and role group {role_group:?}"))] BuildRoleGroupConfigMap { @@ -138,7 +140,7 @@ pub enum Error { }, #[snafu(display("cannot collect discovery configuration"))] - CollectDiscoveryConfig { source: stackable_hdfs_crd::Error }, + CollectDiscoveryConfig { source: crate::crd::Error }, #[snafu(display("cannot build config discovery config map"))] BuildDiscoveryConfigMap { source: discovery::Error }, @@ -164,10 +166,10 @@ pub enum Error { }, #[snafu(display("failed to create pod references"))] - CreatePodReferences { source: stackable_hdfs_crd::Error }, + CreatePodReferences { source: crate::crd::Error }, #[snafu(display("failed to build role properties"))] - BuildRoleProperties { source: stackable_hdfs_crd::Error }, + BuildRoleProperties { source: crate::crd::Error }, #[snafu(display("failed to resolve the Vector aggregator address"))] ResolveVectorAggregatorAddress { @@ -181,7 +183,7 @@ pub enum Error { }, #[snafu(display("failed to merge config"))] - ConfigMerge { source: stackable_hdfs_crd::Error }, + ConfigMerge { source: crate::crd::Error }, #[snafu(display("failed to create cluster event"))] FailedToCreateClusterEvent { source: crate::event::Error }, @@ -217,7 +219,7 @@ pub enum Error { GracefulShutdown { source: graceful_shutdown::Error }, #[snafu(display("failed to build roleGroup selector labels"))] - RoleGroupSelectorLabels { source: stackable_hdfs_crd::Error }, + RoleGroupSelectorLabels { source: crate::crd::Error }, #[snafu(display("failed to build prometheus label"))] BuildPrometheusLabel { source: LabelError }, @@ -263,7 +265,7 @@ pub struct Ctx { } pub async fn reconcile_hdfs( - hdfs: Arc>, + hdfs: Arc>, ctx: Arc, ) -> HdfsOperatorResult { tracing::info!("Starting reconcile"); @@ -301,10 +303,10 @@ pub async fn reconcile_hdfs( let hdfs_obj_ref = hdfs.object_ref(&()); // A list of all name and journal nodes across all role groups is needed for all ConfigMaps and initialization checks. let namenode_podrefs = hdfs - .pod_refs(&HdfsRole::NameNode) + .pod_refs(&HdfsNodeRole::Name) .context(CreatePodReferencesSnafu)?; let journalnode_podrefs = hdfs - .pod_refs(&HdfsRole::JournalNode) + .pod_refs(&HdfsNodeRole::Journal) .context(CreatePodReferencesSnafu)?; let mut cluster_resources = ClusterResources::new( @@ -361,7 +363,7 @@ pub async fn reconcile_hdfs( } _ => false, }, - HdfsRole::iter(), + HdfsNodeRole::iter(), ); 'roles: for role in roles { let role_name: &str = role.into(); @@ -561,10 +563,10 @@ pub async fn reconcile_hdfs( } fn rolegroup_service( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, metadata: &ObjectMetaBuilder, - role: &HdfsRole, - rolegroup_ref: &RoleGroupRef, + role: &HdfsNodeRole, + rolegroup_ref: &RoleGroupRef, ) -> HdfsOperatorResult { tracing::info!("Setting up Service for {:?}", rolegroup_ref); @@ -606,10 +608,10 @@ fn rolegroup_service( #[allow(clippy::too_many_arguments)] fn rolegroup_config_map( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, metadata: &ObjectMetaBuilder, - rolegroup_ref: &RoleGroupRef, + rolegroup_ref: &RoleGroupRef, rolegroup_config: &HashMap>, namenode_podrefs: &[HdfsPodRef], journalnode_podrefs: &[HdfsPodRef], @@ -811,11 +813,11 @@ fn rolegroup_config_map( #[allow(clippy::too_many_arguments)] fn rolegroup_statefulset( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, metadata: &ObjectMetaBuilder, - role: &HdfsRole, - rolegroup_ref: &RoleGroupRef, + role: &HdfsNodeRole, + rolegroup_ref: &RoleGroupRef, resolved_product_image: &ResolvedProductImage, env_overrides: Option<&BTreeMap>, merged_config: &AnyNodeConfig, @@ -910,7 +912,7 @@ fn rolegroup_statefulset( } pub fn error_policy( - _obj: Arc>, + _obj: Arc>, error: &Error, _ctx: Arc, ) -> Action { @@ -966,7 +968,7 @@ spec: properties: [] "; - let hdfs: HdfsCluster = serde_yaml::from_str(cr).unwrap(); + let hdfs: v1alpha1::HdfsCluster = serde_yaml::from_str(cr).unwrap(); let config = transform_all_roles_to_config(&hdfs, hdfs.build_role_properties().unwrap()).unwrap(); @@ -980,7 +982,7 @@ properties: [] ) .unwrap(); - let role = HdfsRole::DataNode; + let role = HdfsNodeRole::Data; let rolegroup_config = validated_config .get(&role.to_string()) .unwrap() diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 0b237d1a..93b0ab4a 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,11 +1,9 @@ use std::sync::Arc; -use built_info::PKG_VERSION; use clap::{crate_description, crate_version, Parser}; use futures::{pin_mut, StreamExt}; use hdfs_controller::HDFS_FULL_CONTROLLER_NAME; use product_config::ProductConfigManager; -use stackable_hdfs_crd::{constants::*, HdfsCluster}; use stackable_operator::{ cli::{Command, ProductOperatorRun}, client::{self, Client}, @@ -25,13 +23,17 @@ use stackable_operator::{ kvp::ObjectLabels, logging::controller::report_controller_reconciled, namespace::WatchNamespace, - CustomResourceExt, + shared::yaml::SerializeOptions, + YamlSchema, }; use tracing::info_span; use tracing_futures::Instrument; +use crate::crd::{constants::APP_NAME, v1alpha1, HdfsCluster}; + mod config; mod container; +mod crd; mod discovery; mod event; mod hdfs_clusterrolebinding_nodes_controller; @@ -57,7 +59,8 @@ struct Opts { async fn main() -> anyhow::Result<()> { let opts = Opts::parse(); match opts.cmd { - Command::Crd => HdfsCluster::print_yaml_schema(PKG_VERSION)?, + Command::Crd => HdfsCluster::merged_crd(HdfsCluster::V1Alpha1)? + .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?, Command::Run(ProductOperatorRun { product_config, watch_namespace, @@ -113,7 +116,7 @@ pub async fn create_controller( let reflector = reflector::reflector( store_w, watcher( - Api::>::all(client.as_kube_client()), + Api::>::all(client.as_kube_client()), watcher::Config::default(), ), ) @@ -123,7 +126,7 @@ pub async fn create_controller( .collect::<()>(); let hdfs_controller = Controller::new( - namespace.get_api::>(&client), + namespace.get_api::>(&client), watcher::Config::default(), ) .owns( diff --git a/rust/operator-binary/src/operations/graceful_shutdown.rs b/rust/operator-binary/src/operations/graceful_shutdown.rs index 97fe0c0a..819d6bde 100644 --- a/rust/operator-binary/src/operations/graceful_shutdown.rs +++ b/rust/operator-binary/src/operations/graceful_shutdown.rs @@ -1,7 +1,8 @@ use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::CommonNodeConfig; use stackable_operator::builder::pod::PodBuilder; +use crate::crd::CommonNodeConfig; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("failed to set terminationGracePeriod"))] diff --git a/rust/operator-binary/src/operations/pdb.rs b/rust/operator-binary/src/operations/pdb.rs index bfbed8e4..e6265775 100644 --- a/rust/operator-binary/src/operations/pdb.rs +++ b/rust/operator-binary/src/operations/pdb.rs @@ -1,13 +1,16 @@ use std::cmp::{max, min}; use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::{constants::APP_NAME, HdfsCluster, HdfsRole}; use stackable_operator::{ builder::pdb::PodDisruptionBudgetBuilder, client::Client, cluster_resources::ClusterResources, commons::pdb::PdbConfig, kube::ResourceExt, }; -use crate::{hdfs_controller::RESOURCE_MANAGER_HDFS_CONTROLLER, OPERATOR_NAME}; +use crate::{ + crd::{constants::APP_NAME, v1alpha1, HdfsNodeRole}, + hdfs_controller::RESOURCE_MANAGER_HDFS_CONTROLLER, + OPERATOR_NAME, +}; #[derive(Snafu, Debug)] pub enum Error { @@ -26,8 +29,8 @@ pub enum Error { pub async fn add_pdbs( pdb: &PdbConfig, - hdfs: &HdfsCluster, - role: &HdfsRole, + hdfs: &v1alpha1::HdfsCluster, + role: &HdfsNodeRole, client: &Client, cluster_resources: &mut ClusterResources, ) -> Result<(), Error> { @@ -35,12 +38,12 @@ pub async fn add_pdbs( return Ok(()); } let max_unavailable = pdb.max_unavailable.unwrap_or(match role { - HdfsRole::NameNode => max_unavailable_name_nodes(), - HdfsRole::DataNode => max_unavailable_data_nodes( + HdfsNodeRole::Name => max_unavailable_name_nodes(), + HdfsNodeRole::Data => max_unavailable_data_nodes( hdfs.num_datanodes(), hdfs.spec.cluster_config.dfs_replication as u16, ), - HdfsRole::JournalNode => max_unavailable_journal_nodes(), + HdfsNodeRole::Journal => max_unavailable_journal_nodes(), }); let pdb = PodDisruptionBudgetBuilder::new_with_role( hdfs, diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index e2cb4554..a411024d 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -1,7 +1,6 @@ use std::{borrow::Cow, fmt::Display}; use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_hdfs_crd::{AnyNodeConfig, DataNodeContainer, HdfsCluster, NameNodeContainer}; use stackable_operator::{ builder::configmap::ConfigMapBuilder, client::Client, @@ -15,6 +14,8 @@ use stackable_operator::{ role_utils::RoleGroupRef, }; +use crate::crd::{v1alpha1, AnyNodeConfig, DataNodeContainer, NameNodeContainer}; + #[derive(Snafu, Debug)] pub enum Error { #[snafu(display("object has no namespace"))] @@ -84,7 +85,7 @@ const WAIT_FOR_NAMENODES_LOG_FILE: &str = "wait-for-namenodes.log4j.xml"; /// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the /// cluster spec pub async fn resolve_vector_aggregator_address( - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, client: &Client, ) -> Result> { let vector_aggregator_address = if let Some(vector_aggregator_config_map_name) = @@ -117,7 +118,7 @@ pub async fn resolve_vector_aggregator_address( /// Extend the role group ConfigMap with logging and Vector configurations pub fn extend_role_group_config_map( - rolegroup: &RoleGroupRef, + rolegroup: &RoleGroupRef, vector_aggregator_address: Option<&str>, merged_config: &AnyNodeConfig, cm_builder: &mut ConfigMapBuilder, diff --git a/rust/operator-binary/src/security/kerberos.rs b/rust/operator-binary/src/security/kerberos.rs index 5bb85f5a..7b26a6cd 100644 --- a/rust/operator-binary/src/security/kerberos.rs +++ b/rust/operator-binary/src/security/kerberos.rs @@ -1,14 +1,16 @@ use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::{ - constants::{SSL_CLIENT_XML, SSL_SERVER_XML}, - HdfsCluster, -}; use stackable_operator::{ kube::{runtime::reflector::ObjectRef, ResourceExt}, utils::cluster_info::KubernetesClusterInfo, }; -use crate::config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder}; +use crate::{ + config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder}, + crd::{ + constants::{SSL_CLIENT_XML, SSL_SERVER_XML}, + v1alpha1, + }, +}; pub const KERBEROS_CONTAINER_PATH: &str = "/stackable/kerberos"; @@ -19,13 +21,13 @@ type Result = std::result::Result; pub enum Error { #[snafu(display("object has no namespace"))] ObjectHasNoNamespace { - source: stackable_hdfs_crd::Error, - obj_ref: ObjectRef, + source: crate::crd::Error, + obj_ref: ObjectRef, }, } impl HdfsSiteConfigBuilder { - pub fn security_config(&mut self, hdfs: &HdfsCluster) -> &mut Self { + pub fn security_config(&mut self, hdfs: &v1alpha1::HdfsCluster) -> &mut Self { if hdfs.has_kerberos_enabled() { self.add("dfs.block.access.token.enable", "true") .add("dfs.http.policy", "HTTPS_ONLY") @@ -37,7 +39,7 @@ impl HdfsSiteConfigBuilder { self } - pub fn security_discovery_config(&mut self, hdfs: &HdfsCluster) -> &mut Self { + pub fn security_discovery_config(&mut self, hdfs: &v1alpha1::HdfsCluster) -> &mut Self { if hdfs.has_kerberos_enabled() { // We want e.g. hbase to automatically renew the Kerberos tickets. // This shouldn't harm any other consumers. @@ -57,7 +59,7 @@ impl HdfsSiteConfigBuilder { impl CoreSiteConfigBuilder { pub fn security_config( &mut self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, ) -> Result<&mut Self> { if hdfs.authentication_config().is_some() { @@ -126,7 +128,7 @@ impl CoreSiteConfigBuilder { pub fn security_discovery_config( &mut self, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, cluster_info: &KubernetesClusterInfo, ) -> Result<&mut Self> { if hdfs.has_kerberos_enabled() { @@ -169,7 +171,10 @@ impl CoreSiteConfigBuilder { /// ``` /// /// After we have switched to using the following principals everything worked without problems -fn principal_host_part(hdfs: &HdfsCluster, cluster_info: &KubernetesClusterInfo) -> Result { +fn principal_host_part( + hdfs: &v1alpha1::HdfsCluster, + cluster_info: &KubernetesClusterInfo, +) -> Result { let hdfs_name = hdfs.name_any(); let hdfs_namespace = hdfs .namespace_or_error() diff --git a/rust/operator-binary/src/security/opa.rs b/rust/operator-binary/src/security/opa.rs index 151ba79f..02eb0acf 100644 --- a/rust/operator-binary/src/security/opa.rs +++ b/rust/operator-binary/src/security/opa.rs @@ -1,8 +1,10 @@ use snafu::{ResultExt, Snafu}; -use stackable_hdfs_crd::{security::AuthorizationConfig, HdfsCluster}; use stackable_operator::{client::Client, commons::opa::OpaApiVersion}; -use crate::config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder}; +use crate::{ + config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder}, + crd::{security::AuthorizationConfig, v1alpha1}, +}; #[derive(Snafu, Debug)] pub enum Error { @@ -21,7 +23,7 @@ pub struct HdfsOpaConfig { impl HdfsOpaConfig { pub async fn from_opa_config( client: &Client, - hdfs: &HdfsCluster, + hdfs: &v1alpha1::HdfsCluster, authorization_config: &AuthorizationConfig, ) -> Result { let authorization_connection_string = authorization_config