From 97c7107d45e862b166a755e3c58f79af0631d5de Mon Sep 17 00:00:00 2001 From: MichaelMorris Date: Thu, 29 May 2025 14:58:01 +0100 Subject: [PATCH 1/3] Ignore user config provider aliases with conflict Signed-off-by: MichaelMorris --- .../KafkaBrokerConfigurationBuilder.java | 47 +++++++++++++++---- .../KafkaBrokerConfigurationBuilderTest.java | 3 +- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index bbcd0de844..ef7343cfa6 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -33,6 +33,7 @@ import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel; import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.ReconciliationLogger; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import java.io.PrintWriter; @@ -40,8 +41,10 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -58,6 +61,7 @@ * generate the configuration file, it is using the PrintWriter. */ public class KafkaBrokerConfigurationBuilder { + private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaBrokerConfigurationBuilder.class.getName()); private final static String CONTROL_PLANE_LISTENER_NAME = "CONTROLPLANE-9090"; private final static String REPLICATION_LISTENER_NAME = "REPLICATION-9091"; // Names of environment variables expanded through config providers inside the Kafka node @@ -780,16 +784,14 @@ private void configProviders(KafkaConfiguration userConfig) { } else { strimziConfigProviders = "strimzienv"; } - - if (userConfig != null - && !userConfig.getConfiguration().isEmpty() - && userConfig.getConfigOption("config.providers") != null) { - writer.println("# Configuration providers configured by the user and by Strimzi"); - writer.println("config.providers=" + userConfig.getConfigOption("config.providers") + "," + strimziConfigProviders); - userConfig.removeConfigOption("config.providers"); - } else { + + final String userConfigProviders = getUserConfigProviderAliases(strimziConfigProviders, userConfig); + if ("".equals(userConfigProviders)) { writer.println("# Configuration providers configured by Strimzi"); writer.println("config.providers=" + strimziConfigProviders); + } else { + writer.println("# Configuration providers configured by the user and by Strimzi"); + writer.println("config.providers=" + userConfigProviders + "," + strimziConfigProviders); } writer.println("config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"); @@ -805,6 +807,35 @@ private void configProviders(KafkaConfiguration userConfig) { writer.println(); } + + /** + * Get the user provided Kafka configuration provider aliases, dropping any that would overwrite the Strimzi defined configuration providers + * + * @param strimziConfigProviders The Strimzi defined configuration providers + * @param userConfig The user configuration to extract the possible user-provided config provider configuration from + * @return The user defined Kafka configuration provider aliases or empty string + */ + private String getUserConfigProviderAliases(String strimziConfigProviders, KafkaConfiguration userConfig) { + if (userConfig != null + && !userConfig.getConfiguration().isEmpty() + && userConfig.getConfigOption("config.providers") != null) { + Collection userAliases = Arrays.asList(userConfig.getConfigOption("config.providers").split(",")); + Collection strimziAliases = Arrays.asList(strimziConfigProviders.split(",")); + + Set validUserAliases = new HashSet<>(); + userAliases.stream().forEach(alias -> { + if (strimziAliases.contains(alias)) { + LOGGER.warnCr(reconciliation, "config.provider " + alias + " ignored as it is not permitted. Not permitted aliases: " + strimziAliases); + userConfig.removeConfigOption("config.providers." + alias + ".class"); + } else { + validUserAliases.add(alias); + } + }); + userConfig.removeConfigOption("config.providers"); + return String.join(",", validUserAliases); + } + return ""; + } /** * Adds the configurations passed by the user in the Kafka CR, injecting Strimzi configurations when needed. diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index 8017c93776..b1afe7707a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -569,8 +569,9 @@ public void testUserConfiguration() { @ParallelTest public void testUserConfigurationWithConfigProviders() { Map userConfiguration = new HashMap<>(); - userConfiguration.put("config.providers", "env"); + userConfiguration.put("config.providers", "env,strimzienv"); userConfiguration.put("config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"); + userConfiguration.put("config.providers.strimzienv.class", "org.apache.kafka.common.config.provider.UserConfigProvider"); KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet()); From 5a2831c97438c2970c4f9c629316ba216b2700dc Mon Sep 17 00:00:00 2001 From: MichaelMorris Date: Wed, 11 Jun 2025 18:54:53 +0100 Subject: [PATCH 2/3] Addressed code review comment Signed-off-by: MichaelMorris --- .../KafkaBrokerConfigurationBuilder.java | 19 +++++++----------- .../KafkaBrokerConfigurationBuilderTest.java | 20 +++++++++++++++++-- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index ef7343cfa6..f66038ff45 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -32,8 +32,8 @@ import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter; import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel; +import io.strimzi.operator.common.InvalidConfigurationException; import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.ReconciliationLogger; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import java.io.PrintWriter; @@ -44,7 +44,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -61,7 +60,6 @@ * generate the configuration file, it is using the PrintWriter. */ public class KafkaBrokerConfigurationBuilder { - private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaBrokerConfigurationBuilder.class.getName()); private final static String CONTROL_PLANE_LISTENER_NAME = "CONTROLPLANE-9090"; private final static String REPLICATION_LISTENER_NAME = "REPLICATION-9091"; // Names of environment variables expanded through config providers inside the Kafka node @@ -809,32 +807,29 @@ private void configProviders(KafkaConfiguration userConfig) { } /** - * Get the user provided Kafka configuration provider aliases, dropping any that would overwrite the Strimzi defined configuration providers + * Get the user provided Kafka configuration provider aliases, throwing an InvalidConfigurationException if any are found that would that overwrite the Strimzi defined configuration providers * * @param strimziConfigProviders The Strimzi defined configuration providers * @param userConfig The user configuration to extract the possible user-provided config provider configuration from * @return The user defined Kafka configuration provider aliases or empty string */ private String getUserConfigProviderAliases(String strimziConfigProviders, KafkaConfiguration userConfig) { + String userConfigProviderAliases = ""; if (userConfig != null && !userConfig.getConfiguration().isEmpty() && userConfig.getConfigOption("config.providers") != null) { - Collection userAliases = Arrays.asList(userConfig.getConfigOption("config.providers").split(",")); + userConfigProviderAliases = userConfig.getConfigOption("config.providers"); + Collection userAliases = Arrays.asList(userConfigProviderAliases.split(",")); Collection strimziAliases = Arrays.asList(strimziConfigProviders.split(",")); - Set validUserAliases = new HashSet<>(); userAliases.stream().forEach(alias -> { if (strimziAliases.contains(alias)) { - LOGGER.warnCr(reconciliation, "config.provider " + alias + " ignored as it is not permitted. Not permitted aliases: " + strimziAliases); - userConfig.removeConfigOption("config.providers." + alias + ".class"); - } else { - validUserAliases.add(alias); + throw new InvalidConfigurationException("config.provider " + alias + " not permitted as it reserved for Strimzi. Not permitted aliases: " + strimziAliases); } }); userConfig.removeConfigOption("config.providers"); - return String.join(",", validUserAliases); } - return ""; + return userConfigProviderAliases; } /** diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index b1afe7707a..c206ca8bd4 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -36,6 +36,7 @@ import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter; import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel; +import io.strimzi.operator.common.InvalidConfigurationException; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.test.annotations.ParallelSuite; @@ -569,9 +570,8 @@ public void testUserConfiguration() { @ParallelTest public void testUserConfigurationWithConfigProviders() { Map userConfiguration = new HashMap<>(); - userConfiguration.put("config.providers", "env,strimzienv"); + userConfiguration.put("config.providers", "env"); userConfiguration.put("config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"); - userConfiguration.put("config.providers.strimzienv.class", "org.apache.kafka.common.config.provider.UserConfigProvider"); KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet()); @@ -601,6 +601,22 @@ public void testUserConfigurationWithConfigProviders() { "config.providers.strimzienv.param.allowlist.pattern=.*", "config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider")); } + + @ParallelTest + public void testUserConfigurationWithInvalidConfigProviders() { + Map userConfiguration = new HashMap<>(); + userConfiguration.put("config.providers", "env,strimzienv"); + userConfiguration.put("config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"); + userConfiguration.put("config.providers.strimzienv.class", "org.apache.kafka.common.config.provider.UserConfigProvider"); + + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet()); + + assertThrows(InvalidConfigurationException.class, () -> { + new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF) + .withUserConfiguration(kafkaConfiguration, false, false, false) + .build(); + }, "InvalidConfigurationException was expected"); + } @ParallelTest public void testNullUserConfigurationWithJmxMetricsReporter() { From 087779eb233b62570327f3ae55a442e615e2107f Mon Sep 17 00:00:00 2001 From: MichaelMorris Date: Fri, 13 Jun 2025 14:29:04 +0100 Subject: [PATCH 3/3] Corrected javadoc comment Signed-off-by: MichaelMorris --- .../operator/cluster/model/KafkaBrokerConfigurationBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index f66038ff45..bbbe72c3b0 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -807,7 +807,7 @@ private void configProviders(KafkaConfiguration userConfig) { } /** - * Get the user provided Kafka configuration provider aliases, throwing an InvalidConfigurationException if any are found that would that overwrite the Strimzi defined configuration providers + * Get the user provided Kafka configuration provider aliases, throwing an InvalidConfigurationException if any are found that would overwrite the Strimzi defined configuration providers * * @param strimziConfigProviders The Strimzi defined configuration providers * @param userConfig The user configuration to extract the possible user-provided config provider configuration from