Skip to content

Commit 76f14a5

Browse files
authored
Fix missing jmx reporter (#11379)
Signed-off-by: Paolo Patierno <ppatierno@live.com>
1 parent 726387b commit 76f14a5

File tree

4 files changed

+115
-36
lines changed

4 files changed

+115
-36
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
If you have any custom logging configuration, you might need to update it during the upgrade to Kafka 4.0.
5252
* Kubernetes events for Pod restarts no longer have the Pod as the `regardingObject`.
5353
If you are using `regardingObject` as a `field-selector` for listing events you must update the selector to specify the Kafka resource instead.
54+
* From Kafka 4.0.0, to enable the JMXReporter you must either enable metrics in `.spec.kafka.metrics`, or explicitly add JMXReporter in `metric.reporters`.
5455

5556
## 0.45.0
5657

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class KafkaBrokerConfigurationBuilder {
6060
private final static String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:CERTS_STORE_PASSWORD}";
6161
private final static String PLACEHOLDER_OAUTH_CLIENT_SECRET_TEMPLATE_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:STRIMZI_%s_OAUTH_CLIENT_SECRET}";
6262

63+
private final static String KAFKA_JMX_REPORTER_CLASS = "org.apache.kafka.common.metrics.JmxReporter";
64+
6365
private final StringWriter stringWriter = new StringWriter();
6466
private final PrintWriter writer = new PrintWriter(stringWriter);
6567
private final Reconciliation reconciliation;
@@ -787,44 +789,52 @@ private void configProviders(KafkaConfiguration userConfig) {
787789
*
788790
* @param userConfig The User configuration - Kafka broker configuration options specified by the user in the Kafka custom resource
789791
* @param injectCcMetricsReporter Inject the Cruise Control Metrics Reporter into the configuration
792+
* @param isMetricsEnabled Flag to indicate if metrics are enabled. If they are we inject the JmxReporter into the configuration
790793
*
791794
* @return Returns the builder instance
792795
*/
793-
public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration userConfig, boolean injectCcMetricsReporter) {
794-
if (userConfig != null && !userConfig.getConfiguration().isEmpty()) {
795-
// We have to create a copy of the configuration before we modify it
796-
userConfig = new KafkaConfiguration(userConfig);
797-
798-
// Configure the configuration providers => we have to inject the Strimzi ones
799-
configProviders(userConfig);
800-
801-
if (injectCcMetricsReporter) {
802-
// We configure the Cruise Control Metrics Reporter is needed
803-
if (userConfig.getConfigOption(CruiseControlMetricsReporter.KAFKA_METRIC_REPORTERS_CONFIG_FIELD) != null) {
804-
if (!userConfig.getConfigOption(CruiseControlMetricsReporter.KAFKA_METRIC_REPORTERS_CONFIG_FIELD).contains(CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER)) {
805-
userConfig.setConfigOption(CruiseControlMetricsReporter.KAFKA_METRIC_REPORTERS_CONFIG_FIELD, userConfig.getConfigOption(CruiseControlMetricsReporter.KAFKA_METRIC_REPORTERS_CONFIG_FIELD) + "," + CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER);
806-
}
807-
} else {
808-
userConfig.setConfigOption(CruiseControlMetricsReporter.KAFKA_METRIC_REPORTERS_CONFIG_FIELD, CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER);
809-
}
810-
}
796+
public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration userConfig, boolean injectCcMetricsReporter, boolean isMetricsEnabled) {
797+
// We have to create a copy of the configuration before we modify it
798+
userConfig = userConfig != null ? new KafkaConfiguration(userConfig) : new KafkaConfiguration(reconciliation, new ArrayList<>());
799+
800+
// Configure the configuration providers => we have to inject the Strimzi ones
801+
configProviders(userConfig);
811802

803+
addMetricReporters(userConfig, injectCcMetricsReporter, isMetricsEnabled);
804+
805+
// print user config with Strimzi injections
806+
if (!userConfig.getConfiguration().isEmpty()) {
812807
printSectionHeader("User provided configuration");
813808
writer.println(userConfig.getConfiguration());
814809
writer.println();
815-
} else {
816-
// Configure the configuration providers => we have to inject the Strimzi ones
817-
configProviders(userConfig);
810+
}
818811

819-
if (injectCcMetricsReporter) {
820-
// There is no user provided configuration. But we still need to inject the Cruise Control Metrics Reporter
821-
printSectionHeader("Cruise Control Metrics Reporter");
822-
writer.println(CruiseControlMetricsReporter.KAFKA_METRIC_REPORTERS_CONFIG_FIELD + "=" + CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER);
823-
writer.println();
812+
return this;
813+
}
814+
815+
/**
816+
* Add metric reporters to the corresponding Kafka configuration
817+
*
818+
* @param userConfig The User configuration - Kafka broker configuration options specified by the user in the Kafka custom resource
819+
* @param injectCcMetricsReporter Inject the Cruise Control Metrics Reporter into the configuration
820+
* @param injectJmxReporter Inject the JMX Reporter into the configuration
821+
*/
822+
private void addMetricReporters(KafkaConfiguration userConfig, boolean injectCcMetricsReporter, boolean injectJmxReporter) {
823+
if (injectJmxReporter) {
824+
if (userConfig.getConfigOption("metric.reporters") != null && !userConfig.getConfigOption("metric.reporters").contains(KAFKA_JMX_REPORTER_CLASS)) {
825+
userConfig.setConfigOption("metric.reporters", userConfig.getConfigOption("metric.reporters") + "," + KAFKA_JMX_REPORTER_CLASS);
826+
} else {
827+
userConfig.setConfigOption("metric.reporters", KAFKA_JMX_REPORTER_CLASS);
824828
}
825829
}
826830

827-
return this;
831+
if (injectCcMetricsReporter) {
832+
if (userConfig.getConfigOption("metric.reporters") != null && !userConfig.getConfigOption("metric.reporters").contains(CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER)) {
833+
userConfig.setConfigOption("metric.reporters", userConfig.getConfigOption("metric.reporters") + "," + CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER);
834+
} else {
835+
userConfig.setConfigOption("metric.reporters", CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER);
836+
}
837+
}
828838
}
829839

830840
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1814,7 +1814,7 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
18141814
.withCruiseControl(cluster, ccMetricsReporter, node.broker())
18151815
.withTieredStorage(cluster, tieredStorage)
18161816
.withQuotas(cluster, quotas)
1817-
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null)
1817+
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, metrics.isEnabled())
18181818
.build()
18191819
.trim();
18201820
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public void testOpaAuthorizationWithTls() {
457457
@ParallelTest
458458
public void testNullUserConfiguration() {
459459
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
460-
.withUserConfiguration(null, false)
460+
.withUserConfiguration(null, false, false)
461461
.build();
462462

463463
assertThat(configuration, isEquivalent("node.id=2",
@@ -473,7 +473,7 @@ public void testNullUserConfiguration() {
473473
@ParallelTest
474474
public void testNullUserConfigurationAndCCReporter() {
475475
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
476-
.withUserConfiguration(null, true)
476+
.withUserConfiguration(null, true, false)
477477
.build();
478478

479479
assertThat(configuration, isEquivalent("node.id=2",
@@ -487,13 +487,30 @@ public void testNullUserConfigurationAndCCReporter() {
487487
"metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"));
488488
}
489489

490+
@ParallelTest
491+
public void testNullUserConfigurationAndMetricsEnabled() {
492+
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
493+
.withUserConfiguration(null, false, true)
494+
.build();
495+
496+
assertThat(configuration, isEquivalent("node.id=2",
497+
"config.providers=strimzienv,strimzifile,strimzidir",
498+
"config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
499+
"config.providers.strimzienv.param.allowlist.pattern=.*",
500+
"config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
501+
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
502+
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
503+
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
504+
"metric.reporters=org.apache.kafka.common.metrics.JmxReporter"));
505+
}
506+
490507
@ParallelTest
491508
public void testEmptyUserConfiguration() {
492509
Map<String, Object> userConfiguration = new HashMap<>();
493510
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
494511

495512
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
496-
.withUserConfiguration(kafkaConfiguration, false)
513+
.withUserConfiguration(kafkaConfiguration, false, false)
497514
.build();
498515

499516
assertThat(configuration, isEquivalent("node.id=2",
@@ -517,7 +534,7 @@ public void testUserConfiguration() {
517534
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
518535

519536
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
520-
.withUserConfiguration(kafkaConfiguration, false)
537+
.withUserConfiguration(kafkaConfiguration, false, false)
521538
.build();
522539

523540
assertThat(configuration, isEquivalent("node.id=2",
@@ -544,7 +561,7 @@ public void testUserConfigurationWithConfigProviders() {
544561

545562
// Broker
546563
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
547-
.withUserConfiguration(kafkaConfiguration, false)
564+
.withUserConfiguration(kafkaConfiguration, false, false)
548565
.build();
549566

550567
assertThat(configuration, isEquivalent("node.id=2",
@@ -559,7 +576,7 @@ public void testUserConfigurationWithConfigProviders() {
559576

560577
// Controller
561578
configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, new NodeRef("my-cluster-kafka-3", 3, "kafka", true, false))
562-
.withUserConfiguration(kafkaConfiguration, false)
579+
.withUserConfiguration(kafkaConfiguration, false, false)
563580
.build();
564581

565582
assertThat(configuration, isEquivalent("node.id=3",
@@ -580,7 +597,7 @@ public void testUserConfigurationWithCCMetricsReporter() {
580597
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
581598

582599
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
583-
.withUserConfiguration(kafkaConfiguration, true)
600+
.withUserConfiguration(kafkaConfiguration, true, false)
584601
.build();
585602

586603
assertThat(configuration, isEquivalent("node.id=2",
@@ -598,6 +615,35 @@ public void testUserConfigurationWithCCMetricsReporter() {
598615
"metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"));
599616
}
600617

618+
@ParallelTest
619+
public void testUserConfigurationWithCCMetricsReporterAndMetricsEnabled() {
620+
Map<String, Object> userConfiguration = new HashMap<>();
621+
userConfiguration.put("auto.create.topics.enable", "false");
622+
userConfiguration.put("offsets.topic.replication.factor", 3);
623+
userConfiguration.put("transaction.state.log.replication.factor", 3);
624+
userConfiguration.put("transaction.state.log.min.isr", 2);
625+
626+
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
627+
628+
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
629+
.withUserConfiguration(kafkaConfiguration, true, true)
630+
.build();
631+
632+
assertThat(configuration, isEquivalent("node.id=2",
633+
"config.providers=strimzienv,strimzifile,strimzidir",
634+
"config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
635+
"config.providers.strimzienv.param.allowlist.pattern=.*",
636+
"config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
637+
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
638+
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
639+
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
640+
"auto.create.topics.enable=false",
641+
"offsets.topic.replication.factor=3",
642+
"transaction.state.log.replication.factor=3",
643+
"transaction.state.log.min.isr=2",
644+
"metric.reporters=org.apache.kafka.common.metrics.JmxReporter,com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"));
645+
}
646+
601647
@ParallelTest
602648
public void testUserConfigurationWithCCMetricsReporterAndOtherMetricReporters() {
603649
Map<String, Object> userConfiguration = new HashMap<>();
@@ -606,7 +652,7 @@ public void testUserConfigurationWithCCMetricsReporterAndOtherMetricReporters()
606652
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
607653

608654
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
609-
.withUserConfiguration(kafkaConfiguration, true)
655+
.withUserConfiguration(kafkaConfiguration, true, false)
610656
.build();
611657

612658
assertThat(configuration, isEquivalent("node.id=2",
@@ -620,6 +666,28 @@ public void testUserConfigurationWithCCMetricsReporterAndOtherMetricReporters()
620666
"metric.reporters=my.domain.CustomMetricReporter,com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"));
621667
}
622668

669+
@ParallelTest
670+
public void testUserConfigurationWithCCMetricsReporterAndMetricsEnabledAndOtherMetricReporters() {
671+
Map<String, Object> userConfiguration = new HashMap<>();
672+
userConfiguration.put("metric.reporters", "my.domain.CustomMetricReporter");
673+
674+
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
675+
676+
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
677+
.withUserConfiguration(kafkaConfiguration, true, true)
678+
.build();
679+
680+
assertThat(configuration, isEquivalent("node.id=2",
681+
"config.providers=strimzienv,strimzifile,strimzidir",
682+
"config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
683+
"config.providers.strimzienv.param.allowlist.pattern=.*",
684+
"config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
685+
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
686+
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
687+
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
688+
"metric.reporters=my.domain.CustomMetricReporter,org.apache.kafka.common.metrics.JmxReporter,com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"));
689+
}
690+
623691
@ParallelTest
624692
public void testEphemeralStorageLogDirs() {
625693
Storage storage = new EphemeralStorageBuilder()

0 commit comments

Comments
 (0)