Strimzi Metrics Reporter integration with KafkaConnect and MirrorMaker2 #11570
base: main
Conversation
9c6fd18 to 2238a75 (compare)
api/src/main/java/io/strimzi/api/kafka/model/connect/AbstractKafkaConnectSpec.java
boolean injectKafkaJmxReporter,
boolean injectStrimziMetricsReporter) {
    // creating a defensive copy to avoid mutating the input config
    if (configurations instanceof KafkaConnectConfiguration) {

Suggested change:
    if (configurations instanceof KafkaConnectConfiguration) {
    if (configurations instanceof KafkaConnectConfiguration kcConfiguration) {
boolean injectStrimziMetricsReporter) {
    // creating a defensive copy to avoid mutating the input config
    if (configurations instanceof KafkaConnectConfiguration) {
        configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);

Suggested change:
    configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
    configurations = new KafkaConnectConfiguration(kcConfiguration);
// creating a defensive copy to avoid mutating the input config
if (configurations instanceof KafkaConnectConfiguration) {
    configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
} else if (configurations instanceof KafkaMirrorMaker2Configuration) {

Suggested change:
    } else if (configurations instanceof KafkaMirrorMaker2Configuration) {
    } else if (configurations instanceof KafkaMirrorMaker2Configuration kmm2Configuration) {
if (configurations instanceof KafkaConnectConfiguration) {
    configurations = new KafkaConnectConfiguration((KafkaConnectConfiguration) configurations);
} else if (configurations instanceof KafkaMirrorMaker2Configuration) {
    configurations = new KafkaMirrorMaker2Configuration((KafkaMirrorMaker2Configuration) configurations);

Suggested change:
    configurations = new KafkaMirrorMaker2Configuration((KafkaMirrorMaker2Configuration) configurations);
    configurations = new KafkaMirrorMaker2Configuration(kmm2Configuration);
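The suggestions above rely on Java's pattern matching for instanceof (standardized in Java 16), which binds the matched value to a new variable and makes the explicit cast redundant. A minimal standalone sketch, with illustrative class and method names rather than the Strimzi ones:

```java
public class PatternMatchDemo {
    // Pattern matching for instanceof: the binding variable (s, i) is only
    // in scope when the type test succeeds, so no separate cast is needed.
    static String describe(Object config) {
        if (config instanceof String s) {          // binds s when the check passes
            return "string of length " + s.length();
        } else if (config instanceof Integer i) {  // a separate binding per branch
            return "int " + i;
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(describe("metrics"));  // string of length 7
        System.out.println(describe(42));         // int 42
    }
}
```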
public static KafkaBuilder kafkaWithoutEntityOperator(String namespaceName, String kafkaClusterName, int kafkaReplicas) {
    KafkaBuilder kb = kafka(namespaceName, kafkaClusterName, kafkaReplicas)
        .editSpec()
            .editEntityOperator()
                .withTopicOperator(null)
                .withUserOperator(null)
            .endEntityOperator()
        .endSpec();

    return kb;
}
Why is this needed? I'm not sure that it will do exactly what you want; we usually set it in the Kafka once it is "built". In the past this didn't work. But anyway, the KafkaTemplates class is there for things that are used multiple times; you are using it just once, so it doesn't make sense to me to have it in the templates class.
@@ -98,8 +165,10 @@ void setupEnvironment() {

KubeResourceManager.get().createResourceWithWait(
    KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3).build(),
    KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build(),
    KafkaNodePoolTemplates.mixedPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName() + "-tgt", testStorage.getClusterName() + "-tgt", 1).build()
Is it desired to use mixed pools? Also, should it really be just one node? These tests were always really unstable, so I wouldn't go with the mixed node pool and with just 1 replica (if you keep the mixed node pool). Also, you don't need to use the -tgt suffix; you can just use getTargetClusterName() and getTargetBrokerPoolName() from the testStorage.
);

KubeResourceManager.get().createResourceWithWait(
    KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getNamespaceName(), "my-connect",
You should use things from the testStorage. It's easier than having hardcoded values like my-connect. Just go with getClusterName() as we have in all other tests.
        .endValues()
    .endStrimziMetricsReporterConfig()
    .endSpec().build(),
KafkaMirrorMaker2Templates.kafkaMirrorMaker2(testStorage.getNamespaceName(), "my-mm2",
Same here.
@im-konge this would result in a KubernetesClientException due to the operator attempting to patch the roleRef field of an existing RoleBinding, which is not allowed in Kubernetes.
We are using it everywhere else: KafkaMirrorMaker2Templates.kafkaMirrorMaker2(testStorage.getNamespaceName(), testStorage.getClusterName(), testStorage.getSourceClusterName(), testStorage.getTargetClusterName(), 1, false). Why would it not work here as well?
With your change, we are creating both KC and MM2 with the same name, and the RoleBinding reconciliation logic is shared between these components.
Ah okay, I understand. So anyway, instead of hardcoding it, it would be better to create some local variables for the class (similarly to how it is done in MetricsST) for better manipulation in other tests.
Also, I'm thinking about leaving the creation of the MM2 and Connect (and possibly other components in the future) to the particular tests. So in case one test (for Kafka, for example) fails, the suite will not try to recreate MM2 and Connect. That's, I guess, the right way (plus, for example, in MetricsST we can move the second Kafka cluster out of the beforeAll, as there is no MM1 anymore), and it will speed things up. WDYT?
Isn't the issue that you have different namespaces in beforeAll and in the test?
I'm always using testStorage.getNamespaceName(), and looking at the logs it's the same namespace (test-suite-namespace).
Thanks, I will have a look. What you should do is create a class variable for the testStorage and use it everywhere, because the context is different in the beforeAll and in the test. So even if the namespace is the same, the cluster name you are passing there is different in the test.
You can check for example HttpBridgeST, where we are using the suiteTestStorage.
Yes, I'm doing exactly that, but I'm still running some tests to confirm all is good.
ScraperTemplates.scraperPod(testStorage.getNamespaceName(), testStorage.getScraperName()).build(),
KafkaTopicTemplates.topic(testStorage.getNamespaceName(), testStorage.getTopicName(), testStorage.getClusterName(), 5, 2).build(),
KafkaConnectorTemplates.kafkaConnector(testStorage.getNamespaceName(), "my-connect").build()
Same here.
04f28c5 to d2ec4c9 (compare)
d2ec4c9 to 70c2a3e (compare)
@im-konge your comments should be addressed now. Thanks.
d33eb0a to 2456fff (compare)
 * @param configuration Existing configuration
 */
public KafkaMirrorMaker2Configuration(KafkaMirrorMaker2Configuration configuration) {
I don't think you should modify this in the first place.
What do you mean? Can you please give more details?
 * @param configuration User provided Kafka Connect configuration
 */
public KafkaConnectConfiguration(KafkaConnectConfiguration configuration) {
I don't think you should modify this in the first place.
What do you mean? Can you please give more details?
We should try to avoid modifying it in the first place. So I guess the main question is why is this needed in the first place.
Hi @scholzj.
The javadoc in these configuration classes and the comment in KafkaConnectConfigurationBuilder are pretty clear about why these changes are needed. We are creating a defensive copy and utilising the newly created constructor to do so.

    // creating a defensive copy to avoid mutating the input config
    if (configurations instanceof KafkaConnectConfiguration kcConfiguration) {
        configurations = new KafkaConnectConfiguration(kcConfiguration);

Try to comment out this change in KafkaConnectConfiguration and run the KafkaConnectConfigurationBuilderTest for an example of the kind of issues you may have. That's how I found the issue, and it prompted the need for that change.
Also, this is the same strategy used in KafkaBrokerConfigurationBuilder, for exactly the same reason.
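The defensive-copy argument above can be shown with a small sketch. The classes below are hypothetical stand-ins for the configuration classes under discussion; the point is only that a copy constructor lets a builder inject its own keys without mutating the caller's object:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for a configuration class with a copy constructor.
class DemoConfiguration {
    private final Map<String, String> options;

    DemoConfiguration(Map<String, String> options) {
        this.options = new LinkedHashMap<>(options);
    }

    // Copy constructor used to take the defensive copy
    DemoConfiguration(DemoConfiguration other) {
        this.options = new LinkedHashMap<>(other.options);
    }

    void set(String key, String value) {
        options.put(key, value);
    }

    String get(String key) {
        return options.get(key);
    }
}

public class DefensiveCopyDemo {
    public static void main(String[] args) {
        DemoConfiguration user = new DemoConfiguration(
                Map.of("metric.reporters", "com.example.MyReporter"));
        // The builder works on a copy, so the user's object stays untouched
        DemoConfiguration copy = new DemoConfiguration(user);
        copy.set("metric.reporters", copy.get("metric.reporters")
                + ",io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter");
        System.out.println(user.get("metric.reporters"));  // com.example.MyReporter
    }
}
```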
I'm pretty sure the Kafka Broker Configuration Builder does not insert anything into the user configuration and neither should you. If it does, it is wrong and should be fixed.
I got your point. +1 for me
I reviewed the last changes and now I am less convinced that using a copy of the user configuration as a cache was a bad idea (and sorry if I changed my mind).
By making a copy of the user configuration (which we do anyway in the broker and connect builders for removing stuff from there), we have just one consistent place where things happen and where we build the configuration to be printed at the end (and it contains both the Strimzi and user ones).
With the latest changes we are anyway using a copy (because of the removing), but then we are directly printing some other stuff, so it's now a mix. The only advantage the current code brings is a clear separation of configuration sections (for user and Strimzi) within the Kafka configuration, but from a code perspective we are losing clarity and maybe introducing side effects. Let's see what others think, but IMHO we could go back to the previous solution, unless someone else strongly thinks (and convinces me) that the current one is better.
Do any of the @strimzi/maintainers have any further opinion on this thread? Thanks
Looking back at previous changes, there was already logic in the KafkaBrokerConfigurationBuilder.withUserConfiguration method handling metric configs, so I can see why we added the part about the Strimzi Metrics Reporter there as well in the previous PR, and it went in without anyone commenting on it.
However, I do think it is confusing to have a method called withUserConfiguration and then actually add some other metrics that aren't directly user configurations. So I would prefer a dedicated method in KafkaConnectConfigurationBuilder, called something like withMetricsConfiguration, that handles the metrics configuration.
I think we might also want to consider changing the way we do it in the KafkaBrokerConfigurationBuilder, but that should be handled in a separate PR to make things easier to review.
I would agree that from a logical perspective having two methods would make much more sense, but I think it was done this way because both configurations (from the user and by Strimzi) have an impact on the same property, metric.reporters. So you have to mix and match the reporters set by the user via spec.kafka.config together with the ones added by Strimzi automatically when you enable the Strimzi Metrics Reporter, for example.
You can't write the property in each withXXXConfiguration method; you should write it when everything is consistent in one object, thus the first/previous approach of cloning the user config object and using it to add the Strimzi ones, writing everything at the end.
Of course, it doesn't mean that having two methods is not possible. We would need to pass a config object across them in order to fill what we need from both and then write it.
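The merge problem described above (user reporters and Strimzi-injected reporters sharing the single metric.reporters property) can be sketched as follows. The method name createOrAddListConfig mirrors the one in this diff, but the implementation here is an illustrative guess, not the Strimzi code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ListConfigDemo {
    // Append values to a comma-separated list property, creating it if absent.
    static void createOrAddListConfig(Map<String, String> config, String key, String values) {
        String existing = config.get(key);
        if (existing == null || existing.isEmpty()) {
            config.put(key, values);
        } else {
            config.put(key, existing + "," + values);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        // Reporter configured by the user via spec.config (hypothetical class name)
        config.put("metric.reporters", "com.example.UserReporter");
        // Reporter injected by the operator when SMR is enabled
        createOrAddListConfig(config, "metric.reporters",
                "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter");
        System.out.println(config.get("metric.reporters"));
        // com.example.UserReporter,io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
    }
}
```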
 * @param key List configuration key.
 * @param values List configuration values.
 */
public static void createOrAddListConfig(AbstractConfiguration kafkaConfig, String key, String values) {
These are not really model utils from my point of view and should not be here. (Plus as I said, you should not need these things in the first place.)
Can you suggest a better place please? Thanks
Possibly in the configuration classes themselves? But as I said in the other place, I do not think you should be adding to it in the first place.
Ok, I will use AbstractConfiguration to host that method. Thanks for the suggestion.
I have moved createOrAddListConfig whilst awaiting the outcome of the conversation here: #11570 (comment)
FYI there is an issue in metrics-reporter 0.2.0 with MirrorMaker that prevents it from retrieving metrics: strimzi/metrics-reporter#87
@mimaison thanks for the note. AFAICS, it should be easy to reproduce with this PR and, in order to move this PR forward, we would need metrics-reporter 0.2.1 or 0.3.0. What's the plan?
I'd suggest releasing metrics-reporter 0.3.0 and using that in strimzi-kafka-operator. But obviously we need the input of Strimzi maintainers.
It looks like 0.2.0 is not affected by metrics-reporter/issues/87 (*) and we can proceed with this PR review. Most probably the issue originated from some other change done after the release. @im-konge @scholzj @ppatierno can you please have another look? Thanks.

(*) In order to reproduce the issue you need to set the allowList to

Source connector status:

{
  "name": "my-cluster->my-cluster-tgt.MirrorSourceConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 3,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 4,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    }
  ],
  "type": "source"
}

Checkpoint connector status:

{
  "name": "my-cluster->my-cluster-tgt.MirrorCheckpointConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "my-mm2-cluster-mirrormaker2-0.my-mm2-cluster-mirrormaker2.test.svc:8083"
    }
  ],
  "type": "source"
}

After that, I'm able to get all metrics by invoking the endpoint, including the offending one:

# TYPE kafka_connect_mirror_kafka_metrics_count_count gauge
kafka_connect_mirror_kafka_metrics_count_count 1.0

Thanks @fvaleri for checking.
LGTM from systemtests POV. Thanks
2456fff to 11da745 (compare)
@@ -375,4 +375,4 @@ private static List<String> validateComputeResources(ResourceRequirements resour
    }
    return errors;
}
Nit: let's get rid of any change to this class.
}
if (injectKafkaJmxReporter) {
    AbstractConfiguration.createOrAddListConfig(userConfig, "metric.reporters", "org.apache.kafka.common.metrics.JmxReporter");

private void printMetricReporters(AbstractConfiguration userConfig,
Why AbstractConfiguration and not KafkaConfiguration?
 * @param injectStrimziMetricsReporter Flag indicating whether to inject the Strimzi Metrics Reporter.
 */
private void maybeAddYammerMetricsReporters(KafkaConfiguration userConfig, boolean injectStrimziMetricsReporter) {
private void printYammerMetricsReporters(AbstractConfiguration userConfig,
Ditto as above
for (int i = 0; i < reporterClasses.size(); i++) {
    if (userConfig != null && !userConfig.getConfiguration().isEmpty() &&
            userConfig.getConfigOption(configKey) != null && injectFlags.get(i)) {
AFAIU this part is about putting the user config; why are you also testing injectFlags.get(i), which is about the injection of "internal" reporters?
This is related to the following comment.
        userConfig.getConfigOption(configKey) != null && injectFlags.get(i)) {
    hasUserMetricReporters = true;
    props.addPair(configKey, userConfig.getConfigOption(configKey));
    userConfig.removeConfigOption(configKey);
The above three lines of code don't use anything that is driving the for loop (i.e. reporterClasses, ...), so why can't they be outside of the loop?
These have to be executed only if there is user config AND an injection needed (i.e. we don't mess with user config unless we have to). I reworked the code to make it more readable and less error-prone.
boolean hasUserMetricReporters = false;

if (userConfig != null && !userConfig.getConfiguration().isEmpty() &&
        userConfig.getConfigOption(configKey) != null && injectStrimziMetricsReporter) {
Maybe it's similar to the previous question, but: if there is a "kafka.metrics.reporters" provided by the user BUT injectStrimziMetricsReporter is false, we are not entering the if, so we are not adding the user config to the props collection. hasUserMetricReporters remains false, and when we get down to the final code we don't print any configuration. The result is that we are not printing the user-provided configuration, or?
In that case, the user configuration stays in the userConfig and it is printed at the end of withUserConfiguration.
699e5d8 to c299e48 (compare)
CHANGELOG.md (outdated)
@@ -8,6 +8,7 @@
* Support for Kubernetes Image Volumes to mount custom plugins
* Adding support for [JsonTemplateLayout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html) in Operators, Cruise Control and Kafka 4.0.0 or greater.
* Strimzi Drain Cleaner updated to 1.4.0 (included in the Strimzi installation files)
* Added support for Strimzi Metrics Reporter to the KafkaConnect and MirrorMaker2.
After #11638 is merged, this has to be moved to the 0.48.0 section to avoid the same problem we had with the Kafka brokers part.
c52d7f5 to 5bc302d (compare)
@ppatierno @scholzj any feedback on this? We are already working on the Bridge support, but we would need this one to move on. Thanks.
String configKey = "metric.reporters";
boolean hasUserMetricReporters = false;
// the JmxReporter is explicitly added when metric.reporters is not empty
List<Map.Entry<String, Boolean>> reporterConfig = List.of(
Why do we need to feed this into a list instead of just printing it as it comes?
This is a temporary list of 3 elements that is used as an optimization to make the code less verbose. The alternative would be to get rid of this data structure and replace the for loop with explicit if statements.
Is it really less verbose? Is it really more readable which is probably more important quality? TBH, I do not think so. Why is it not possible to just assemble it as a String right away?
I find this code very hard to read. The code in KafkaConnectConfigurationBuilder.printMetricReporters is even more difficult to follow since there are two Lists and two a for loop within a for loop. I think we should aim for readability here over fewer lines of code.
private void maybeAddYammerMetricsReporters(KafkaConfiguration userConfig, boolean injectStrimziMetricsReporter) {
private void printYammerMetricsReporters(KafkaConfiguration userConfig,
        boolean injectStrimziMetricsReporter) {
    OrderedProperties props = new OrderedProperties();
Same as above.
This is just an optimization where we are only printing the data at the end. It is one of the options you suggested in a previous comment:
We can either generate the configuration right away into the configuration file or if we really need to store something between the different steps, we can use the builder class for it.
TBH, the configuration builder classes are all designed around just printing the configurations into the PrintWriter. When I suggested storing the state, I expected, for example, that the user-configured metric reporters would be taken in the user configuration method, stored in a class variable, and added later in the metrics method. Not that you generate everything and store it just to print it later. Not sure that makes sense. Doesn't the OrderedProperties always have only one entry anyway?
...erator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilder.java
 * @param key List property key.
 * @param values List property values.
 */
public static void createOrAddListProperty(OrderedProperties props, String key, String values) {
The idea to move this into AbstractConfiguration made sense when this was invoked in the Configuration classes. But it does not seem to make much sense if you use it with different classes only.
We had this in ModelUtils and that wasn't OK. Where would you suggest moving it?
It was not OK to have it in ModelUtils and use it for configurations, yes. But the overall idea was to avoid, as much as possible, modifying the configurations, in particular inserting our own configurations into them. So this method should not be needed for the Configuration classes, if that is where we moved it.
// metrics are collected into a single registry and exposed using the Kafka Connect listener,
// so we need to disable the listener here to avoid opening the default port (8080)
if (strimziMetricsReporterEnabled) {
    config.put("metric.reporters", StrimziMetricsReporterConfig.KAFKA_CLASS);
Do I read this correctly that we do not expect metric reporters in MM2 connectors? Can we make such an assumption?
No, we expect them.
MM2 connectors metrics are collected through this dedicated SMR instance into a shared Prometheus registry instance, so they can be exposed through the Kafka Connect SMR listener endpoint without enabling a new listener.
Comment updated for clarity.
We insert our own metrics reporter. But what I meant is that from the code, it seems we do not expect any user-configured metrics reporters there and just insert our own. Is there some reason for that? Or did I misunderstand the code?
You are right. We also need to handle user config when present.
public class StrimziMetricsReporterConfig {
    /** KafkaMetricsReporter fully qualified class name. */
    public static final String KAFKA_CLASS = "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter";

    /** YammerMetricsReporter fully qualified class name. */
    public static final String YAMMER_CLASS = "io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter";

    /** Enable HTTP listener (true/false). */
    public static final String LISTENER_ENABLE = "prometheus.metrics.reporter.listener.enable";

    /** Set listener endpoint (e.g. http://:8080). */
    public static final String LISTENER = "prometheus.metrics.reporter.listener";

    /** Set a comma separated list of regexes. */
    public static final String ALLOW_LIST = "prometheus.metrics.reporter.allowlist";
}
Do we want to have these options in the Metrics Reporter itself and load them from there? Or do you think it is better to avoid it as a CO dependency and define them here?
This is a level of indirection, similar to what we have for CruiseControl. It would be great to have them from the SMR, but we would still need to complement with additional configurations (e.g. FQCNs), and it's handy to have them grouped here.
Why would we need to complement them? Or what would we need to complement them with? I do not understand that.
We have the fully qualified class names in addition to configuration keys.
@scholzj we are ready with the following PR, so it would be helpful if you could have another look at this.
writer.println("prometheus.metrics.reporter.listener.enable=true");
writer.println("prometheus.metrics.reporter.listener=http://:" + StrimziMetricsReporterModel.METRICS_PORT);
writer.println("prometheus.metrics.reporter.allowlist=" + reporterModel.getAllowList());
writer.println(StrimziMetricsReporterConfig.LISTENER_ENABLE + "=true");
Am I right in understanding that these changes are here because we added the StrimziMetricsReporterConfig class in this PR?
This patch adds Strimzi Metrics Reporter integration with KC and MM2. Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
b58bedb to ab0afc3 (compare)
@scholzj @katheris @ppatierno the last commit should implement the strategy we discussed during the community meeting and address the latest comments. Let us know if you are happy with it.
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Type of change
Description
This patch adds support for the Strimzi Metrics Reporter to Kafka Connect and MirrorMaker2 as described by the following proposal:
https://github.com/strimzi/proposals/blob/main/064-prometheus-metrics-reporter.md
Related to #10753
We won’t initially support the CruiseControl component. To make it work, CC should be changed to expose metrics through its HTTP endpoint.
Documentation will be updated in a separate PR.
Checklist