feat: Support configuring JVM arguments #819

Merged
merged 10 commits into from
Jan 30, 2025
Changes from 6 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -15,11 +15,13 @@ All notable changes to this project will be documented in this file.
config property `requestedSecretLifetime`. This helps reduce frequent Pod restarts ([#796]).
- Run a `containerdebug` process in the background of each Kafka container to collect debugging information ([#803]).
- Aggregate emitted Kubernetes events on the CustomResources ([#809]).
- Support configuring JVM arguments ([#819]).

[#796]: https://github.com/stackabletech/kafka-operator/pull/796
[#803]: https://github.com/stackabletech/kafka-operator/pull/803
[#809]: https://github.com/stackabletech/kafka-operator/pull/809
[#813]: https://github.com/stackabletech/kafka-operator/pull/813
[#819]: https://github.com/stackabletech/kafka-operator/pull/819

## [24.11.1] - 2025-01-10

52 changes: 52 additions & 0 deletions deploy/helm/kafka-operator/crds/crds.yaml
@@ -266,6 +266,32 @@ spec:
default: {}
description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
type: object
jvmArgumentOverrides:
default:
add: []
remove: []
removeRegex: []
description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage.
properties:
add:
default: []
description: JVM arguments to be added
items:
type: string
type: array
remove:
default: []
description: JVM arguments to be removed by exact match
items:
type: string
type: array
removeRegex:
default: []
description: JVM arguments matching any of these regexes will be removed
items:
type: string
type: array
type: object
podOverrides:
default: {}
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
@@ -541,6 +567,32 @@ spec:
default: {}
description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
type: object
jvmArgumentOverrides:
default:
add: []
remove: []
removeRegex: []
description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage.
properties:
add:
default: []
description: JVM arguments to be added
items:
type: string
type: array
remove:
default: []
description: JVM arguments to be removed by exact match
items:
type: string
type: array
removeRegex:
default: []
description: JVM arguments matching any of these regexes will be removed
items:
type: string
type: array
type: object
podOverrides:
default: {}
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
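The `add`, `remove`, and `removeRegex` fields in the schema above can be read as layers applied in order: operator-generated arguments first, then the role, then the role group. The following is a minimal sketch of that idea, not the stackable-operator implementation. The `Overrides` struct, `merge_layers`, and `strings` are hypothetical names, and because the standard library has no regex engine, `removeRegex` is approximated by patterns of the form `<literal prefix>.*` only. The layering order is an assumption chosen to be consistent with the expected values in this PR's unit tests.

```rust
/// Hypothetical, simplified model of one override layer
/// (operator-generated, role level, or role group level).
#[derive(Default)]
struct Overrides {
    add: Vec<String>,
    remove: Vec<String>,
    /// Stand-in for `removeRegex`: only `<literal prefix>.*` patterns are supported here.
    remove_prefix_patterns: Vec<String>,
}

/// Matches `arg` against a simplified pattern: a trailing `.*` means
/// "starts with the preceding literal", anything else is an exact match.
fn matches_pattern(pattern: &str, arg: &str) -> bool {
    match pattern.strip_suffix(".*") {
        Some(prefix) => arg.starts_with(prefix),
        None => arg == pattern,
    }
}

/// Applies the layers in order. Each layer first removes matching arguments
/// from what has accumulated so far, then appends its own additions.
fn merge_layers(layers: &[Overrides]) -> Vec<String> {
    let mut args: Vec<String> = Vec::new();
    for layer in layers {
        args.retain(|arg| {
            !layer.remove.iter().any(|r| r == arg)
                && !layer
                    .remove_prefix_patterns
                    .iter()
                    .any(|p| matches_pattern(p, arg))
        });
        args.extend(layer.add.iter().cloned());
    }
    args
}

/// Small convenience for building owned argument lists.
fn strings(items: &[&str]) -> Vec<String> {
    items.iter().map(|s| s.to_string()).collect()
}
```

Calling `merge_layers` with an operator layer that adds `-Xmx34406m` and `-Xms34406m`, and a role group layer that removes `-Xmx.*` and adds `-Xmx40000m`, leaves `-Xms34406m` in place and appends the new `-Xmx40000m` at the end.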
@@ -95,3 +95,10 @@ servers:

The Kafka operator also supports Pod overrides, allowing you to override any property that you can set on a Kubernetes Pod.
Read the xref:concepts:overrides.adoc#pod-overrides[Pod overrides documentation] to learn more about this feature.

== JVM argument overrides

Stackable operators automatically determine the set of needed JVM arguments, such as memory settings or trust- and keystores.
Using JVM argument overrides you can configure the JVM arguments xref:concepts:overrides.adoc#jvm-argument-overrides[according to the concepts page].

One difference for Kafka is that all heap-related arguments are passed in via the environment variable `KAFKA_HEAP_OPTS`, and all other arguments via `EXTRA_ARGS`.
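To make the split concrete, here is a small sketch of how a flat argument list could be divided into the two environment variable values. It follows the same `-Xms`/`-Xmx` prefix check as the `is_heap_jvm_argument` helper in this PR; the function name `split_for_kafka_env` is hypothetical and used for illustration only.

```rust
/// Returns true for heap size flags (`-Xms...` / `-Xmx...`), compared case-insensitively.
fn is_heap_jvm_argument(arg: &str) -> bool {
    let lowercase = arg.to_lowercase();
    lowercase.starts_with("-xms") || lowercase.starts_with("-xmx")
}

/// Hypothetical helper: splits JVM arguments into the values for
/// `KAFKA_HEAP_OPTS` (first element) and `EXTRA_ARGS` (second element).
/// The relative order of arguments is preserved within each group.
fn split_for_kafka_env(args: &[&str]) -> (String, String) {
    let (heap, other): (Vec<&str>, Vec<&str>) = args
        .iter()
        .copied()
        .partition(|arg| is_heap_jvm_argument(arg));
    (heap.join(" "), other.join(" "))
}
```

For the input `["-Xmx819m", "-Xms819m", "-Djava.net.preferIPv4Stack=true"]`, the first value is `-Xmx819m -Xms819m` and the second is `-Djava.net.preferIPv4Stack=true`.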
2 changes: 1 addition & 1 deletion docs/modules/kafka/partials/nav.adoc
@@ -7,7 +7,7 @@
** xref:kafka:usage-guide/security.adoc[]
** xref:kafka:usage-guide/monitoring.adoc[]
** xref:kafka:usage-guide/logging.adoc[]
** xref:kafka:usage-guide/configuration-environment-overrides.adoc[]
** xref:kafka:usage-guide/overrides.adoc[]
** xref:kafka:usage-guide/operations/index.adoc[]
*** xref:kafka:usage-guide/operations/cluster-operations.adoc[]
*** xref:kafka:usage-guide/operations/pod-placement.adoc[]
13 changes: 7 additions & 6 deletions rust/crd/src/lib.rs
@@ -32,9 +32,7 @@ use stackable_operator::{
kube::{runtime::reflector::ObjectRef, CustomResource, ResourceExt},
product_config_utils::Configuration,
product_logging::{self, spec::Logging},
role_utils::{
GenericProductSpecificCommonConfig, GenericRoleConfig, Role, RoleGroup, RoleGroupRef,
},
role_utils::{GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef},
schemars::{self, JsonSchema},
status::condition::{ClusterCondition, HasStatusCondition},
time::Duration,
@@ -129,7 +127,7 @@ pub struct KafkaClusterSpec {
pub image: ProductImage,

// no doc - docs in Role struct.
pub brokers: Option<Role<KafkaConfigFragment>>,
pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,

/// Kafka settings that affect all roles and role groups.
/// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
@@ -191,7 +189,10 @@ impl KafkaCluster {
}
}

pub fn role(&self, role_variant: &KafkaRole) -> Result<&Role<KafkaConfigFragment>, Error> {
pub fn role(
&self,
role_variant: &KafkaRole,
) -> Result<&Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>, Error> {
match role_variant {
KafkaRole::Broker => self.spec.brokers.as_ref(),
}
@@ -203,7 +204,7 @@ impl KafkaCluster {
pub fn rolegroup(
&self,
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
) -> Result<&RoleGroup<KafkaConfigFragment, GenericProductSpecificCommonConfig>, Error> {
) -> Result<&RoleGroup<KafkaConfigFragment, JavaCommonConfig>, Error> {
let role_variant =
KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu {
role: rolegroup_ref.role.to_owned(),
200 changes: 200 additions & 0 deletions rust/operator-binary/src/config/jvm.rs
@@ -0,0 +1,200 @@
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_kafka_crd::{
KafkaConfig, KafkaConfigFragment, JVM_SECURITY_PROPERTIES_FILE, METRICS_PORT,
STACKABLE_CONFIG_DIR,
};
use stackable_operator::{
memory::{BinaryMultiple, MemoryQuantity},
role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role},
};

const JAVA_HEAP_FACTOR: f32 = 0.8;

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("invalid memory resource configuration - missing default or value in crd?"))]
MissingMemoryResourceConfig,

#[snafu(display("invalid memory config"))]
InvalidMemoryConfig {
source: stackable_operator::memory::Error,
},

#[snafu(display("failed to merge jvm argument overrides"))]
MergeJvmArgumentOverrides { source: role_utils::Error },
}

/// All JVM arguments.
fn construct_jvm_args(
merged_config: &KafkaConfig,
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
role_group: &str,
) -> Result<Vec<String>, Error> {
let heap_size = MemoryQuantity::try_from(
merged_config
.resources
.memory
.limit
.as_ref()
.context(MissingMemoryResourceConfigSnafu)?,
)
.context(InvalidMemoryConfigSnafu)?
.scale_to(BinaryMultiple::Mebi)
* JAVA_HEAP_FACTOR;
let java_heap = heap_size
.format_for_java()
.context(InvalidMemoryConfigSnafu)?;

let jvm_args = vec![
// Heap settings
format!("-Xmx{java_heap}"),
format!("-Xms{java_heap}"),
format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"),
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml")
];

let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args);
let merged = role
.get_merged_jvm_argument_overrides(role_group, &operator_generated)
.context(MergeJvmArgumentOverridesSnafu)?;
Ok(merged
.effective_jvm_config_after_merging()
// Sorry for the clone, that's how operator-rs is currently modelled :P
.clone())
}

/// Arguments that go into `EXTRA_ARGS`, so *not* the heap settings (which you can get using
/// [`construct_heap_jvm_args`]).
pub fn construct_non_heap_jvm_args(
merged_config: &KafkaConfig,
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
role_group: &str,
) -> Result<String, Error> {
let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?;
jvm_args.retain(|arg| !is_heap_jvm_argument(arg));

Ok(jvm_args.join(" "))
}

/// Arguments that go into `KAFKA_HEAP_OPTS`.
/// You can get the normal JVM arguments using [`construct_non_heap_jvm_args`].
pub fn construct_heap_jvm_args(
merged_config: &KafkaConfig,
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
role_group: &str,
) -> Result<String, Error> {
let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?;
jvm_args.retain(|arg| is_heap_jvm_argument(arg));

Ok(jvm_args.join(" "))
}

fn is_heap_jvm_argument(jvm_argument: &str) -> bool {
let lowercase = jvm_argument.to_lowercase();

lowercase.starts_with("-xms") || lowercase.starts_with("-xmx")
}

#[cfg(test)]
mod tests {
use stackable_kafka_crd::{KafkaCluster, KafkaRole};

use super::*;

#[test]
fn test_construct_jvm_arguments_defaults() {
let input = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
image:
productVersion: 3.7.1
clusterConfig:
zookeeperConfigMapName: xyz
brokers:
roleGroups:
default:
replicas: 1
"#;
let (merged_config, role, role_group) = construct_boilerplate(input);
let non_heap_jvm_args =
construct_non_heap_jvm_args(&merged_config, &role, &role_group).unwrap();
let heap_jvm_args = construct_heap_jvm_args(&merged_config, &role, &role_group).unwrap();

assert_eq!(
non_heap_jvm_args,
"-Djava.security.properties=/stackable/config/security.properties \
-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml"
);
assert_eq!(heap_jvm_args, "-Xmx819m -Xms819m");
}

#[test]
fn test_construct_jvm_argument_overrides() {
let input = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
image:
productVersion: 3.7.1
clusterConfig:
zookeeperConfigMapName: xyz
brokers:
config:
resources:
memory:
limit: 42Gi
jvmArgumentOverrides:
add:
- -Dhttps.proxyHost=proxy.my.corp
- -Dhttps.proxyPort=8080
- -Djava.net.preferIPv4Stack=true
roleGroups:
default:
replicas: 1
jvmArgumentOverrides:
# We need more memory!
removeRegex:
- -Xmx.*
- -Dhttps.proxyPort=.*
add:
- -Xmx40000m
- -Dhttps.proxyPort=1234
"#;
let (merged_config, role, role_group) = construct_boilerplate(input);
let non_heap_jvm_args =
construct_non_heap_jvm_args(&merged_config, &role, &role_group).unwrap();
let heap_jvm_args = construct_heap_jvm_args(&merged_config, &role, &role_group).unwrap();

assert_eq!(
non_heap_jvm_args,
"-Djava.security.properties=/stackable/config/security.properties \
-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml \
-Dhttps.proxyHost=proxy.my.corp \
-Djava.net.preferIPv4Stack=true \
-Dhttps.proxyPort=1234"
);
assert_eq!(heap_jvm_args, "-Xms34406m -Xmx40000m");
}

fn construct_boilerplate(
kafka_cluster: &str,
) -> (
KafkaConfig,
Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
String,
) {
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");

let kafka_role = KafkaRole::Broker;
let rolegroup_ref = kafka.broker_rolegroup_ref("default");
let merged_config = kafka.merged_config(&kafka_role, &rolegroup_ref).unwrap();
let role = kafka.spec.brokers.unwrap();

(merged_config, role, "default".to_owned())
}
}
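The heap sizing in `construct_jvm_args` boils down to simple arithmetic: take the container memory limit, scale it to mebibytes, and multiply by `JAVA_HEAP_FACTOR` (0.8). The sketch below reproduces that arithmetic without the stackable-operator `MemoryQuantity` type. `parse_mebibytes` and `heap_args` are hypothetical helpers, only the `Mi` and `Gi` suffixes are handled, and truncating to whole mebibytes is an assumption that agrees with the expected values in the tests above (a `42Gi` limit yields `34406m`).

```rust
/// Fraction of the container memory limit given to the JVM heap (value taken from the PR).
const JAVA_HEAP_FACTOR: f64 = 0.8;

/// Hypothetical helper: parses a small subset of Kubernetes memory quantities
/// (`Mi` and `Gi` suffixes only) into mebibytes.
fn parse_mebibytes(limit: &str) -> Option<f64> {
    if let Some(value) = limit.strip_suffix("Gi") {
        value.parse::<f64>().ok().map(|gib| gib * 1024.0)
    } else if let Some(value) = limit.strip_suffix("Mi") {
        value.parse::<f64>().ok()
    } else {
        None
    }
}

/// Builds the `-Xmx`/`-Xms` pair for a memory limit, or `None` if the limit
/// uses a unit this sketch does not understand.
fn heap_args(limit: &str) -> Option<Vec<String>> {
    // Assumption: fractional mebibytes are truncated, not rounded.
    let heap_mib = (parse_mebibytes(limit)? * JAVA_HEAP_FACTOR).floor() as u64;
    Some(vec![format!("-Xmx{heap_mib}m"), format!("-Xms{heap_mib}m")])
}
```

With a limit of `42Gi`, that is 43008 MiB times 0.8, or 34406.4 MiB, which truncates to `-Xmx34406m` and `-Xms34406m`.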
1 change: 1 addition & 0 deletions rust/operator-binary/src/config/mod.rs
@@ -0,0 +1 @@
pub mod jvm;