Skip to content

Commit d5a9d39

Browse files
feat: Support configuring JVM arguments (#819)
* feat: Support configuring JVM arguments * Add test * Update mem detection * Fix rustdoc * Add docs * Update docs/modules/kafka/pages/usage-guide/overrides.adoc * Fix variable names in test * Update rust/operator-binary/src/config/jvm.rs Co-authored-by: Malte Sander <contact@maltesander.com> --------- Co-authored-by: Malte Sander <contact@maltesander.com>
1 parent 91d0a20 commit d5a9d39

File tree

10 files changed

+292
-46
lines changed

10 files changed

+292
-46
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
1515
config property `requestedSecretLifetime`. This helps reduce frequent Pod restarts ([#796]).
1616
- Run a `containerdebug` process in the background of each Kafka container to collect debugging information ([#803]).
1717
- Aggregate emitted Kubernetes events on the CustomResources ([#809]).
18+
- Support configuring JVM arguments ([#819]).
1819

1920
### Changed
2021

@@ -25,6 +26,7 @@ All notable changes to this project will be documented in this file.
2526
[#809]: https://github.com/stackabletech/kafka-operator/pull/809
2627
[#810]: https://github.com/stackabletech/kafka-operator/pull/810
2728
[#813]: https://github.com/stackabletech/kafka-operator/pull/813
29+
[#819]: https://github.com/stackabletech/kafka-operator/pull/819
2830

2931
## [24.11.1] - 2025-01-10
3032

deploy/helm/kafka-operator/crds/crds.yaml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,32 @@ spec:
266266
default: {}
267267
description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
268268
type: object
269+
jvmArgumentOverrides:
270+
default:
271+
add: []
272+
remove: []
273+
removeRegex: []
274+
description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage.
275+
properties:
276+
add:
277+
default: []
278+
description: JVM arguments to be added
279+
items:
280+
type: string
281+
type: array
282+
remove:
283+
default: []
284+
description: JVM arguments to be removed by exact match
285+
items:
286+
type: string
287+
type: array
288+
removeRegex:
289+
default: []
290+
description: JVM arguments matching any of this regexes will be removed
291+
items:
292+
type: string
293+
type: array
294+
type: object
269295
podOverrides:
270296
default: {}
271297
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
@@ -541,6 +567,32 @@ spec:
541567
default: {}
542568
description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
543569
type: object
570+
jvmArgumentOverrides:
571+
default:
572+
add: []
573+
remove: []
574+
removeRegex: []
575+
description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage.
576+
properties:
577+
add:
578+
default: []
579+
description: JVM arguments to be added
580+
items:
581+
type: string
582+
type: array
583+
remove:
584+
default: []
585+
description: JVM arguments to be removed by exact match
586+
items:
587+
type: string
588+
type: array
589+
removeRegex:
590+
default: []
591+
description: JVM arguments matching any of this regexes will be removed
592+
items:
593+
type: string
594+
type: array
595+
type: object
544596
podOverrides:
545597
default: {}
546598
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,10 @@ servers:
9595

9696
The Kafka operator also supports Pod overrides, allowing you to override any property that you can set on a Kubernetes Pod.
9797
Read the xref:concepts:overrides.adoc#pod-overrides[Pod overrides documentation] to learn more about this feature.
98+
99+
== JVM argument overrides
100+
101+
Stackable operators automatically determine the set of needed JVM arguments, such as memory settings or trust- and keystores.
102+
Using JVM argument overrides you can configure the JVM arguments xref:concepts:overrides.adoc#jvm-argument-overrides[according to the concepts page].
103+
104+
One thing that is different for Kafka, is that all heap-related arguments will be passed in via the env variable `KAFKA_HEAP_OPTS`, all the other ones via `EXTRA_ARGS`.

docs/modules/kafka/partials/nav.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
** xref:kafka:usage-guide/security.adoc[]
88
** xref:kafka:usage-guide/monitoring.adoc[]
99
** xref:kafka:usage-guide/logging.adoc[]
10-
** xref:kafka:usage-guide/configuration-environment-overrides.adoc[]
10+
** xref:kafka:usage-guide/overrides.adoc[]
1111
** xref:kafka:usage-guide/operations/index.adoc[]
1212
*** xref:kafka:usage-guide/operations/cluster-operations.adoc[]
1313
*** xref:kafka:usage-guide/operations/pod-placement.adoc[]

rust/crd/src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ use stackable_operator::{
3232
kube::{runtime::reflector::ObjectRef, CustomResource, ResourceExt},
3333
product_config_utils::Configuration,
3434
product_logging::{self, spec::Logging},
35-
role_utils::{
36-
GenericProductSpecificCommonConfig, GenericRoleConfig, Role, RoleGroup, RoleGroupRef,
37-
},
35+
role_utils::{GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef},
3836
schemars::{self, JsonSchema},
3937
status::condition::{ClusterCondition, HasStatusCondition},
4038
time::Duration,
@@ -129,7 +127,7 @@ pub struct KafkaClusterSpec {
129127
pub image: ProductImage,
130128

131129
// no doc - docs in Role struct.
132-
pub brokers: Option<Role<KafkaConfigFragment>>,
130+
pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,
133131

134132
/// Kafka settings that affect all roles and role groups.
135133
/// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
@@ -191,7 +189,10 @@ impl KafkaCluster {
191189
}
192190
}
193191

194-
pub fn role(&self, role_variant: &KafkaRole) -> Result<&Role<KafkaConfigFragment>, Error> {
192+
pub fn role(
193+
&self,
194+
role_variant: &KafkaRole,
195+
) -> Result<&Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>, Error> {
195196
match role_variant {
196197
KafkaRole::Broker => self.spec.brokers.as_ref(),
197198
}
@@ -203,7 +204,7 @@ impl KafkaCluster {
203204
pub fn rolegroup(
204205
&self,
205206
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
206-
) -> Result<&RoleGroup<KafkaConfigFragment, GenericProductSpecificCommonConfig>, Error> {
207+
) -> Result<&RoleGroup<KafkaConfigFragment, JavaCommonConfig>, Error> {
207208
let role_variant =
208209
KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu {
209210
role: rolegroup_ref.role.to_owned(),
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use snafu::{OptionExt, ResultExt, Snafu};
2+
use stackable_kafka_crd::{
3+
KafkaConfig, KafkaConfigFragment, JVM_SECURITY_PROPERTIES_FILE, METRICS_PORT,
4+
STACKABLE_CONFIG_DIR,
5+
};
6+
use stackable_operator::{
7+
memory::{BinaryMultiple, MemoryQuantity},
8+
role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role},
9+
};
10+
11+
const JAVA_HEAP_FACTOR: f32 = 0.8;
12+
13+
#[derive(Snafu, Debug)]
14+
pub enum Error {
15+
#[snafu(display("invalid memory resource configuration - missing default or value in crd?"))]
16+
MissingMemoryResourceConfig,
17+
18+
#[snafu(display("invalid memory config"))]
19+
InvalidMemoryConfig {
20+
source: stackable_operator::memory::Error,
21+
},
22+
23+
#[snafu(display("failed to merge jvm argument overrides"))]
24+
MergeJvmArgumentOverrides { source: role_utils::Error },
25+
}
26+
27+
/// All JVM arguments.
28+
fn construct_jvm_args(
29+
merged_config: &KafkaConfig,
30+
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
31+
role_group: &str,
32+
) -> Result<Vec<String>, Error> {
33+
let heap_size = MemoryQuantity::try_from(
34+
merged_config
35+
.resources
36+
.memory
37+
.limit
38+
.as_ref()
39+
.context(MissingMemoryResourceConfigSnafu)?,
40+
)
41+
.context(InvalidMemoryConfigSnafu)?
42+
.scale_to(BinaryMultiple::Mebi)
43+
* JAVA_HEAP_FACTOR;
44+
let java_heap = heap_size
45+
.format_for_java()
46+
.context(InvalidMemoryConfigSnafu)?;
47+
48+
let jvm_args = vec![
49+
// Heap settings
50+
format!("-Xmx{java_heap}"),
51+
format!("-Xms{java_heap}"),
52+
format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"),
53+
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml")
54+
];
55+
56+
let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args);
57+
let merged = role
58+
.get_merged_jvm_argument_overrides(role_group, &operator_generated)
59+
.context(MergeJvmArgumentOverridesSnafu)?;
60+
Ok(merged
61+
.effective_jvm_config_after_merging()
62+
// Sorry for the clone, that's how operator-rs is currently modelled :P
63+
.clone())
64+
}
65+
66+
/// Arguments that go into `EXTRA_ARGS`, so *not* the heap settings (which you can get using
67+
/// [`construct_heap_jvm_args`]).
68+
pub fn construct_non_heap_jvm_args(
69+
merged_config: &KafkaConfig,
70+
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
71+
role_group: &str,
72+
) -> Result<String, Error> {
73+
let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?;
74+
jvm_args.retain(|arg| !is_heap_jvm_argument(arg));
75+
76+
Ok(jvm_args.join(" "))
77+
}
78+
79+
/// Arguments that go into `KAFKA_HEAP_OPTS`.
80+
/// You can get the normal JVM arguments using [`construct_non_heap_jvm_args`].
81+
pub fn construct_heap_jvm_args(
82+
merged_config: &KafkaConfig,
83+
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
84+
role_group: &str,
85+
) -> Result<String, Error> {
86+
let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?;
87+
jvm_args.retain(|arg| is_heap_jvm_argument(arg));
88+
89+
Ok(jvm_args.join(" "))
90+
}
91+
92+
fn is_heap_jvm_argument(jvm_argument: &str) -> bool {
93+
let lowercase = jvm_argument.to_lowercase();
94+
95+
lowercase.starts_with("-xms") || lowercase.starts_with("-xmx")
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use stackable_kafka_crd::{KafkaCluster, KafkaRole};
101+
102+
use super::*;
103+
104+
#[test]
105+
fn test_construct_jvm_arguments_defaults() {
106+
let input = r#"
107+
apiVersion: kafka.stackable.tech/v1alpha1
108+
kind: KafkaCluster
109+
metadata:
110+
name: simple-kafka
111+
spec:
112+
image:
113+
productVersion: 3.7.1
114+
clusterConfig:
115+
zookeeperConfigMapName: xyz
116+
brokers:
117+
roleGroups:
118+
default:
119+
replicas: 1
120+
"#;
121+
let (kafka_role, role, merged_config) = construct_boilerplate(input);
122+
let non_heap_jvm_args =
123+
construct_non_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap();
124+
let heap_jvm_args = construct_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap();
125+
126+
assert_eq!(
127+
non_heap_jvm_args,
128+
"-Djava.security.properties=/stackable/config/security.properties \
129+
-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml"
130+
);
131+
assert_eq!(heap_jvm_args, "-Xmx819m -Xms819m");
132+
}
133+
134+
#[test]
135+
fn test_construct_jvm_argument_overrides() {
136+
let input = r#"
137+
apiVersion: kafka.stackable.tech/v1alpha1
138+
kind: KafkaCluster
139+
metadata:
140+
name: simple-kafka
141+
spec:
142+
image:
143+
productVersion: 3.7.1
144+
clusterConfig:
145+
zookeeperConfigMapName: xyz
146+
brokers:
147+
config:
148+
resources:
149+
memory:
150+
limit: 42Gi
151+
jvmArgumentOverrides:
152+
add:
153+
- -Dhttps.proxyHost=proxy.my.corp
154+
- -Dhttps.proxyPort=8080
155+
- -Djava.net.preferIPv4Stack=true
156+
roleGroups:
157+
default:
158+
replicas: 1
159+
jvmArgumentOverrides:
160+
# We need more memory!
161+
removeRegex:
162+
- -Xmx.*
163+
- -Dhttps.proxyPort=.*
164+
add:
165+
- -Xmx40000m
166+
- -Dhttps.proxyPort=1234
167+
"#;
168+
let (merged_config, role, role_group) = construct_boilerplate(input);
169+
let non_heap_jvm_args =
170+
construct_non_heap_jvm_args(&merged_config, &role, &role_group).unwrap();
171+
let heap_jvm_args = construct_heap_jvm_args(&merged_config, &role, &role_group).unwrap();
172+
173+
assert_eq!(
174+
non_heap_jvm_args,
175+
"-Djava.security.properties=/stackable/config/security.properties \
176+
-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml \
177+
-Dhttps.proxyHost=proxy.my.corp \
178+
-Djava.net.preferIPv4Stack=true \
179+
-Dhttps.proxyPort=1234"
180+
);
181+
assert_eq!(heap_jvm_args, "-Xms34406m -Xmx40000m");
182+
}
183+
184+
fn construct_boilerplate(
185+
kafka_cluster: &str,
186+
) -> (
187+
KafkaConfig,
188+
Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
189+
String,
190+
) {
191+
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
192+
193+
let kafka_role = KafkaRole::Broker;
194+
let rolegroup_ref = kafka.broker_rolegroup_ref("default");
195+
let merged_config = kafka.merged_config(&kafka_role, &rolegroup_ref).unwrap();
196+
let role = kafka.spec.brokers.unwrap();
197+
198+
(merged_config, role, "default".to_owned())
199+
}
200+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod jvm;

0 commit comments

Comments
 (0)