diff --git a/CHANGELOG.md b/CHANGELOG.md index 5acafbcb..33e2bb0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file. config property `requestedSecretLifetime`. This helps reduce frequent Pod restarts ([#796]). - Run a `containerdebug` process in the background of each Kafka container to collect debugging information ([#803]). - Aggregate emitted Kubernetes events on the CustomResources ([#809]). +- Support configuring JVM arguments ([#819]). ### Changed @@ -25,6 +26,7 @@ All notable changes to this project will be documented in this file. [#809]: https://github.com/stackabletech/kafka-operator/pull/809 [#810]: https://github.com/stackabletech/kafka-operator/pull/810 [#813]: https://github.com/stackabletech/kafka-operator/pull/813 +[#819]: https://github.com/stackabletech/kafka-operator/pull/819 ## [24.11.1] - 2025-01-10 diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index eeec1528..f4755372 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -266,6 +266,32 @@ spec: default: {} description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage. 
+ properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object podOverrides: default: {} description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. @@ -541,6 +567,32 @@ spec: default: {} description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage. 
+ properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object podOverrides: default: {} description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. diff --git a/docs/modules/kafka/pages/usage-guide/configuration-environment-overrides.adoc b/docs/modules/kafka/pages/usage-guide/overrides.adoc similarity index 86% rename from docs/modules/kafka/pages/usage-guide/configuration-environment-overrides.adoc rename to docs/modules/kafka/pages/usage-guide/overrides.adoc index a6d4ac83..2abd5543 100644 --- a/docs/modules/kafka/pages/usage-guide/configuration-environment-overrides.adoc +++ b/docs/modules/kafka/pages/usage-guide/overrides.adoc @@ -95,3 +95,10 @@ servers: The Kafka operator also supports Pod overrides, allowing you to override any property that you can set on a Kubernetes Pod. Read the xref:concepts:overrides.adoc#pod-overrides[Pod overrides documentation] to learn more about this feature. + +== JVM argument overrides + +Stackable operators automatically determine the set of needed JVM arguments, such as memory settings or trust- and keystores. +Using JVM argument overrides you can configure the JVM arguments xref:concepts:overrides.adoc#jvm-argument-overrides[according to the concepts page]. 
+ +One thing that is different for Kafka is that all heap-related arguments will be passed in via the env variable `KAFKA_HEAP_OPTS`, all the other ones via `EXTRA_ARGS`. diff --git a/docs/modules/kafka/partials/nav.adoc b/docs/modules/kafka/partials/nav.adoc index ed7f9352..7de36dbd 100644 --- a/docs/modules/kafka/partials/nav.adoc +++ b/docs/modules/kafka/partials/nav.adoc @@ -7,7 +7,7 @@ ** xref:kafka:usage-guide/security.adoc[] ** xref:kafka:usage-guide/monitoring.adoc[] ** xref:kafka:usage-guide/logging.adoc[] -** xref:kafka:usage-guide/configuration-environment-overrides.adoc[] +** xref:kafka:usage-guide/overrides.adoc[] ** xref:kafka:usage-guide/operations/index.adoc[] *** xref:kafka:usage-guide/operations/cluster-operations.adoc[] *** xref:kafka:usage-guide/operations/pod-placement.adoc[] diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index c4b96c7e..a8d6609b 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -32,9 +32,7 @@ use stackable_operator::{ kube::{runtime::reflector::ObjectRef, CustomResource, ResourceExt}, product_config_utils::Configuration, product_logging::{self, spec::Logging}, - role_utils::{ - GenericProductSpecificCommonConfig, GenericRoleConfig, Role, RoleGroup, RoleGroupRef, - }, + role_utils::{GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef}, schemars::{self, JsonSchema}, status::condition::{ClusterCondition, HasStatusCondition}, time::Duration, @@ -129,7 +127,7 @@ pub struct KafkaClusterSpec { pub image: ProductImage, // no doc - docs in Role struct. - pub brokers: Option>, + pub brokers: Option>, /// Kafka settings that affect all roles and role groups. /// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level. 
@@ -191,7 +189,10 @@ impl KafkaCluster { } } - pub fn role(&self, role_variant: &KafkaRole) -> Result<&Role, Error> { + pub fn role( + &self, + role_variant: &KafkaRole, + ) -> Result<&Role, Error> { match role_variant { KafkaRole::Broker => self.spec.brokers.as_ref(), } @@ -203,7 +204,7 @@ impl KafkaCluster { pub fn rolegroup( &self, rolegroup_ref: &RoleGroupRef, - ) -> Result<&RoleGroup, Error> { + ) -> Result<&RoleGroup, Error> { let role_variant = KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu { role: rolegroup_ref.role.to_owned(), diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs new file mode 100644 index 00000000..233f7635 --- /dev/null +++ b/rust/operator-binary/src/config/jvm.rs @@ -0,0 +1,200 @@ +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_kafka_crd::{ + KafkaConfig, KafkaConfigFragment, JVM_SECURITY_PROPERTIES_FILE, METRICS_PORT, + STACKABLE_CONFIG_DIR, +}; +use stackable_operator::{ + memory::{BinaryMultiple, MemoryQuantity}, + role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role}, +}; + +const JAVA_HEAP_FACTOR: f32 = 0.8; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("invalid memory resource configuration - missing default or value in crd?"))] + MissingMemoryResourceConfig, + + #[snafu(display("invalid memory config"))] + InvalidMemoryConfig { + source: stackable_operator::memory::Error, + }, + + #[snafu(display("failed to merge jvm argument overrides"))] + MergeJvmArgumentOverrides { source: role_utils::Error }, +} + +/// All JVM arguments. +fn construct_jvm_args( + merged_config: &KafkaConfig, + role: &Role, + role_group: &str, +) -> Result, Error> { + let heap_size = MemoryQuantity::try_from( + merged_config + .resources + .memory + .limit + .as_ref() + .context(MissingMemoryResourceConfigSnafu)?, + ) + .context(InvalidMemoryConfigSnafu)? 
+ .scale_to(BinaryMultiple::Mebi) + * JAVA_HEAP_FACTOR; + let java_heap = heap_size + .format_for_java() + .context(InvalidMemoryConfigSnafu)?; + + let jvm_args = vec![ + // Heap settings + format!("-Xmx{java_heap}"), + format!("-Xms{java_heap}"), + format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"), + format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml") + ]; + + let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args); + let merged = role + .get_merged_jvm_argument_overrides(role_group, &operator_generated) + .context(MergeJvmArgumentOverridesSnafu)?; + Ok(merged + .effective_jvm_config_after_merging() + // Sorry for the clone, that's how operator-rs is currently modelled :P + .clone()) +} + +/// Arguments that go into `EXTRA_ARGS`, so *not* the heap settings (which you can get using +/// [`construct_heap_jvm_args`]). +pub fn construct_non_heap_jvm_args( + merged_config: &KafkaConfig, + role: &Role, + role_group: &str, +) -> Result { + let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?; + jvm_args.retain(|arg| !is_heap_jvm_argument(arg)); + + Ok(jvm_args.join(" ")) +} + +/// Arguments that go into `KAFKA_HEAP_OPTS`. +/// You can get the normal JVM arguments using [`construct_non_heap_jvm_args`]. 
+pub fn construct_heap_jvm_args( + merged_config: &KafkaConfig, + role: &Role, + role_group: &str, +) -> Result { + let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?; + jvm_args.retain(|arg| is_heap_jvm_argument(arg)); + + Ok(jvm_args.join(" ")) +} + +fn is_heap_jvm_argument(jvm_argument: &str) -> bool { + let lowercase = jvm_argument.to_lowercase(); + + lowercase.starts_with("-xms") || lowercase.starts_with("-xmx") +} + +#[cfg(test)] +mod tests { + use stackable_kafka_crd::{KafkaCluster, KafkaRole}; + + use super::*; + + #[test] + fn test_construct_jvm_arguments_defaults() { + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + image: + productVersion: 3.7.1 + clusterConfig: + zookeeperConfigMapName: xyz + brokers: + roleGroups: + default: + replicas: 1 + "#; + let (kafka_role, role, merged_config) = construct_boilerplate(input); + let non_heap_jvm_args = + construct_non_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap(); + let heap_jvm_args = construct_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap(); + + assert_eq!( + non_heap_jvm_args, + "-Djava.security.properties=/stackable/config/security.properties \ + -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml" + ); + assert_eq!(heap_jvm_args, "-Xmx819m -Xms819m"); + } + + #[test] + fn test_construct_jvm_argument_overrides() { + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + image: + productVersion: 3.7.1 + clusterConfig: + zookeeperConfigMapName: xyz + brokers: + config: + resources: + memory: + limit: 42Gi + jvmArgumentOverrides: + add: + - -Dhttps.proxyHost=proxy.my.corp + - -Dhttps.proxyPort=8080 + - -Djava.net.preferIPv4Stack=true + roleGroups: + default: + replicas: 1 + jvmArgumentOverrides: + # We need more memory! 
+ removeRegex: + - -Xmx.* + - -Dhttps.proxyPort=.* + add: + - -Xmx40000m + - -Dhttps.proxyPort=1234 + "#; + let (merged_config, role, role_group) = construct_boilerplate(input); + let non_heap_jvm_args = + construct_non_heap_jvm_args(&merged_config, &role, &role_group).unwrap(); + let heap_jvm_args = construct_heap_jvm_args(&merged_config, &role, &role_group).unwrap(); + + assert_eq!( + non_heap_jvm_args, + "-Djava.security.properties=/stackable/config/security.properties \ + -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml \ + -Dhttps.proxyHost=proxy.my.corp \ + -Djava.net.preferIPv4Stack=true \ + -Dhttps.proxyPort=1234" + ); + assert_eq!(heap_jvm_args, "-Xms34406m -Xmx40000m"); + } + + fn construct_boilerplate( + kafka_cluster: &str, + ) -> ( + KafkaConfig, + Role, + String, + ) { + let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input"); + + let kafka_role = KafkaRole::Broker; + let rolegroup_ref = kafka.broker_rolegroup_ref("default"); + let merged_config = kafka.merged_config(&kafka_role, &rolegroup_ref).unwrap(); + let role = kafka.spec.brokers.unwrap(); + + (merged_config, role, "default".to_owned()) + } +} diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs new file mode 100644 index 00000000..271c6d99 --- /dev/null +++ b/rust/operator-binary/src/config/mod.rs @@ -0,0 +1 @@ +pub mod jvm; diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 5611a83c..093d36d8 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -63,7 +63,6 @@ use stackable_operator::{ }, kvp::{Label, Labels}, logging::controller::ReconcilerError, - memory::{BinaryMultiple, MemoryQuantity}, product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config}, product_logging::{ self, @@ -84,6 +83,7 @@ use stackable_operator::{ use 
strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ + config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args}, discovery::{self, build_discovery_configmaps}, kerberos::{self, add_kerberos_pod_config}, operations::{ @@ -102,7 +102,6 @@ pub const KAFKA_FULL_CONTROLLER_NAME: &str = concatcp!(KAFKA_CONTROLLER_NAME, '. /// Used as runAsUser in the pod security context. This is specified in the kafka image file pub const KAFKA_UID: i64 = 1000; -const JAVA_HEAP_RATIO: f32 = 0.8; pub struct Ctx { pub client: stackable_operator::client::Client, @@ -246,11 +245,6 @@ pub enum Error { source: stackable_kafka_crd::security::Error, }, - #[snafu(display("invalid memory resource configuration"))] - InvalidHeapConfig { - source: stackable_operator::memory::Error, - }, - #[snafu(display("failed to create cluster resources"))] CreateClusterResources { source: stackable_operator::cluster_resources::Error, @@ -359,6 +353,9 @@ pub enum Error { InvalidKafkaCluster { source: error_boundary::InvalidObject, }, + + #[snafu(display("failed to construct JVM arguments"))] + ConstructJvmArguments { source: crate::config::jvm::Error }, } type Result = std::result::Result; @@ -401,7 +398,6 @@ impl ReconcilerError for Error { Error::InvalidContainerName { .. } => None, Error::DeleteOrphans { .. } => None, Error::FailedToInitializeSecurityContext { .. } => None, - Error::InvalidHeapConfig { .. } => None, Error::CreateClusterResources { .. } => None, Error::FailedToResolveConfig { .. } => None, Error::ResolveVectorAggregatorAddress { .. } => None, @@ -425,6 +421,7 @@ impl ReconcilerError for Error { Error::AddKerberosConfig { .. } => None, Error::FailedToValidateAuthenticationMethod { .. } => None, Error::InvalidKafkaCluster { .. } => None, + Error::ConstructJvmArguments { .. 
} => None, } } } @@ -920,24 +917,6 @@ fn build_broker_rolegroup_statefulset( }) .collect::>(); - if let Some(memory_limit) = merged_config.resources.memory.limit.as_ref() { - let heap_size = MemoryQuantity::try_from(memory_limit) - .context(InvalidHeapConfigSnafu)? - .scale_to(BinaryMultiple::Mebi) - * JAVA_HEAP_RATIO; - - env.push(EnvVar { - name: KAFKA_HEAP_OPTS.to_string(), - value: Some(format!( - "-Xmx{heap}", - heap = heap_size - .format_for_java() - .context(InvalidHeapConfigSnafu)? - )), - ..EnvVar::default() - }); - } - env.push(EnvVar { name: "ZOOKEEPER".to_string(), value_from: Some(EnvVarSource { @@ -963,17 +942,6 @@ fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - // Needed for the `containerdebug` process to log it's tracing information to. - env.push(EnvVar { - name: "CONTAINERDEBUG_LOG_DIRECTORY".to_string(), - value: Some(format!("{STACKABLE_LOG_DIR}/containerdebug")), - value_from: None, - }); - - let jvm_args = format!( - "-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE} -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml", - ); - let kafka_listeners = get_kafka_listener_config( kafka, kafka_security, @@ -998,11 +966,25 @@ fn build_broker_rolegroup_statefulset( kafka_security.has_kerberos_enabled(), ) .join("\n")]) - .add_env_var("EXTRA_ARGS", jvm_args) + .add_env_var( + "EXTRA_ARGS", + construct_non_heap_jvm_args(merged_config, role, &rolegroup_ref.role_group) + .context(ConstructJvmArgumentsSnafu)?, + ) + .add_env_var( + KAFKA_HEAP_OPTS, + construct_heap_jvm_args(merged_config, role, &rolegroup_ref.role_group) + .context(ConstructJvmArgumentsSnafu)?, + ) .add_env_var( "KAFKA_LOG4J_OPTS", format!("-Dlog4j.configuration=file:{STACKABLE_LOG_CONFIG_DIR}/{LOG4J_CONFIG_FILE}"), ) + // Needed for the `containerdebug` process to log its tracing information to. 
+ .add_env_var( + "CONTAINERDEBUG_LOG_DIRECTORY", + format!("{STACKABLE_LOG_DIR}/containerdebug"), + ) .add_env_vars(env) .add_container_ports(container_ports(kafka_security)) .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR) diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 8db6cca5..797e0e15 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -27,6 +27,7 @@ use stackable_operator::{ use crate::kafka_controller::KAFKA_FULL_CONTROLLER_NAME; +mod config; mod discovery; mod kafka_controller; mod kerberos; diff --git a/tests/templates/kuttl/smoke/test_heap.sh b/tests/templates/kuttl/smoke/test_heap.sh index 0b41df50..71193850 100755 --- a/tests/templates/kuttl/smoke/test_heap.sh +++ b/tests/templates/kuttl/smoke/test_heap.sh @@ -2,7 +2,7 @@ # Usage: test_heap.sh # 1Gi * 0.8 -> 819 -EXPECTED_HEAP=-"Xmx819m" +EXPECTED_HEAP="-Xmx819m -Xms819m" # Check if ZK_SERVER_HEAP is set to the correct calculated value if [[ $KAFKA_HEAP_OPTS == "$EXPECTED_HEAP" ]]