Strimzi Metrics Reporter integration with KafkaConnect and MirrorMaker2 #11570

Open
wants to merge 11 commits into
base: main
from

OwenCorrigan76
Contributor

@OwenCorrigan76 OwenCorrigan76 commented Jun 20, 2025

Type of change

  • Enhancement / new feature

Description

This patch adds support for the Strimzi Metrics Reporter to Kafka Connect and MirrorMaker2 as described by the following proposal:

https://github.com/strimzi/proposals/blob/main/064-prometheus-metrics-reporter.md

Related to #10753

We won’t initially support the CruiseControl component. To make it work, CC should be changed to expose metrics through its HTTP endpoint.

Documentation will be updated in a separate PR.

Checklist

  • Write tests
  • Make sure all tests pass
  • [] Update documentation
  • Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • Reference relevant issue(s) and close them after merging
  • Update CHANGELOG.md

@OwenCorrigan76 OwenCorrigan76 requested a review from fvaleri June 20, 2025 09:53
@OwenCorrigan76 OwenCorrigan76 added this to the 0.47.0 milestone Jun 20, 2025
@OwenCorrigan76 OwenCorrigan76 force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch from 9c6fd18 to 2238a75 Compare June 20, 2025 09:57
@OwenCorrigan76 OwenCorrigan76 requested a review from a team June 20, 2025 11:09
boolean injectKafkaJmxReporter,
boolean injectStrimziMetricsReporter) {
// creating a defensive copy to avoid mutating the input config
if (configurations instanceof KafkaConnectConfiguration) {
Member

Suggested change
- if (configurations instanceof KafkaConnectConfiguration) {
+ if (configurations instanceof KafkaConnectConfiguration kcConfiguration) {

boolean injectStrimziMetricsReporter) {
// creating a defensive copy to avoid mutating the input config
if (configurations instanceof KafkaConnectConfiguration) {
configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
Member

Suggested change
- configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
+ configurations = new KafkaConnectConfiguration(kcConfiguration);

// creating a defensive copy to avoid mutating the input config
if (configurations instanceof KafkaConnectConfiguration) {
configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
} else if (configurations instanceof KafkaMirrorMaker2Configuration) {
Member

Suggested change
- } else if (configurations instanceof KafkaMirrorMaker2Configuration) {
+ } else if (configurations instanceof KafkaMirrorMaker2Configuration kmm2Configuration) {

if (configurations instanceof KafkaConnectConfiguration) {
configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
} else if (configurations instanceof KafkaMirrorMaker2Configuration) {
configurations = new KafkaMirrorMaker2Configuration((KafkaMirrorMaker2Configuration) configurations);
Member

Suggested change
- configurations = new KafkaMirrorMaker2Configuration((KafkaMirrorMaker2Configuration) configurations);
+ configurations = new KafkaMirrorMaker2Configuration(kmm2Configuration);
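
The four suggestions above combine a defensive copy with pattern matching for instanceof (Java 16 and later). A minimal, self-contained sketch of that combination follows. The classes `BaseConfig`, `ConnectConfig`, `MirrorMaker2Config` and `DefensiveCopyExample` are hypothetical stand-ins invented for illustration; they are not the operator's `KafkaConnectConfiguration` or `KafkaMirrorMaker2Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the operator's configuration classes.
abstract class BaseConfig {
    protected final Map<String, String> options;

    BaseConfig(Map<String, String> options) {
        // Copy the map so the new instance never shares state with its source.
        this.options = new HashMap<>(options);
    }

    Map<String, String> options() {
        return options;
    }
}

final class ConnectConfig extends BaseConfig {
    ConnectConfig(Map<String, String> options) { super(options); }
    // Copy constructor used for the defensive copy.
    ConnectConfig(ConnectConfig other) { super(other.options); }
}

final class MirrorMaker2Config extends BaseConfig {
    MirrorMaker2Config(Map<String, String> options) { super(options); }
    // Copy constructor used for the defensive copy.
    MirrorMaker2Config(MirrorMaker2Config other) { super(other.options); }
}

final class DefensiveCopyExample {
    // The pattern variable (kc, mm2) is already narrowed, so no explicit cast is needed.
    static BaseConfig defensiveCopy(BaseConfig config) {
        if (config instanceof ConnectConfig kc) {
            return new ConnectConfig(kc);
        } else if (config instanceof MirrorMaker2Config mm2) {
            return new MirrorMaker2Config(mm2);
        }
        // Unknown subtype: returned unchanged in this sketch.
        return config;
    }

    public static void main(String[] args) {
        ConnectConfig original = new ConnectConfig(Map.of("group.id", "connect"));
        BaseConfig copy = defensiveCopy(original);
        copy.options().put("metric.reporters", "example.Reporter");
        // Shows whether the original picked up the key that was added to the copy.
        System.out.println(original.options().containsKey("metric.reporters"));
    }
}
```

The pattern variable removes the cast without changing behaviour, which is why the suggestions touch only the condition and the constructor argument.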

Comment on lines 187 to 197
public static KafkaBuilder kafkaWithoutEntityOperator(String namespaceName, String kafkaClusterName, int kafkaReplicas) {
KafkaBuilder kb = kafka(namespaceName, kafkaClusterName, kafkaReplicas)
.editSpec()
.editEntityOperator()
.withTopicOperator(null)
.withUserOperator(null)
.endEntityOperator()
.endSpec();

return kb;
}
Member

Why is this needed?
I'm not sure that it will do exactly what you want; we usually set it in the Kafka once it is "built".
In the past this didn't work.

But anyway, the KafkaTemplates class is there for things that are used multiple times. You are using it just once, so it doesn't make sense to me to have it in the templates class.

@@ -98,8 +165,10 @@ void setupEnvironment() {

KubeResourceManager.get().createResourceWithWait(
KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3).build(),
- KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build()
+ KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build(),
+ KafkaNodePoolTemplates.mixedPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName() + "-tgt", testStorage.getClusterName() + "-tgt", 1).build()
Member

Is it desired to use mixed pools? Also, should it really be just one node? These tests were always really unstable, so I wouldn't go with the mixed node pool and with just 1 replica (if you keep the mixed node pool).

Also, you don't need to use the -tgt, you can just use getTargetClusterName(), getTargetBrokerPoolName() from the testStorage.

);

KubeResourceManager.get().createResourceWithWait(
KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getNamespaceName(), "my-connect",
Member

You should use things from the testStorage. It's easier than having hardcoded values like my-connect. Just go with the getClusterName() as we have in all other tests.

.endValues()
.endStrimziMetricsReporterConfig()
.endSpec().build(),
KafkaMirrorMaker2Templates.kafkaMirrorMaker2(testStorage.getNamespaceName(), "my-mm2",
Member

Same here.

Contributor

@im-konge this would result in a KubernetesClientException because the operator attempts to patch the roleRef field of an existing RoleBinding, which is not allowed in Kubernetes.

Member

We are using it everywhere else:

KafkaMirrorMaker2Templates.kafkaMirrorMaker2(testStorage.getNamespaceName(), testStorage.getClusterName(), testStorage.getSourceClusterName(), testStorage.getTargetClusterName(), 1, false)

Why would it not work here as well?

Contributor

@fvaleri fvaleri Jun 23, 2025

With your change, we are creating both KC and MM2 with the same name, and the role binding reconciliation logic is shared between these components.

Member

Ah okay, I understand.
So anyway, instead of hardcoding it, it would be better to create some local variables for the class (similarly to how it is done in MetricsST) for better manipulation in other tests.

Also, I'm thinking about leaving the creation of the MM2 and Connect (and possibly other components in the future) to the particular tests.
So in case one test (for Kafka, for example) fails, the suite will not try to recreate MM2 and Connect.

That's, I guess, the right way (plus, for example, in the MetricsST we can move the second Kafka cluster out of the before all, as there is no MM1 anymore), and it will speed things up. WDYT?

Member

Isn't the issue because you have different namespaces in beforeall and in the test?

Contributor

@fvaleri fvaleri Jun 24, 2025

I'm always using testStorage.getNamespaceName(), and looking at logs it's the same namespace (test-suite-namespace). Uploading 2025-06-24-10-44-07.zip…

Member

@im-konge im-konge Jun 24, 2025

Thanks, I will have a look.
What you should do is create a class variable for the testStorage and use it everywhere,
because the context is different in the before all and in the test.

So even if the namespace is the same, the cluster name you are passing there is different in the test.

Member

You can check for example HttpBridgeST where we are using the suiteTestStorage

Contributor

Yes, I'm doing exactly that, but I'm still running some tests to confirm all is good.

ScraperTemplates.scraperPod(testStorage.getNamespaceName(), testStorage.getScraperName()).build(),
- KafkaTopicTemplates.topic(testStorage.getNamespaceName(), testStorage.getTopicName(), testStorage.getClusterName(), 5, 2).build()
+ KafkaTopicTemplates.topic(testStorage.getNamespaceName(), testStorage.getTopicName(), testStorage.getClusterName(), 5, 2).build(),
+ KafkaConnectorTemplates.kafkaConnector(testStorage.getNamespaceName(), "my-connect").build()
Member

Same here.

@OwenCorrigan76 OwenCorrigan76 force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch from 04f28c5 to d2ec4c9 Compare June 23, 2025 15:38
@fvaleri fvaleri force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch from d2ec4c9 to 70c2a3e Compare June 24, 2025 11:35
@fvaleri
Contributor

fvaleri commented Jun 24, 2025

@im-konge your comments should be addressed now. Thanks.

@fvaleri fvaleri force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch 2 times, most recently from d33eb0a to 2456fff Compare June 24, 2025 11:54
@im-konge
Member

im-konge commented Jun 24, 2025

@im-konge your comments should be addressed now. Thanks.

Thanks @fvaleri, I will have a look later today or tomorrow :)

* @param configuration Existing configuration
*
*/
public KafkaMirrorMaker2Configuration(KafkaMirrorMaker2Configuration configuration) {
Member

I don't think you should modify this in the first place.

Contributor Author

What do you mean? Can you please give more details?

* @param configuration User provided Kafka Connect configuration
*
*/
public KafkaConnectConfiguration(KafkaConnectConfiguration configuration) {
Member

I don't think you should modify this in the first place.

Contributor Author

What do you mean? Can you please give more details?

Member

We should try to avoid modifying it in the first place. So I guess the main question is why is this needed in the first place.

Contributor Author

Hi @scholzj.
The javadoc in these configuration classes and the comment in KafkaConnectConfigurationBuilder are pretty clear about why these changes are needed.
We are creating a defensive copy and utilising the newly created constructor to do so.

 // creating a defensive copy to avoid mutating the input config
        if (configurations instanceof KafkaConnectConfiguration kcConfiguration) {
            configurations = new KafkaConnectConfiguration(kcConfiguration);

Try commenting out this change in KafkaConnectConfiguration and running the KafkaConnectConfigurationBuilderTest to see an example of the kind of issues you may have. That's how I found the issue, which prompted the need for that change.

Also, this is the same strategy used here in KafkaBrokerConfigurationBuilder, for exactly the same reason.

Member

I'm pretty sure the Kafka Broker Configuration Builder does not insert anything into the user configuration and neither should you. If it does, it is wrong and should be fixed.

Member

I got your point. +1 for me

Member

I reviewed the latest changes and now I am less convinced that using a copy of the user configuration as a cache was a bad idea (and sorry if I changed my mind).
By making a copy of the user configuration (which we do anyway in the broker and connect builders for removing stuff from there), we have just one consistent place where things happen and where we build the configuration to be printed at the end (and it contains both the Strimzi and the user ones).
With the latest changes we are still using a copy (because of the removing stuff), but then we are directly printing some other stuff, so it's now a mix. The only advantage the current code brings is a clear separation of configuration sections (for user and Strimzi) within the Kafka configuration, but from a code perspective we are losing clarity and maybe introducing side effects. Let's see what others think, but imho we could go back to the previous solution, unless someone else strongly thinks (and convinces me) that the current one is better.

Contributor Author

Do any of the @strimzi/maintainers have any further opinion on this thread? Thanks

Contributor

Looking back at previous changes, there was already logic in the KafkaBrokerConfigurationBuilder.withUserConfiguration method handling metric configs, so I can see why we added the part about the Strimzi Metrics Reporter there as well in the previous PR and it went in without anyone commenting on it.

However, I do think it is confusing to have a method called withUserConfiguration and then to actually add some other metrics that aren't directly user configurations. So I would prefer a dedicated method in KafkaConnectConfigurationBuilder called something like withMetricsConfiguration that handles the metrics configuration.

I think we might also want to consider changing the way we do it in the KafkaBrokerConfigurationBuilder, but I think that should be handled in a separate PR to make things easier to review.

Member

I would agree that, from a logical perspective, having two methods would make much more sense, but I think it was done this way because both configurations (from the user and from Strimzi) have an impact on the same property, metric.reporters. So you have to mix and match the reporters set by the user via spec.kafka.config with the ones added by Strimzi automatically when you enable the Strimzi Metrics Reporter, for example.
You can't write the property in each withXXXConfiguration method; you should write it when everything is consistent in one object. Hence the first/previous approach of cloning the user config object and using it to add the Strimzi ones, so that everything is written at the end.
Of course, it doesn't mean that having two methods is not possible. We should pass a config object across them in order to fill in what we need from both and then write it.
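
To make the trade-off discussed in this thread concrete, here is a rough sketch of the two-method variant: both methods feed a shared collection held by the builder, and metric.reporters is written exactly once at the end. `ReporterConfigBuilder` and its methods are hypothetical names chosen for illustration. This is not the operator's `KafkaConnectConfigurationBuilder`, which prints into a `PrintWriter`.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical builder used only to illustrate the discussion.
final class ReporterConfigBuilder {
    private static final String KEY = "metric.reporters";
    // Insertion-ordered set: keeps reporters in arrival order and drops duplicates.
    private final Set<String> reporters = new LinkedHashSet<>();
    private final StringBuilder out = new StringBuilder();

    // Writes user options as they come, except metric.reporters, which is held back.
    // The input map is only read, never modified.
    ReporterConfigBuilder withUserConfiguration(Map<String, String> userConfig) {
        userConfig.forEach((key, value) -> {
            if (KEY.equals(key)) {
                addAll(value);
            } else {
                out.append(key).append('=').append(value).append('\n');
            }
        });
        return this;
    }

    // Records the reporters that the operator wants to inject.
    ReporterConfigBuilder withMetricsConfiguration(String... injected) {
        for (String reporter : injected) {
            addAll(reporter);
        }
        return this;
    }

    // Emits metric.reporters once, after both sources are known.
    String build() {
        if (!reporters.isEmpty()) {
            out.append(KEY).append('=').append(String.join(",", reporters)).append('\n');
        }
        return out.toString();
    }

    // Splits a comma-separated value and stores each non-blank entry.
    private void addAll(String commaSeparated) {
        for (String reporter : commaSeparated.split(",")) {
            if (!reporter.isBlank()) {
                reporters.add(reporter.trim());
            }
        }
    }
}
```

Holding the state in the builder avoids both mutating the user configuration and writing the same property twice, at the cost of one extra field.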

* @param key List configuration key.
* @param values List configuration values.
*/
public static void createOrAddListConfig(AbstractConfiguration kafkaConfig, String key, String values) {
Member

These are not really model utils from my point of view and should not be here. (Plus as I said, you should not need these things in the first place.)

Contributor Author

Can you suggest a better place please? Thanks

Member

Possibly in the configuration classes themselves? But as I said in the other place, I do not think you should be adding to it in the first place.

Contributor Author

Ok, I will use AbstractConfiguration to host that method. Thanks for the suggestion.

Contributor Author

I have moved createOrAddListConfig whilst awaiting the outcome of the conversation here #11570 (comment)

@mimaison
Contributor

FYI there is an issue in metrics-reporter 0.2.0 with MirrorMaker that prevents it from retrieving metrics: strimzi/metrics-reporter#87
The issue happens when multiple tasks of MirrorMaker connectors run in the same runtime, which pretty much always happens when you run MirrorMaker.

@fvaleri
Contributor

fvaleri commented Jun 26, 2025

FYI there is an issue in metrics-reporter 0.2.0 with MirrorMaker that prevents it from retrieving metrics: strimzi/metrics-reporter#87 The issue happens when multiple tasks of MirrorMaker connectors run in the same runtime, which pretty much always happens when you run MirrorMaker.

@mimaison thanks for the note. AFAICS, it should be easy to reproduce with this PR and, in order to move this PR forward, we would need metrics-reporter 0.2.1 or 0.3.0. What's the plan?

@mimaison
Contributor

I'd suggest releasing metrics-reporter 0.3.0 and using that in strimzi-kafka-operator. But obviously we need the input of Strimzi maintainers.

@fvaleri
Contributor

fvaleri commented Jun 26, 2025

It looks like 0.2.0 is not affected by metrics-reporter/issues/87 (*) and we can proceed with this PR review. Most probably the issue originated from some other change done after the release.

@im-konge @scholzj @ppatierno can you please have another look? Thanks.

(*) In order to reproduce the issue you need to set the allowList to .* so that kafka_connect_mirror_kafka_metrics_count_count is not filtered out by the default allowList. You also need to run a producer and consumer to have running tasks on both source and checkpoint connectors.

Source connector status:

{
  "name": "my-cluster->my-cluster-tgt.MirrorSourceConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 3,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 4,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    }
  ],
  "type": "source"
}

Checkpoint connector status:

{
  "name": "my-cluster->my-cluster-tgt.MirrorCheckpointConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    }
  ],
  "type": "source"
}

After that, I'm able to get all metrics by invoking the endpoint, including the offending one:

# TYPE kafka_connect_mirror_kafka_metrics_count_count gauge
kafka_connect_mirror_kafka_metrics_count_count 1.0
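
As an illustration of the allowList note in the footnote above, the sketch below shows how a regex allow list can hide one metric while a catch-all `.*` lets it through. It assumes full-match semantics and uses a made-up narrow pattern; the reporter's real default allowList and its exact matching rules are not shown in this thread, and `AllowListExample` is a hypothetical class.

```java
import java.util.List;
import java.util.regex.Pattern;

final class AllowListExample {
    // Returns true when the metric name fully matches at least one allow-list pattern.
    static boolean isAllowed(String metricName, List<String> allowList) {
        return allowList.stream().anyMatch(regex -> Pattern.matches(regex, metricName));
    }

    public static void main(String[] args) {
        String metric = "kafka_connect_mirror_kafka_metrics_count_count";
        // Hypothetical narrow allow list that does not cover the metric.
        System.out.println(isAllowed(metric, List.of("kafka_connect_worker_.*")));
        // A catch-all pattern lets every metric through.
        System.out.println(isAllowed(metric, List.of(".*")));
    }
}
```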

@mimaison
Contributor

Thanks @fvaleri for checking.

Member

@im-konge im-konge left a comment

LGTM from systemtests POV. Thanks

@OwenCorrigan76 OwenCorrigan76 force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch from 2456fff to 11da745 Compare July 4, 2025 15:00
@@ -375,4 +375,4 @@ private static List<String> validateComputeResources(ResourceRequirements resour
}
return errors;
}
}
Contributor

Nit: let's get rid of any change to this class.

@fvaleri fvaleri requested review from scholzj and ppatierno July 7, 2025 09:53
}
if (injectKafkaJmxReporter) {
AbstractConfiguration.createOrAddListConfig(userConfig, "metric.reporters", "org.apache.kafka.common.metrics.JmxReporter");
private void printMetricReporters(AbstractConfiguration userConfig,
Member

Why AbstractConfiguration and not KafkaConfiguration?

* @param injectStrimziMetricsReporter Flag indicating whether to inject the Strimzi Metrics Reporter.
*/
private void maybeAddYammerMetricsReporters(KafkaConfiguration userConfig, boolean injectStrimziMetricsReporter) {
private void printYammerMetricsReporters(AbstractConfiguration userConfig,
Member

Ditto as above


for (int i = 0; i < reporterClasses.size(); i++) {
if (userConfig != null && !userConfig.getConfiguration().isEmpty() &&
userConfig.getConfigOption(configKey) != null && injectFlags.get(i)) {
Member

AFAIU this part is about putting the user config, why are you also testing injectFlags.get(i) which is about the injection of "internal" reporters?

Contributor

This is related to the following comment.

userConfig.getConfigOption(configKey) != null && injectFlags.get(i)) {
hasUserMetricReporters = true;
props.addPair(configKey, userConfig.getConfigOption(configKey));
userConfig.removeConfigOption(configKey);
Member

The above three lines of code don't use anything that drives the for loop (i.e. reporterClasses, ...). Why can't they be outside of the loop?

Contributor

These have to be executed only if there is user config AND an injection needed (i.e. we don't mess with user config unless we have to). I reworked the code to make it more readable and less error-prone.

boolean hasUserMetricReporters = false;

if (userConfig != null && !userConfig.getConfiguration().isEmpty() &&
userConfig.getConfigOption(configKey) != null && injectStrimziMetricsReporter) {
Member

Maybe it's similar to the previous question but:
If there is a "kafka.metrics.reporters" provided by the user BUT injectStrimziMetricsReporter is false, we are not entering the if, so we are not adding the user config to the props collection. hasUserMetricReporters remains false, and when we reach the final code we don't print any configuration. As a result, we are not printing the user-provided configuration, or are we?

Contributor

In that case, the user configuration stays in the userConfig and it is printed at the end of withUserConfiguration.
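
The rule described in this exchange (leave the user configuration alone unless an injection is needed) can be sketched as follows. `ReporterInjectionExample` and `injectIfNeeded` are hypothetical names, and the sketch works on a plain `Map` instead of the operator's `KafkaConfiguration`; only the reporter class name comes from `StrimziMetricsReporterConfig.YAMMER_CLASS` in this PR.

```java
import java.util.Map;
import java.util.Optional;

final class ReporterInjectionExample {
    static final String KEY = "kafka.metrics.reporters";
    // Class name as declared in StrimziMetricsReporterConfig.YAMMER_CLASS in this PR.
    static final String YAMMER_CLASS = "io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter";

    // Returns the line to print for the reporters property, or empty when nothing
    // is injected. In the empty case the user map is untouched, so a user-provided
    // value is still printed later together with the rest of the user configuration.
    static Optional<String> injectIfNeeded(Map<String, String> userConfig, boolean inject) {
        if (!inject) {
            return Optional.empty();
        }
        // Take the user value out of the map so the property is not printed twice.
        String userValue = userConfig.remove(KEY);
        // This sketch does not de-duplicate if the user already listed YAMMER_CLASS.
        String merged = (userValue == null || userValue.isBlank())
                ? YAMMER_CLASS
                : userValue + "," + YAMMER_CLASS;
        return Optional.of(KEY + "=" + merged);
    }
}
```

With `inject` set to false the method is a no-op, which matches the answer above: the user value stays in the user configuration and is printed with it.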

@fvaleri fvaleri force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch from 699e5d8 to c299e48 Compare July 9, 2025 16:37
@scholzj scholzj modified the milestones: 0.47.0, 0.48.0 Jul 10, 2025
CHANGELOG.md Outdated
@@ -8,6 +8,7 @@
* Support for Kubernetes Image Volumes to mount custom plugins
* Adding support for [JsonTemplateLayout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html) in Operators, Cruise Control and Kafka 4.0.0 or greater.
* Strimzi Drain Cleaner updated to 1.4.0 (included in the Strimzi installation files)
* Added support for Strimzi Metrics Reporter to the KafkaConnect and MirrorMaker2.
Member

After #11638 is merged, this has to be moved to the 0.48.0 section to avoid the same problem we had with the Kafka brokers part.

@OwenCorrigan76 OwenCorrigan76 force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch 3 times, most recently from c52d7f5 to 5bc302d Compare July 14, 2025 11:56
@fvaleri
Contributor

fvaleri commented Jul 15, 2025

@ppatierno @scholzj any feedback on this? We are already working on the Bridge support, but we would need this one to move on. Thanks.

String configKey = "metric.reporters";
boolean hasUserMetricReporters = false;
// the JmxReporter is explicitly added when metric.reporters is not empty
List<Map.Entry<String, Boolean>> reporterConfig = List.of(
Member

Why do we need to feed this into a list instead of just printing it as it comes?

Contributor

This is a temporary list of 3 elements that is used as an optimization to make the code less verbose. The alternative would be to get rid of this data structure and replace the for loop with explicit if statements.

Member

Is it really less verbose? Is it really more readable, which is probably the more important quality? TBH, I do not think so. Why is it not possible to just assemble it as a String right away?

Contributor

I find this code very hard to read. The code in KafkaConnectConfigurationBuilder.printMetricReporters is even more difficult to follow since there are two Lists and a for loop within a for loop. I think we should aim for readability here over fewer lines of code.

private void maybeAddYammerMetricsReporters(KafkaConfiguration userConfig, boolean injectStrimziMetricsReporter) {
private void printYammerMetricsReporters(KafkaConfiguration userConfig,
boolean injectStrimziMetricsReporter) {
OrderedProperties props = new OrderedProperties();
Member

Same as above.

Contributor

This is just an optimization where we are only printing the data at the end. It is one of the options you suggested in a previous comment:

We can either generate the configuration right away into the configuration file or if we really need to store something between the different steps, we can use the builder class for it.

Member

TBH, the configuration builder classes are all designed around just printing the configurations into the PrintWriter. When I suggested storing the state, I expected, for example, that the user-configured metric reporters would be taken in the user configuration method, stored in a class variable, and added later in the metrics method. Not that you generate everything and store it just to print it later. Not sure that makes sense. Don't the ordered properties always have only one entry anyway?

* @param key List property key.
* @param values List property values.
*/
public static void createOrAddListProperty(OrderedProperties props, String key, String values) {
Member

The idea to move this into AbstractConfiguration made sense when this was invoked in the Configuration classes. But it does not seem to make much sense if you use it with different classes only.

Contributor

We had this in ModelUtils and that wasn't ok. Where would you suggest to move it?

Member

It was not OK to have it in ModelUtils, and use it for configurations, yes. But the overall idea was to avoid as much as possible the modifications of the configurations. In particular inserting our own configurations into them. So this method should not be needed for the Configuration classes if that is where we moved.

// metrics are collected into a single registry and exposed using the Kafka Connect listener,
// so we need to disable the listener here to avoid opening the default port (8080)
if (strimziMetricsReporterEnabled) {
config.put("metric.reporters", StrimziMetricsReporterConfig.KAFKA_CLASS);
Member

Do I read this correctly that we do not expect metric reporters in MM2 connectors? Can we make such an assumption?

Contributor

No, we expect them.

MM2 connector metrics are collected through this dedicated SMR instance into a shared Prometheus registry, so they can be exposed through the Kafka Connect SMR listener endpoint without enabling a new listener.

Comment updated for clarity.

Member

We insert our own metrics reporter. But what I meant is that from the code, it seems we do not expect any user-configured metrics reporters there and just insert our own. Is there some reason for that? Or did I misunderstand the code?

Contributor

You are right. We also need to handle user config when present.
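One way to handle user config here is to append the Strimzi reporter to whatever the user already set, instead of overwriting it. The following is a minimal sketch under that assumption. The class `ConnectorReportersSketch` and the method `withStrimziReporter` are hypothetical, and the connector configuration is modelled as a plain `Map<String, Object>`.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper used only for illustration; it is not the code in this PR.
final class ConnectorReportersSketch {
    static final String METRIC_REPORTERS = "metric.reporters";
    // Same value as StrimziMetricsReporterConfig.KAFKA_CLASS shown in this review.
    static final String KAFKA_CLASS = "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter";

    // Returns a copy of the connector config whose metric.reporters list keeps the
    // user-configured reporters and contains the Strimzi reporter exactly once.
    static Map<String, Object> withStrimziReporter(Map<String, Object> connectorConfig) {
        // Defensive copy so the caller's map is never mutated.
        Map<String, Object> config = new LinkedHashMap<>(connectorConfig);

        // LinkedHashSet keeps the user's order and drops duplicates.
        Set<String> reporters = new LinkedHashSet<>();
        Object existing = config.get(METRIC_REPORTERS);
        if (existing != null) {
            for (String reporter : existing.toString().split(",")) {
                if (!reporter.isBlank()) {
                    reporters.add(reporter.trim());
                }
            }
        }
        reporters.add(KAFKA_CLASS);

        config.put(METRIC_REPORTERS, String.join(",", reporters));
        return config;
    }
}
```

Because the reporters are held in a set, a user who already lists the Strimzi reporter does not end up with it twice.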

Comment on lines +10 to +25
public class StrimziMetricsReporterConfig {
    /** KafkaMetricsReporter fully qualified class name. */
    public static final String KAFKA_CLASS = "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter";

    /** YammerMetricsReporter fully qualified class name. */
    public static final String YAMMER_CLASS = "io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter";

    /** Enable HTTP listener (true/false). */
    public static final String LISTENER_ENABLE = "prometheus.metrics.reporter.listener.enable";

    /** Set listener endpoint (e.g. http://:8080). */
    public static final String LISTENER = "prometheus.metrics.reporter.listener";

    /** Set a comma separated list of regexes. */
    public static final String ALLOW_LIST = "prometheus.metrics.reporter.allowlist";
}
Member

Do we want to have these options in the Metrics Reporter itself and load them from there? Or do you think it is better to avoid it as a CO dependency and define them here?

Contributor

fvaleri commented Jul 18, 2025

This is a level of indirection, similar to what we have for CruiseControl. It would be great to have them from the SMR, but we would still need to complement with additional configurations (e.g. FQCNs), and it's handy to have them grouped here.

Member

Why would we need to complement them? Or what would we need to complement them with? I do not understand that.

Contributor

We have the fully qualified class names in addition to configuration keys.

Contributor

fvaleri commented Jul 22, 2025

@scholzj we are ready with the following PR, so it would be helpful if you could have another look at this.

writer.println("prometheus.metrics.reporter.listener.enable=true");
writer.println("prometheus.metrics.reporter.listener=http://:" + StrimziMetricsReporterModel.METRICS_PORT);
writer.println("prometheus.metrics.reporter.allowlist=" + reporterModel.getAllowList());
writer.println(StrimziMetricsReporterConfig.LISTENER_ENABLE + "=true");
Contributor

Am I right in understanding that these changes are here because we added the StrimziMetricsReporterConfig class in this PR?
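For reference, the effect of the change in the diff above can be sketched as follows. The constants are copied from the `StrimziMetricsReporterConfig` class shown earlier in this review, while the class `ReporterConfigPrinterSketch`, the method `print`, and its `port` and `allowList` parameters are hypothetical stand-ins for `StrimziMetricsReporterModel.METRICS_PORT` and `reporterModel.getAllowList()`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical printer used only for illustration; it is not the code in this PR.
final class ReporterConfigPrinterSketch {
    // Keys copied from StrimziMetricsReporterConfig as shown in this review.
    static final String LISTENER_ENABLE = "prometheus.metrics.reporter.listener.enable";
    static final String LISTENER = "prometheus.metrics.reporter.listener";
    static final String ALLOW_LIST = "prometheus.metrics.reporter.allowlist";

    // Prints the same three properties as the literal-string version,
    // but takes the keys from the shared constants.
    static String print(int port, String allowList) {
        StringWriter out = new StringWriter();
        PrintWriter writer = new PrintWriter(out);
        writer.println(LISTENER_ENABLE + "=true");
        writer.println(LISTENER + "=http://:" + port);
        writer.println(ALLOW_LIST + "=" + allowList);
        writer.flush();
        return out.toString();
    }
}
```

The generated text is unchanged by such a refactoring; only the source of the property keys moves to one place.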

OwenCorrigan76 and others added 10 commits July 24, 2025 19:24
This patch adds Strimzi Metrics Reporter integration with KC and MM2.

Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
@fvaleri fvaleri force-pushed the Integreate_Strimzi_Metrics_Reporter_KC_MM2 branch from b58bedb to ab0afc3 Compare July 24, 2025 17:34
Contributor

fvaleri commented Jul 24, 2025

@scholzj @katheris @ppatierno last commit should implement the strategy we discussed during the community meeting and address latest comments. Let us know if you are happy with it.

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>