diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index bbcd0de844..bbbe72c3b0 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -32,6 +32,7 @@ import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter; import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel; +import io.strimzi.operator.common.InvalidConfigurationException; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; @@ -40,6 +41,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; @@ -780,16 +782,14 @@ private void configProviders(KafkaConfiguration userConfig) { } else { strimziConfigProviders = "strimzienv"; } - - if (userConfig != null - && !userConfig.getConfiguration().isEmpty() - && userConfig.getConfigOption("config.providers") != null) { - writer.println("# Configuration providers configured by the user and by Strimzi"); - writer.println("config.providers=" + userConfig.getConfigOption("config.providers") + "," + strimziConfigProviders); - userConfig.removeConfigOption("config.providers"); - } else { + + final String userConfigProviders = getUserConfigProviderAliases(strimziConfigProviders, userConfig); + if ("".equals(userConfigProviders)) { writer.println("# Configuration providers configured by Strimzi"); writer.println("config.providers=" + strimziConfigProviders); + } else { + writer.println("# Configuration providers configured by the user and by Strimzi"); + writer.println("config.providers=" + userConfigProviders + "," + strimziConfigProviders); } writer.println("config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"); @@ -805,6 +805,32 @@ private void configProviders(KafkaConfiguration userConfig) { writer.println(); } + + /** + * Get the user provided Kafka configuration provider aliases, throwing an InvalidConfigurationException if any are found that would overwrite the Strimzi defined configuration providers + * + * @param strimziConfigProviders The Strimzi defined configuration providers + * @param userConfig The user configuration to extract the possible user-provided config provider configuration from + * @return The user defined Kafka configuration provider aliases or empty string + */ + private String getUserConfigProviderAliases(String strimziConfigProviders, KafkaConfiguration userConfig) { + String userConfigProviderAliases = ""; + if (userConfig != null + && !userConfig.getConfiguration().isEmpty() + && userConfig.getConfigOption("config.providers") != null) { + userConfigProviderAliases = userConfig.getConfigOption("config.providers"); + Collection userAliases = Arrays.asList(userConfigProviderAliases.split(",")); + Collection strimziAliases = Arrays.asList(strimziConfigProviders.split(",")); + + userAliases.stream().forEach(alias -> { + if (strimziAliases.contains(alias)) { + throw new InvalidConfigurationException("config.provider " + alias + " not permitted as it reserved for Strimzi. Not permitted aliases: " + strimziAliases); + } + }); + userConfig.removeConfigOption("config.providers"); + } + return userConfigProviderAliases; + } /** * Adds the configurations passed by the user in the Kafka CR, injecting Strimzi configurations when needed. diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index 8017c93776..c206ca8bd4 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -36,6 +36,7 @@ import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter; import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel; +import io.strimzi.operator.common.InvalidConfigurationException; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.test.annotations.ParallelSuite; @@ -600,6 +601,22 @@ public void testUserConfigurationWithConfigProviders() { "config.providers.strimzienv.param.allowlist.pattern=.*", "config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider")); } + + @ParallelTest + public void testUserConfigurationWithInvalidConfigProviders() { + Map userConfiguration = new HashMap<>(); + userConfiguration.put("config.providers", "env,strimzienv"); + userConfiguration.put("config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"); + userConfiguration.put("config.providers.strimzienv.class", "org.apache.kafka.common.config.provider.UserConfigProvider"); + + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet()); + + assertThrows(InvalidConfigurationException.class, () -> { + new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF) + .withUserConfiguration(kafkaConfiguration, false, false, false) + .build(); + }, "InvalidConfigurationException was expected"); + } @ParallelTest public void testNullUserConfigurationWithJmxMetricsReporter() {