From 222e0492084ac565d6ae73639d1ebd7518914f90 Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Tue, 10 Jun 2025 15:38:57 -0400 Subject: [PATCH 1/4] Disable CC resource goals when resource capacities are not set. Signed-off-by: Kyle Liberti --- CHANGELOG.md | 1 + .../operator/cluster/model/CruiseControl.java | 17 +- .../model/cruisecontrol/BrokerCapacity.java | 106 ---- .../cluster/model/cruisecontrol/Capacity.java | 455 ------------------ .../cruisecontrol/CapacityConfiguration.java | 313 ++++++++++++ .../model/cruisecontrol/CpuCapacity.java | 93 +++- .../CruiseControlConfiguration.java | 85 +++- .../model/cruisecontrol/DiskCapacity.java | 110 ++++- .../cruisecontrol/InboundNetworkCapacity.java | 33 ++ .../model/cruisecontrol/NetworkCapacity.java | 36 ++ .../OutboundNetworkCapacity.java | 33 ++ .../cruisecontrol/ResourceCapacityType.java | 153 ++++++ .../cluster/model/CruiseControlTest.java | 109 +++-- .../CruiseControlConfigurationTest.java | 213 ++++++++ .../assembly/CruiseControlReconcilerTest.java | 4 +- .../ResourceRequirementsType.java | 30 ++ .../ResourceRequirementsUtils.java | 76 +++ .../resourcerequirements/ResourceType.java | 35 ++ 18 files changed, 1239 insertions(+), 663 deletions(-) delete mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java delete mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java create mode 100644 
cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java create mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java create mode 100644 operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java create mode 100644 operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java create mode 100644 operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c8b3c27959e..9f3aad63dd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.47.0 +* Disable CC resource goals when resource capacities are not set. * Adding progress tracking for Cruise Control rebalances * Add support for Kafka 3.9.1 * Fixed MirrorMaker 2 client rack init container override being ignored. diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java index 8b965894d9c..0476537b8dd 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java @@ -35,7 +35,7 @@ import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlTemplate; import io.strimzi.certs.CertAndKey; import io.strimzi.operator.cluster.ClusterOperatorConfig; -import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; +import io.strimzi.operator.cluster.model.cruisecontrol.CapacityConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.HashLoginServiceApiCredentials; import io.strimzi.operator.cluster.model.logging.LoggingModel; @@ -65,6 +65,7 @@ import static 
io.strimzi.api.kafka.model.common.template.DeploymentStrategy.ROLLING_UPDATE; import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS; import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.CRUISE_CONTROL_GOALS; +import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.generateCruiseControlDefaultPropertiesMap; import static java.lang.String.format; /** @@ -111,7 +112,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup private boolean authEnabled; private HashLoginServiceApiCredentials apiCredentials; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method - protected Capacity capacity; + protected CapacityConfiguration capacityConfiguration; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method private MetricsModel metrics; private LoggingModel logging; @@ -197,7 +198,8 @@ public static CruiseControl fromCrd( result.image = image; KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet()); - result.updateConfigurationWithDefaults(ccSpec, kafkaConfiguration); + result.capacityConfiguration = new CapacityConfiguration(reconciliation, kafkaCr.getSpec(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); + result.updateConfigurationWithDefaults(ccSpec, result.capacityConfiguration, kafkaConfiguration); CruiseControlConfiguration ccConfiguration = result.configuration; result.sslEnabled = ccConfiguration.isApiSslEnabled(); @@ -207,7 +209,6 @@ public static CruiseControl fromCrd( // To avoid illegal storage configurations provided by the user, // we rely on the storage configuration provided by the KafkaAssemblyOperator - result.capacity = new Capacity(reconciliation, kafkaCr.getSpec(), kafkaBrokerNodes, 
kafkaStorage, kafkaBrokerResources); result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS); result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS); result.gcLoggingEnabled = ccSpec.getJvmOptions() == null ? JvmOptions.DEFAULT_GC_LOGGING_ENABLED : ccSpec.getJvmOptions().isGcLoggingEnabled(); @@ -243,8 +244,10 @@ public static CruiseControl fromCrd( } } - private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) { - Map defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap()); + private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, + CapacityConfiguration capacityConfiguration, + KafkaConfiguration kafkaConfiguration) { + Map defaultCruiseControlProperties = generateCruiseControlDefaultPropertiesMap(capacityConfiguration); if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) { defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR)); } @@ -530,7 +533,7 @@ public LoggingModel logging() { public ConfigMap generateConfigMap(MetricsAndLogging metricsAndLogging) { Map configMapData = new HashMap<>(); configMapData.put(SERVER_CONFIG_FILENAME, configuration.asOrderedProperties().asPairs()); - configMapData.put(CAPACITY_CONFIG_FILENAME, capacity.toString()); + configMapData.put(CAPACITY_CONFIG_FILENAME, capacityConfiguration.toString()); configMapData.putAll(ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)); return ConfigMapUtils diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java 
b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java deleted file mode 100644 index ff45addd045..00000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.model.cruisecontrol; - -/** - * Configures the Kafka broker capacity - */ -public class BrokerCapacity { - // CC allows specifying a generic "default" broker entry in the capacity configuration to apply to all brokers without a specific broker entry. - // CC designates the id of this default broker entry as "-1". - /** - * Default broker ID - */ - public static final int DEFAULT_BROKER_ID = -1; - - /** - * Default outbound network capacity - */ - public static final String DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; - - private static final String DEFAULT_BROKER_DOC = "This is the default capacity. 
Capacity unit used for disk is in MiB, cpu is in number of cores, network throughput is in KiB."; - /** - * Default cpu core capacity - */ - public static final String DEFAULT_CPU_CORE_CAPACITY = "1.0"; - protected static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; - protected static final String DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; - - private int id; - private CpuCapacity cpu; - private DiskCapacity disk; - private String inboundNetwork; - private String outboundNetwork; - private final String doc; - - /** - * Constructor - * - * @param brokerId ID of the broker - * @param cpu CPU capacity - * @param disk Disk capacity - * @param inboundNetwork Inbound network capacity - * @param outboundNetwork Outbound network capacity - */ - public BrokerCapacity(int brokerId, CpuCapacity cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) { - this.id = brokerId; - this.cpu = cpu; - this.disk = disk; - this.inboundNetwork = inboundNetwork; - this.outboundNetwork = outboundNetwork; - this.doc = brokerId == -1 ? 
DEFAULT_BROKER_DOC : "Capacity for Broker " + brokerId; - } - - /** - * @return Broker ID - */ - protected Integer getId() { - return id; - } - - /** - * @return CPU capacity - */ - public CpuCapacity getCpu() { - return cpu; - } - - /** - * @return Disk capacity - */ - protected DiskCapacity getDisk() { - return disk; - } - - /** - * @return Inbound network capacity - */ - public String getInboundNetwork() { - return inboundNetwork; - } - - /** - * @return Outbound network capacity - */ - public String getOutboundNetwork() { - return outboundNetwork; - } - - protected String getDoc() { - return doc; - } - - protected void setCpu(CpuCapacity cpu) { - this.cpu = cpu; - } - - protected void setInboundNetwork(String inboundNetwork) { - this.inboundNetwork = inboundNetwork; - } - - protected void setOutboundNetwork(String outboundNetwork) { - this.outboundNetwork = outboundNetwork; - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java deleted file mode 100644 index e1bc6e26e26..00000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java +++ /dev/null @@ -1,455 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
- */ -package io.strimzi.operator.cluster.model.cruisecontrol; - -import io.fabric8.kubernetes.api.model.Quantity; -import io.fabric8.kubernetes.api.model.ResourceRequirements; -import io.strimzi.api.kafka.model.kafka.EphemeralStorage; -import io.strimzi.api.kafka.model.kafka.JbodStorage; -import io.strimzi.api.kafka.model.kafka.KafkaSpec; -import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; -import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; -import io.strimzi.api.kafka.model.kafka.Storage; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; -import io.strimzi.operator.cluster.model.NodeRef; -import io.strimzi.operator.cluster.model.Quantities; -import io.strimzi.operator.cluster.model.StorageUtils; -import io.strimzi.operator.cluster.model.VolumeUtils; -import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.ReconciliationLogger; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -/** - * Uses information in a Kafka Custom Resource to generate a capacity configuration file to be used for - * Cruise Control's Broker Capacity File Resolver. 
- * - * - * For example, it takes a Kafka Custom Resource like the following: - * - * spec: - * kafka: - * replicas: 3 - * storage: - * type: jbod - * volumes: - * - id: 0 - * type: persistent-claim - * size: 100Gi - * deleteClaim: false - * - id: 1 - * type: persistent-claim - * size: 200Gi - * deleteClaim: false - * cruiseControl: - * brokerCapacity: - * cpu: "1" - * inboundNetwork: 10000KB/s - * outboundNetwork: 10000KB/s - * overrides: - * - brokers: [0] - * cpu: "2.345" - * outboundNetwork: 40000KB/s - * - brokers: [1, 2] - * cpu: 4000m - * inboundNetwork: 60000KB/s - * outboundNetwork: 20000KB/s - * - * and uses the information to create Cruise Control BrokerCapacityFileResolver config file like the following: - * - * { - * "brokerCapacities":[ - * { - * "brokerId": "-1", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log-1": "100000", - * "/var/lib/kafka1/kafka-log-1": "200000" - * }, - * "CPU": {"num.cores": "1"}, - * "NW_IN": "10000", - * "NW_OUT": "10000" - * }, - * "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
- * }, - * { - * "brokerId": "0", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log0": "100000", - * "/var/lib/kafka1/kafka-log0": "200000" - * }, - * "CPU": {"num.cores": "2.345"}, - * "NW_IN": "10000", - * "NW_OUT": "40000" - * }, - * "doc": "Capacity for Broker 0" - * }, - * { - * "brokerId": "1", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log1": "100000", - * "/var/lib/kafka1/kafka-log1": "200000" - * }, - * "CPU": {"num.cores": "4"}, - * "NW_IN": "60000", - * "NW_OUT": "20000" - * }, - * "doc": "Capacity for Broker 1" - * }, - * "brokerId": "2", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log2": "100000", - * "/var/lib/kafka1/kafka-log2": "200000" - * }, - * "CPU": {"num.cores": "4"}, - * "NW_IN": "60000", - * "NW_OUT": "20000" - * }, - * "doc": "Capacity for Broker 2" - * } - * ] - * } - */ -public class Capacity { - protected static final ReconciliationLogger LOGGER = ReconciliationLogger.create(Capacity.class.getName()); - private final Reconciliation reconciliation; - private final TreeMap capacityEntries; - - /** - * Broker capacities key - */ - public static final String CAPACITIES_KEY = "brokerCapacities"; - - /** - * Capacity key - */ - public static final String CAPACITY_KEY = "capacity"; - - /** - * Disk key - */ - public static final String DISK_KEY = "DISK"; - - /** - * CPU key - */ - public static final String CPU_KEY = "CPU"; - - /** - * Inbound network key - */ - public static final String INBOUND_NETWORK_KEY = "NW_IN"; - - /** - * Outbound network key - */ - public static final String OUTBOUND_NETWORK_KEY = "NW_OUT"; - - /** - * Resource type - */ - public static final String RESOURCE_TYPE = "cpu"; - - private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka"; - private static final String KAFKA_LOG_DIR = "kafka-log"; - private static final String BROKER_ID_KEY = "brokerId"; - private static final String DOC_KEY = "doc"; - - private enum ResourceRequirementType { - REQUEST, - LIMIT; - - private 
Quantity getQuantity(ResourceRequirements resources) { - Map resourceRequirement = switch (this) { - case REQUEST -> resources.getRequests(); - case LIMIT -> resources.getLimits(); - }; - if (resourceRequirement != null) { - return resourceRequirement.get(RESOURCE_TYPE); - } - return null; - } - } - - /** - * Constructor - * - * @param reconciliation Reconciliation marker - * @param spec Spec of the Kafka custom resource - * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster - * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools - * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools - */ - public Capacity( - Reconciliation reconciliation, - KafkaSpec spec, - Set kafkaBrokerNodes, - Map kafkaStorage, - Map kafkaBrokerResources - ) { - this.reconciliation = reconciliation; - this.capacityEntries = new TreeMap<>(); - - processCapacityEntries(spec.getCruiseControl(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); - } - - private static Integer getResourceRequirement(ResourceRequirements resources, ResourceRequirementType requirementType) { - if (resources != null) { - Quantity quantity = requirementType.getQuantity(resources); - if (quantity != null) { - return Quantities.parseCpuAsMilliCpus(quantity.toString()); - } - } - return null; - } - - private static CpuCapacity getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { - Integer request = getResourceRequirement(resourceRequirements, ResourceRequirementType.REQUEST); - Integer limit = getResourceRequirement(resourceRequirements, ResourceRequirementType.LIMIT); - - if (request != null) { - return new CpuCapacity(CpuCapacity.milliCpuToCpu(request)); - } else if (limit != null) { - return new CpuCapacity(CpuCapacity.milliCpuToCpu(limit)); - } else { - return new CpuCapacity(BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY); - } - } - - /** - * The brokerCapacity 
overrides per broker take top precedence, then general brokerCapacity configuration, and then the Kafka resource requests, then the Kafka resource limits. - * For example: - * (1) brokerCapacity overrides - * (2) brokerCapacity - * (3) Kafka resource requests - * (4) Kafka resource limits - * When none of Cruise Control CPU capacity configurations mentioned above are configured, CPU capacity will be set to `1`. - * - * @param override brokerCapacity overrides (per broker) - * @param bc brokerCapacity (for all brokers) - * @param resourceRequirements Kafka resource requests and limits (for all brokers) - * @return A {@link CpuCapacity} object containing the specified capacity for a broker - */ - private CpuCapacity processCpu(BrokerCapacityOverride override, io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, ResourceRequirements resourceRequirements) { - if (override != null && override.getCpu() != null) { - return new CpuCapacity(override.getCpu()); - } else if (bc != null && bc.getCpu() != null) { - return new CpuCapacity(bc.getCpu()); - } - return getCpuBasedOnRequirements(resourceRequirements); - } - - private static DiskCapacity processDisk(Storage storage, int brokerId) { - if (storage instanceof JbodStorage) { - return generateJbodDiskCapacity(storage, brokerId); - } else { - return generateDiskCapacity(storage); - } - } - - private static String processInboundNetwork(io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, BrokerCapacityOverride override) { - if (override != null && override.getInboundNetwork() != null) { - return getThroughputInKiB(override.getInboundNetwork()); - } else if (bc != null && bc.getInboundNetwork() != null) { - return getThroughputInKiB(bc.getInboundNetwork()); - } else { - return BrokerCapacity.DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND; - } - } - - private static String processOutboundNetwork(io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, BrokerCapacityOverride override) { - 
if (override != null && override.getOutboundNetwork() != null) { - return getThroughputInKiB(override.getOutboundNetwork()); - } else if (bc != null && bc.getOutboundNetwork() != null) { - return getThroughputInKiB(bc.getOutboundNetwork()); - } else { - return BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND; - } - } - - /** - * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration - * - * @param storage Storage configuration for Kafka cluster - * @param brokerId Id of the broker - * @return Disk capacity configuration value for broker brokerId - */ - private static DiskCapacity generateJbodDiskCapacity(Storage storage, int brokerId) { - DiskCapacity disks = new DiskCapacity(); - String size = ""; - - for (SingleVolumeStorage volume : ((JbodStorage) storage).getVolumes()) { - String name = VolumeUtils.createVolumePrefix(volume.getId(), true); - String path = KAFKA_MOUNT_PATH + "/" + name + "/" + KAFKA_LOG_DIR + brokerId; - - if (volume instanceof PersistentClaimStorage) { - size = ((PersistentClaimStorage) volume).getSize(); - } else if (volume instanceof EphemeralStorage) { - size = ((EphemeralStorage) volume).getSizeLimit(); - } - disks.add(path, String.valueOf(getSizeInMiB(size))); - } - return disks; - } - - /** - * Generate total disk capacity using the supplied storage configuration - * - * @param storage Storage configuration for Kafka cluster - * @return Disk capacity per broker - */ - private static DiskCapacity generateDiskCapacity(Storage storage) { - if (storage instanceof PersistentClaimStorage) { - return DiskCapacity.of(getSizeInMiB(((PersistentClaimStorage) storage).getSize())); - } else if (storage instanceof EphemeralStorage) { - if (((EphemeralStorage) storage).getSizeLimit() != null) { - return DiskCapacity.of(getSizeInMiB(((EphemeralStorage) storage).getSizeLimit())); - } else { - return DiskCapacity.of(BrokerCapacity.DEFAULT_DISK_CAPACITY_IN_MIB); - } - } else if (storage == null) { - 
throw new IllegalStateException("The storage declaration is missing"); - } else { - throw new IllegalStateException("The declared storage '" + storage.getType() + "' is not supported"); - } - } - - /* - * Parse a K8S-style representation of a disk size, such as {@code 100Gi}, - * into the equivalent number of mebibytes represented as a String. - * - * @param size The String representation of the volume size. - * @return The equivalent number of mebibytes. - */ - private static String getSizeInMiB(String size) { - if (size == null) { - return BrokerCapacity.DEFAULT_DISK_CAPACITY_IN_MIB; - } - return String.valueOf(StorageUtils.convertTo(size, "Mi")); - } - - /** - * Parse Strimzi representation of throughput, such as {@code 10000KB/s}, - * into the equivalent number of kibibytes represented as a String. - * - * @param throughput The String representation of the throughput. - * @return The equivalent number of kibibytes. - */ - public static String getThroughputInKiB(String throughput) { - String size = throughput.substring(0, throughput.indexOf("B")); - return String.valueOf(StorageUtils.convertTo(size, "Ki")); - } - - private void processCapacityEntries(CruiseControlSpec spec, Set kafkaBrokerNodes, Map kafkaStorage, Map kafkaBrokerResources) { - io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacity = spec.getBrokerCapacity(); - - String inboundNetwork = processInboundNetwork(brokerCapacity, null); - String outboundNetwork = processOutboundNetwork(brokerCapacity, null); - - // Initialize default capacities - for (NodeRef node : kafkaBrokerNodes) { - DiskCapacity disk = processDisk(kafkaStorage.get(node.poolName()), node.nodeId()); - CpuCapacity cpu = processCpu(null, brokerCapacity, kafkaBrokerResources.get(node.poolName())); - - BrokerCapacity broker = new BrokerCapacity(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork); - capacityEntries.put(node.nodeId(), broker); - } - - // Override default capacities - if (brokerCapacity != null) 
{ - List overrides = brokerCapacity.getOverrides(); - // Override broker entries - if (overrides != null) { - if (overrides.isEmpty()) { - LOGGER.warnCr(reconciliation, "Ignoring empty overrides list"); - } else { - // For checking for duplicate brokerIds - Set overrideIds = new HashSet<>(); - for (BrokerCapacityOverride override : overrides) { - List ids = override.getBrokers(); - inboundNetwork = processInboundNetwork(brokerCapacity, override); - outboundNetwork = processOutboundNetwork(brokerCapacity, override); - for (int id : ids) { - if (id == BrokerCapacity.DEFAULT_BROKER_ID) { - LOGGER.warnCr(reconciliation, "Ignoring broker capacity override with illegal broker id -1."); - } else { - if (capacityEntries.containsKey(id)) { - if (overrideIds.add(id)) { - BrokerCapacity brokerCapacityEntry = capacityEntries.get(id); - brokerCapacityEntry.setCpu(processCpu(override, brokerCapacity, kafkaBrokerResources.get(Integer.toString(id)))); - brokerCapacityEntry.setInboundNetwork(inboundNetwork); - brokerCapacityEntry.setOutboundNetwork(outboundNetwork); - } else { - LOGGER.warnCr(reconciliation, "Duplicate broker id {} found in overrides, using first occurrence.", id); - } - } else { - LOGGER.warnCr(reconciliation, "Ignoring broker capacity override for unknown node ID {}", id); - overrideIds.add(id); - } - } - } - } - } - } - } - } - /** - * Generate broker capacity entry for capacity configuration. 
- * - * @param brokerCapacity Broker capacity object - * @return Broker entry as a JsonObject - */ - private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) { - return new JsonObject() - .put(BROKER_ID_KEY, brokerCapacity.getId()) - .put(CAPACITY_KEY, new JsonObject() - .put(DISK_KEY, brokerCapacity.getDisk().getJson()) - .put(CPU_KEY, brokerCapacity.getCpu().getJson()) - .put(INBOUND_NETWORK_KEY, brokerCapacity.getInboundNetwork()) - .put(OUTBOUND_NETWORK_KEY, brokerCapacity.getOutboundNetwork()) - ) - .put(DOC_KEY, brokerCapacity.getDoc()); - } - - /** - * Generate a capacity configuration for cluster - * - * @return Cruise Control capacity configuration as a JsonObject - */ - public JsonObject generateCapacityConfig() { - JsonArray brokerList = new JsonArray(); - for (BrokerCapacity brokerCapacity : capacityEntries.values()) { - JsonObject brokerEntry = generateBrokerCapacity(brokerCapacity); - brokerList.add(brokerEntry); - } - - JsonObject config = new JsonObject(); - config.put("brokerCapacities", brokerList); - - return config; - } - - @Override - public String toString() { - return generateCapacityConfig().encodePrettily(); - } - - /** - * @return Capacity entries - */ - public TreeMap getCapacityEntries() { - return capacityEntries; - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java new file mode 100644 index 00000000000..c33a4a68b6d --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java @@ -0,0 +1,313 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; +import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.ReconciliationLogger; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.INBOUND_NETWORK; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; + +/** + * Uses information in a Kafka Custom Resource to generate a capacity configuration file to be used for + * Cruise Control's Broker Capacity File Resolver. 
+ * + * + * For example, it takes a Kafka Custom Resource like the following: + * + * spec: + * kafka: + * replicas: 3 + * storage: + * type: jbod + * volumes: + * - id: 0 + * type: persistent-claim + * size: 100Gi + * deleteClaim: false + * - id: 1 + * type: persistent-claim + * size: 200Gi + * deleteClaim: false + * cruiseControl: + * brokerCapacity: + * cpu: "1" + * inboundNetwork: 10000KB/s + * outboundNetwork: 10000KB/s + * overrides: + * - brokers: [0] + * cpu: "2.345" + * outboundNetwork: 40000KB/s + * - brokers: [1, 2] + * cpu: 4000m + * inboundNetwork: 60000KB/s + * outboundNetwork: 20000KB/s + * + * and uses the information to create Cruise Control BrokerCapacityFileResolver config file like the following: + * + * { + * "brokerCapacities":[ + * { + * "brokerId": "-1", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log-1": "100000", + * "/var/lib/kafka1/kafka-log-1": "200000" + * }, + * "CPU": {"num.cores": "1"}, + * "NW_IN": "10000", + * "NW_OUT": "10000" + * }, + * "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
+ * }, + * { + * "brokerId": "0", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log0": "100000", + * "/var/lib/kafka1/kafka-log0": "200000" + * }, + * "CPU": {"num.cores": "2.345"}, + * "NW_IN": "10000", + * "NW_OUT": "40000" + * }, + * "doc": "Capacity for Broker 0" + * }, + * { + * "brokerId": "1", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log1": "100000", + * "/var/lib/kafka1/kafka-log1": "200000" + * }, + * "CPU": {"num.cores": "4"}, + * "NW_IN": "60000", + * "NW_OUT": "20000" + * }, + * "doc": "Capacity for Broker 1" + * }, + * "brokerId": "2", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log2": "100000", + * "/var/lib/kafka1/kafka-log2": "200000" + * }, + * "CPU": {"num.cores": "4"}, + * "NW_IN": "60000", + * "NW_OUT": "20000" + * }, + * "doc": "Capacity for Broker 2" + * } + * ] + * } + */ +public class CapacityConfiguration { + /** + * Broker capacities key + */ + public static final String CAPACITIES_KEY = "brokerCapacities"; + /** + * Capacity key + */ + public static final String CAPACITY_KEY = "capacity"; + + private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(CapacityConfiguration.class.getName()); + + private static final String BROKER_ID_KEY = "brokerId"; + private static final String DOC_KEY = "doc"; + + private final Reconciliation reconciliation; + private final TreeMap capacityEntries; + + private boolean isCpuConfigured; + private boolean isInboundNetworkConfigured; + private boolean isOutboundNetworkConfigured; + + /** + * Constructor + * + * @param reconciliation Reconciliation marker + * @param spec Spec of the Kafka custom resource + * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster + * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools + * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools + */ + public CapacityConfiguration( + 
Reconciliation reconciliation, + KafkaSpec spec, + Set kafkaBrokerNodes, + Map kafkaStorage, + Map kafkaBrokerResources + ) { + this.reconciliation = reconciliation; + this.capacityEntries = new TreeMap<>(); + + generateCapacityEntries(spec.getCruiseControl(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); + } + + private Map processBrokerCapacityOverrides(Set kafkaBrokerNodes, BrokerCapacity brokerCapacity) { + Map overrideMap = new HashMap<>(); + List overrides = null; + if (brokerCapacity != null) { + overrides = brokerCapacity.getOverrides(); + } + if (overrides != null) { + if (overrides.isEmpty()) { + LOGGER.warnCr(reconciliation, "Ignoring empty overrides list"); + } else { + for (BrokerCapacityOverride override : overrides) { + List ids = override.getBrokers(); + for (int id : ids) { + if (overrideMap.containsKey(id)) { + LOGGER.warnCr(reconciliation, "Duplicate broker id {} found in overrides, using first occurrence.", id); + } else if (kafkaBrokerNodes.stream().noneMatch(node -> node.nodeId() == id)) { + LOGGER.warnCr(reconciliation, "Ignoring broker capacity override for unknown node ID {}", id); + } else { + overrideMap.put(id, override); + } + } + } + } + } + return overrideMap; + } + + private void generateCapacityEntries(CruiseControlSpec spec, + Set kafkaBrokerNodes, + Map kafkaStorage, + Map kafkaBrokerResources) { + BrokerCapacity generalBrokerCapacity = spec.getBrokerCapacity(); + Map brokerCapacityOverrideMap = processBrokerCapacityOverrides(kafkaBrokerNodes, generalBrokerCapacity); + + for (NodeRef node : kafkaBrokerNodes) { + BrokerCapacityOverride brokerCapacityOverride = brokerCapacityOverrideMap.get(node.nodeId()); + + DiskCapacity disk = new DiskCapacity(kafkaStorage.get(node.poolName()), node.nodeId()); + CpuCapacity cpu = new CpuCapacity(generalBrokerCapacity, brokerCapacityOverride, kafkaBrokerResources.get(node.poolName())); + InboundNetworkCapacity inboundNetwork = new InboundNetworkCapacity(generalBrokerCapacity, 
brokerCapacityOverride); + OutboundNetworkCapacity outboundNetwork = new OutboundNetworkCapacity(generalBrokerCapacity, brokerCapacityOverride); + + CapacityEntry capacityEntry = new CapacityEntry(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork); + capacityEntries.put(node.nodeId(), capacityEntry); + } + isCpuConfigured = CPU.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerResources); + isInboundNetworkConfigured = INBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerResources); + isOutboundNetworkConfigured = OUTBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerResources); + } + + /** + * Indicates whether the CPU capacity settings were explicitly configured by the user. + * + * @return {@code true} if CPU capacity is user-configured; {@code false} otherwise. + */ + public boolean isCpuConfigured() { + return this.isCpuConfigured; + } + + /** + * Indicates whether the inbound network capacity settings were explicitly configured by the user. + * + * @return {@code true} if inbound network capacity is user-configured; {@code false} otherwise. + */ + public boolean isInboundNetworkConfigured() { + return this.isInboundNetworkConfigured; + } + + /** + * Indicates whether the outbound network capacity settings were explicitly configured by the user. + * + * @return {@code true} if outbound network capacity is user-configured; {@code false} otherwise. + */ + public boolean isOutboundNetworkConfigured() { + return this.isOutboundNetworkConfigured; + } + + /** + * Checks whether this resource type has its capacityConfiguration homogeneously configured across all brokers. + * @param resource The resource type. + * @return {@code true} if capacityConfiguration is homogeneously configured; {@code false} otherwise. 
+ */ + public boolean isCapacityHomogeneouslyConfigured(String resource) { + return this.getCapacityEntries().values().stream() + .map(entry -> entry.capacity().get(resource)) + .distinct() + .limit(2) + .count() == 1; + } + + /** + * Represents a Cruise Control capacity entry configuration for a Kafka broker. + * + * @param id The broker ID. + * @param capacity The capacity map for the broker capacity entry. + * @param doc A human-readable description of this capacity entry. + */ + public record CapacityEntry( + int id, + Map capacity, + String doc + ) { + private CapacityEntry(int id, CpuCapacity cpu, DiskCapacity disk, InboundNetworkCapacity inboundNetwork, + OutboundNetworkCapacity outboundNetwork) { + this(id, buildCapacityMap(cpu, disk, inboundNetwork, outboundNetwork), "Capacity for Broker " + id); + } + + private static Map buildCapacityMap(CpuCapacity cpu, DiskCapacity disk, InboundNetworkCapacity inboundNetwork, + OutboundNetworkCapacity outboundNetwork) { + Map map = new HashMap<>(); + map.put(DiskCapacity.KEY, disk.getJson()); + map.put(CpuCapacity.KEY, cpu.getJson()); + map.put(InboundNetworkCapacity.KEY, inboundNetwork.toString()); + map.put(OutboundNetworkCapacity.KEY, outboundNetwork.toString()); + return map; + } + } + + /** + * Generate a capacity configuration for cluster. + * + * @return Cruise Control capacity configuration as a String. 
+ */ + @Override + public String toString() { + JsonArray capacityList = new JsonArray(); + for (CapacityEntry capacityEntry : capacityEntries.values()) { + JsonObject capacityJson = new JsonObject(); + capacityEntry.capacity.forEach(capacityJson::put); + + JsonObject capacityEntryJson = new JsonObject() + .put(BROKER_ID_KEY, capacityEntry.id) + .put(CAPACITY_KEY, capacityJson) + .put(DOC_KEY, capacityEntry.doc()); + + capacityList.add(capacityEntryJson); + } + + return new JsonObject().put(CAPACITIES_KEY, capacityList).encodePrettily(); + } + + /** + * @return Capacity entries + */ + public TreeMap getCapacityEntries() { + return capacityEntries; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java index 352b70b3cb3..90012c50fd5 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java @@ -4,16 +4,31 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; import io.strimzi.operator.cluster.model.Quantities; +import io.strimzi.operator.common.model.resourcerequirements.ResourceRequirementsUtils; import io.vertx.core.json.JsonObject; +import java.util.Map; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; + /** - * Cruise Control CPU Capacity class + * Cruise Control CPU capacity configuration for broker. 
*/ public class CpuCapacity { + protected static final String DEFAULT_CPU_CORE_CAPACITY = "1.0"; + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "CPU"; + private static final String CORES_KEY = "num.cores"; - private final String cores; + private final JsonObject config = new JsonObject(); /** * Constructor @@ -21,24 +36,86 @@ public class CpuCapacity { * @param cores CPU cores configuration */ public CpuCapacity(String cores) { - this.cores = milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores)); + config.put(CORES_KEY, milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores))); + } + + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * @param resourceRequirements The Kafka resource requests and limits (for all brokers). + */ + public CpuCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride, + ResourceRequirements resourceRequirements) { + this(CPU.processResourceCapacity(brokerCapacity, brokerCapacityOverride, resourceRequirements)); } - protected static String milliCpuToCpu(int milliCPU) { + private static String milliCpuToCpu(int milliCPU) { return String.valueOf(milliCPU / 1000.0); } /** - * Returns CpuCapacity object as a JsonObject + * Checks whether all Kafka broker pods have their CPU resource requests equal to their CPU limits. 
+ * + * @param kafkaBrokerResources a map of broker pod names to their {@link ResourceRequirements} + * @return {@code true} if all brokers have matching CPU requests and limits; {@code false} otherwise + */ + public static boolean cpuRequestsMatchLimits(Map kafkaBrokerResources) { + if (kafkaBrokerResources == null) { + return false; + } + for (ResourceRequirements resourceRequirements : kafkaBrokerResources.values()) { + Quantity request = ResourceRequirementsUtils.getCpuRequest(resourceRequirements); + Quantity limit = ResourceRequirementsUtils.getCpuLimit(resourceRequirements); + if (request == null || limit == null || request.compareTo(limit) != 0) { + return false; + } + } + return true; + } + + /** + * Derives the CPU capacity from Kubernetes {@link ResourceRequirements}. + *
+ * <p>
+ * This method first attempts to extract the CPU request value from the resource requirements. + * If the request is defined, it is converted from millicores to cores and returned as a {@link String}. + * If no request is present, it falls back to the CPU limit (if available) and performs the same conversion. + * If neither a request nor a limit is specified, {@code null} is returned. + * + * @param resourceRequirements The Kubernetes resource requirements containing CPU requests and/or limits. + * @return The CPU capacity in cores as a {@link String}, or {@code null} if no CPU values are defined. + */ + public static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { + Quantity request = ResourceRequirementsUtils.getCpuRequest(resourceRequirements); + Quantity limit = ResourceRequirementsUtils.getCpuLimit(resourceRequirements); + + if (request != null) { + int milliCpus = Quantities.parseCpuAsMilliCpus(request.toString()); + return CpuCapacity.milliCpuToCpu(milliCpus); + } else if (limit != null) { + int milliCpus = Quantities.parseCpuAsMilliCpus(limit.toString()); + return CpuCapacity.milliCpuToCpu(milliCpus); + } else { + return null; + } + } + + /** + * Returns capacity value as a JsonObject. * - * @return The CpuCapacity object as a JsonObject + * @return The capacity value as a JsonObject. 
*/ public JsonObject getJson() { - return new JsonObject().put(CORES_KEY, this.cores); + return config; } @Override public String toString() { - return this.getJson().toString(); + return config.toString(); } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java index 7f6596656e4..01e82600d9b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java @@ -2,7 +2,6 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ - package io.strimzi.operator.cluster.model.cruisecontrol; import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; @@ -12,7 +11,7 @@ import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -65,8 +64,6 @@ public class CruiseControlConfiguration extends AbstractConfiguration { CruiseControlGoals.CPU_CAPACITY_GOAL.toString() ); - private static final String CRUISE_CONTROL_HARD_GOALS = String.join(",", CRUISE_CONTROL_HARD_GOALS_LIST); - protected static final List CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST = List.of( CruiseControlGoals.RACK_AWARENESS_GOAL.toString(), CruiseControlGoals.MIN_TOPIC_LEADERS_PER_BROKER_GOAL.toString(), @@ -81,26 +78,34 @@ public class CruiseControlConfiguration extends AbstractConfiguration { String.join(",", CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST); /** - * Map containing default values for required 
configuration properties. The map needs to be sorted so that the order + * Generates map containing default values for required configuration properties. The map needs to be sorted so that the order * of the entries in the Cruise Control configuration is deterministic and does not cause unnecessary rolling updates * of Cruise Control deployment. + * + * @param capacityConfiguration The object containing Cruise Control capacity configuration information. + * + * @return Map containing default values for required configuration properties. */ - private static final Map CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP = Collections.unmodifiableSortedMap(new TreeMap<>(Map.ofEntries( - Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)), - Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1"), - Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)), - Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20"), - Map.entry(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1))), - Map.entry(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS), - Map.entry(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS), - Map.entry(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_HARD_GOALS), - Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED)), - Map.entry(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE), - 
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)), - Map.entry(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME), - Map.entry(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME), - Map.entry(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME) - ))); + public static Map generateCruiseControlDefaultPropertiesMap(CapacityConfiguration capacityConfiguration) { + TreeMap map = new TreeMap<>(); + map.put(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)); + map.put(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1"); + map.put(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)); + map.put(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20"); + map.put(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1))); + map.put(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS); + map.put(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(), + String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_GOALS_LIST, capacityConfiguration))); + map.put(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(), + String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_HARD_GOALS_LIST, capacityConfiguration))); + map.put(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), 
Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED)); + map.put(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE); + map.put(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)); + map.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME); + map.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME); + map.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME); + return map; + } private static final List FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(CruiseControlSpec.FORBIDDEN_PREFIXES); private static final List FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(CruiseControlSpec.FORBIDDEN_PREFIX_EXCEPTIONS); @@ -118,10 +123,42 @@ public CruiseControlConfiguration(Reconciliation reconciliation, Iterable getCruiseControlDefaultPropertiesMap() { - return CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP; + /* test */ static List filterResourceGoalsWithoutCapacityConfig(List goalList, + CapacityConfiguration capacityConfiguration) { + List filteredGoalList = new ArrayList<>(goalList); + if (!capacityConfiguration.isInboundNetworkConfigured()) { + filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_CAPACITY_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString()); + } else { + if (!capacityConfiguration.isCapacityHomogeneouslyConfigured(InboundNetworkCapacity.KEY)) { + /* + 
* The LeaderBytesInDistributionGoal does not account for heterogeneous inbound network capacities. + * See: https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LeaderBytesInDistributionGoal.java + * Therefore, if the INBOUND_NETWORK capacityConfiguration settings are not homogeneously configured, + * we remove this goal from the list. + */ + filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString()); + } + } + if (!capacityConfiguration.isOutboundNetworkConfigured()) { + filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_CAPACITY_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.POTENTIAL_NETWORK_OUTAGE_GOAL.toString()); + } + if (!capacityConfiguration.isCpuConfigured()) { + filteredGoalList.remove(CruiseControlGoals.CPU_CAPACITY_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.CPU_USAGE_DISTRIBUTION_GOAL.toString()); + } + + return filteredGoalList; } private boolean isEnabledInConfiguration(String s1, String s2) { diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java index 56eb91757c5..b4996c6cc6a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java @@ -4,50 +4,128 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.strimzi.api.kafka.model.kafka.EphemeralStorage; +import io.strimzi.api.kafka.model.kafka.JbodStorage; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; +import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; +import io.strimzi.api.kafka.model.kafka.Storage; 
+import io.strimzi.operator.cluster.model.StorageUtils; +import io.strimzi.operator.cluster.model.VolumeUtils; import io.vertx.core.json.JsonObject; import java.util.HashMap; import java.util.Map; /** - * Cruise Control disk capacity + * Cruise Control disk capacity configuration for broker. */ public class DiskCapacity { + protected static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "DISK"; + + private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka"; + private static final String KAFKA_LOG_DIR = "kafka-log"; private static final String SINGLE_DISK = ""; - private Map map; + + private final Map config; /** * Constructor + * + * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * @param brokerId Id of the broker. */ - public DiskCapacity() { - map = new HashMap<>(1); + public DiskCapacity(Storage storage, int brokerId) { + if (storage instanceof JbodStorage) { + config = generateJbodDiskConfig(storage, brokerId); + } else { + config = generateNonJbodDiskConfig(storage); + } } - private DiskCapacity(String size) { - this(); - map.put(SINGLE_DISK, size); + /** + * Parse a K8S-style representation of a disk size, such as {@code 100Gi}, + * into the equivalent number of mebibytes represented as a String. + * + * @param size The String representation of the volume size. + * @return The equivalent number of mebibytes. 
+ */ + private static String getSizeInMiB(String size) { + if (size == null) { + return ResourceCapacityType.DISK.getDefaultResourceCapacity(); + } + return String.valueOf(StorageUtils.convertTo(size, "Mi")); } - protected static DiskCapacity of(String size) { - return new DiskCapacity(size); + /** + * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * @param brokerId Id of the broker. + * + * @return Disk capacity configuration value for broker brokerId. + */ + private Map generateJbodDiskConfig(Storage storage, int brokerId) { + String size = ""; + Map diskConfig = new HashMap<>(); + for (SingleVolumeStorage volume : ((JbodStorage) storage).getVolumes()) { + String name = VolumeUtils.createVolumePrefix(volume.getId(), true); + String path = KAFKA_MOUNT_PATH + "/" + name + "/" + KAFKA_LOG_DIR + brokerId; + + if (volume instanceof PersistentClaimStorage) { + size = ((PersistentClaimStorage) volume).getSize(); + } else if (volume instanceof EphemeralStorage) { + size = ((EphemeralStorage) volume).getSizeLimit(); + } + diskConfig.put(path, String.valueOf(getSizeInMiB(size))); + } + return diskConfig; } - protected void add(String path, String size) { - if (path == null || SINGLE_DISK.equals(path)) { - throw new IllegalArgumentException("The disk path cannot be null or empty"); + /** + * Generate total disk capacity using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * + * @return Disk capacity configuration value for broker. 
+ */ + private static Map generateNonJbodDiskConfig(Storage storage) { + String size; + if (storage instanceof PersistentClaimStorage) { + size = getSizeInMiB(((PersistentClaimStorage) storage).getSize()); + } else if (storage instanceof EphemeralStorage) { + if (((EphemeralStorage) storage).getSizeLimit() != null) { + size = getSizeInMiB(((EphemeralStorage) storage).getSizeLimit()); + } else { + size = DEFAULT_DISK_CAPACITY_IN_MIB; + } + } else if (storage == null) { + throw new IllegalStateException("The storage declaration is missing"); + } else { + throw new IllegalStateException("The declared storage '" + storage.getType() + "' is not supported"); } - map.put(path, size); + return Map.of(SINGLE_DISK, size); } protected Object getJson() { - if (map.size() == 1 && map.containsKey(SINGLE_DISK)) { - return map.get(SINGLE_DISK); + if (config.size() == 1 && config.containsKey(SINGLE_DISK)) { + return config.get(SINGLE_DISK); } else { JsonObject disks = new JsonObject(); - for (Map.Entry e : map.entrySet()) { + for (Map.Entry e : config.entrySet()) { disks.put(e.getKey(), e.getValue()); } return disks; } } + + @Override + public String toString() { + return this.getJson().toString(); + } } \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java new file mode 100644 index 00000000000..93c778ff535 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java @@ -0,0 +1,33 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.operator.cluster.model.cruisecontrol; + + +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.INBOUND_NETWORK; + +/** + * Cruise Control inbound network capacity configuration for broker. + */ +public class InboundNetworkCapacity extends NetworkCapacity { + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "NW_IN"; + + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + */ + public InboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) { + super(getThroughputInKiB(INBOUND_NETWORK.processResourceCapacity(brokerCapacity, brokerCapacityOverride, null))); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java new file mode 100644 index 00000000000..126562e689a --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java @@ -0,0 +1,36 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.strimzi.operator.cluster.model.StorageUtils; + +/** + * Cruise Control network capacity configuration for broker. 
+ */ +public class NetworkCapacity { + protected static final String DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000KiB/s"; + protected String config; + + protected NetworkCapacity(String config) { + this.config = config; + } + + /** + * Parse Strimzi representation of throughput, such as {@code 10000KB/s}, + * into the equivalent number of kibibytes represented as a String. + * + * @param throughput The String representation of the throughput. + * @return The equivalent number of kibibytes. + */ + public static String getThroughputInKiB(String throughput) { + String size = throughput.substring(0, throughput.indexOf("B")); + return String.valueOf(StorageUtils.convertTo(size, "Ki")); + } + + @Override + public String toString() { + return config; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java new file mode 100644 index 00000000000..8b9d2ae5387 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java @@ -0,0 +1,33 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + + +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; + +/** + * Cruise Control outbound network capacity configuration for broker. + */ +public class OutboundNetworkCapacity extends NetworkCapacity { + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. 
+ */ + public static final String KEY = "NW_OUT"; + + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + */ + public OutboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) { + super(getThroughputInKiB(OUTBOUND_NETWORK.processResourceCapacity(brokerCapacity, brokerCapacityOverride, null))); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java new file mode 100644 index 00000000000..2fef3ec526e --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java @@ -0,0 +1,153 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity.DEFAULT_CPU_CORE_CAPACITY; +import static io.strimzi.operator.cluster.model.cruisecontrol.DiskCapacity.DEFAULT_DISK_CAPACITY_IN_MIB; +import static io.strimzi.operator.cluster.model.cruisecontrol.NetworkCapacity.DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND; + +/** + * Enum representing the various types of resource capacities used in Cruise Control configuration. 
+ * Each enum constant maps to a specific resource type (e.g. CPU, Disk, Inbound Network, and Outbound Network), + * with methods for retrieving configured values from both default broker capacity and broker-specific overrides. + */ +public enum ResourceCapacityType { + /** + * Disk capacity configuration. + *
+ * <p>
+ * Disk capacity configuration is handled automatically by Strimzi, there is no need for cluster or broker-specific overrides. + */ + DISK(null, null, DEFAULT_DISK_CAPACITY_IN_MIB), + + /** + * CPU capacity configuration. + */ + CPU(BrokerCapacity::getCpu, BrokerCapacityOverride::getCpu, DEFAULT_CPU_CORE_CAPACITY), + + /** + * Inbound network capacity configuration. + */ + INBOUND_NETWORK(BrokerCapacity::getInboundNetwork, BrokerCapacityOverride::getInboundNetwork, DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND), + + /** + * Outbound network capacity configuration. + */ + OUTBOUND_NETWORK(BrokerCapacity::getOutboundNetwork, BrokerCapacityOverride::getOutboundNetwork, DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND); + + private final Function generalResourceCapacityGetter; + private final Function overriddenResourceCapacityGetter; + private final String defaultResourceCapacity; + + ResourceCapacityType(Function generalResourceCapacityGetter, + Function overriddenResourceCapacityGetter, + String defaultResourceCapacity + ) { + this.generalResourceCapacityGetter = generalResourceCapacityGetter; + this.overriddenResourceCapacityGetter = overriddenResourceCapacityGetter; + this.defaultResourceCapacity = defaultResourceCapacity; + } + + private String getGeneralResourceCapacity(BrokerCapacity brokerCapacity) { + return generalResourceCapacityGetter.apply(brokerCapacity); + } + + private String getOverriddenResourceCapacity(BrokerCapacityOverride override) { + return overriddenResourceCapacityGetter.apply(override); + } + + /** + * @return The default capacity value for given resource. + */ + public String getDefaultResourceCapacity() { + return defaultResourceCapacity; + } + + /** + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + *

+ * The broker-specific capacity override takes top precedence, then general brokerCapacity configuration,
+ * and then the Kafka resource requests (only for CPU), then the Kafka resource limits (only for CPU), then resource
+ * default.
+ * <p>
+ * For example:
+ * <ul>
+ *   <li>(1) The brokerCapacityOverride for a specific broker.</li>
+ *   <li>(2) The general brokerCapacity configuration.</li>
+ *   <li>(3) Kafka resource requests (CPU specific)</li>
+ *   <li>(4) Kafka resource limits (CPU specific)</li>
+ *   <li>(5) The resource default.</li>
+ * </ul>
+ * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * @param resourceRequirements The Kafka resource requests and limits (for all brokers). + * + * @return The capacity of resource represented as a String. + */ + public String processResourceCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride, + ResourceRequirements resourceRequirements) { + if (brokerCapacityOverride != null && getOverriddenResourceCapacity(brokerCapacityOverride) != null) { + return getOverriddenResourceCapacity(brokerCapacityOverride); + } + + if (brokerCapacity != null && getGeneralResourceCapacity(brokerCapacity) != null) { + return getGeneralResourceCapacity(brokerCapacity); + } + + if (this == CPU) { + String cpuBasedOnRequirements = CpuCapacity.getCpuBasedOnRequirements(resourceRequirements); + if (cpuBasedOnRequirements != null) { + return cpuBasedOnRequirements; + } + } + + return getDefaultResourceCapacity(); + } + + /** + * Checks whether this resource type has its capacity properly configured for Cruise Control goals that are + * dependent on that resource usage. + *

+ * The configuration can come from: + *

+ * <ul>
+ *   <li>The default capacity defined in the {@link BrokerCapacity} section of the Cruise Control spec</li>
+ *   <li>Broker-specific overrides, where each override must define a value for this resource type</li>
+ *   <li>For {@link #CPU}, it may also be configured via Kafka custom resource requests and limits.</li>
+ * </ul>
+ * + * @param brokerCapacity The brokerCapacity section of the Cruise Control specification from the Kafka custom resource. + * @param kafkaBrokerResources A map of broker IDs to their Kubernetes resource requirements. + * @return {@code true} if the capacity for this resource type is considered configured, otherwise {@code false.} + */ + public boolean isCapacityConfigured(BrokerCapacity brokerCapacity, Map kafkaBrokerResources) { + if (brokerCapacity != null) { + if (getGeneralResourceCapacity(brokerCapacity) != null) { + return true; + } + + List overrides = brokerCapacity.getOverrides(); + if (overrides != null && overrides.stream().allMatch(o -> getOverriddenResourceCapacity(o) != null)) { + return true; + } + } + + if (this == CPU && CpuCapacity.cpuRequestsMatchLimits(kafkaBrokerResources)) { + return true; + } + + return false; + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java index 260fa716f2e..f03399ae94f 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java @@ -56,9 +56,12 @@ import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ResourceUtils; -import io.strimzi.operator.cluster.model.cruisecontrol.BrokerCapacity; -import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; +import io.strimzi.operator.cluster.model.cruisecontrol.CapacityConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.DiskCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.InboundNetworkCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.NetworkCapacity; +import 
io.strimzi.operator.cluster.model.cruisecontrol.OutboundNetworkCapacity; import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel; import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.Reconciliation; @@ -66,6 +69,7 @@ import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; +import io.strimzi.operator.common.model.resourcerequirements.ResourceType; import io.strimzi.platform.KubernetesVersion; import io.strimzi.plugin.security.profiles.impl.RestrictedPodSecurityProvider; import io.strimzi.test.TestUtils; @@ -87,6 +91,8 @@ import java.util.Set; import static io.strimzi.operator.cluster.model.CruiseControl.API_HEALTHCHECK_PATH; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters.ANOMALY_DETECTION_CONFIG_KEY; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY; import static java.lang.String.format; @@ -171,17 +177,22 @@ public void testBrokerCapacity() { .endSpec() .build(); Map storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), new PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build()).build()); - Map resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build()); + Map resources = + Map.of("brokers", + new ResourceRequirementsBuilder() + .withRequests(Map.of(ResourceType.CPU.value(), new Quantity("400m"))) + 
.withLimits(Map.of(ResourceType.CPU.value(), new Quantity("0.5"))) + .build()); CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { - JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY); - Object diskCapacity = brokerCapacity.getValue(Capacity.DISK_KEY); - JsonObject cpuCapacity = brokerCapacity.getJsonObject(Capacity.CPU_KEY); + JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + Object diskCapacity = brokerCapacity.getValue(DiskCapacity.KEY); + JsonObject cpuCapacity = brokerCapacity.getJsonObject(CpuCapacity.KEY); assertThat(isJBOD(diskCapacity), is(true)); assertThat(cpuCapacity, is(expectedCpuCapacity)); @@ -226,34 +237,38 @@ public void testBrokerCapacityOverrides() { .endSpec() .build(); Map storage = Map.of("brokers", new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build()); - Map resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build()); + Map resources = + Map.of("brokers", new ResourceRequirementsBuilder() + .withRequests(Map.of(ResourceType.CPU.value(), new Quantity("400m"))) + .withLimits(Map.of(ResourceType.CPU.value(), new Quantity("0.5"))) + .build()); CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new 
JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { - JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY); - Object diskCapacity = brokerCapacity.getValue(Capacity.DISK_KEY); + JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + Object diskCapacity = brokerCapacity.getValue(DiskCapacity.KEY); assertThat(isJBOD(diskCapacity), is(false)); } - JsonObject brokerEntry0 = brokerEntries.getJsonObject(broker0).getJsonObject(Capacity.CAPACITY_KEY); - assertThat(brokerEntry0.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); - assertThat(brokerEntry0.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); - assertThat(brokerEntry0.getString(Capacity.OUTBOUND_NETWORK_KEY), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); + JsonObject brokerEntry0 = brokerEntries.getJsonObject(broker0).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + assertThat(brokerEntry0.getJsonObject(CpuCapacity.KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); + assertThat(brokerEntry0.getString(InboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(inboundNetworkOverride0))); + assertThat(brokerEntry0.getString(OutboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(OUTBOUND_NETWORK.getDefaultResourceCapacity()))); // When the same broker id is specified in brokers list of multiple overrides, use the value specified in the first override. 
- JsonObject brokerEntry1 = brokerEntries.getJsonObject(broker1).getJsonObject(Capacity.CAPACITY_KEY); - assertThat(brokerEntry1.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); - assertThat(brokerEntry1.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); - assertThat(brokerEntry1.getString(Capacity.OUTBOUND_NETWORK_KEY), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); - - JsonObject brokerEntry2 = brokerEntries.getJsonObject(broker2).getJsonObject(Capacity.CAPACITY_KEY); - assertThat(brokerEntry2.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); - assertThat(brokerEntry2.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); + JsonObject brokerEntry1 = brokerEntries.getJsonObject(broker1).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + assertThat(brokerEntry1.getJsonObject(CpuCapacity.KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); + assertThat(brokerEntry1.getString(InboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(inboundNetworkOverride0))); + assertThat(brokerEntry1.getString(OutboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(OUTBOUND_NETWORK.getDefaultResourceCapacity()))); + + JsonObject brokerEntry2 = brokerEntries.getJsonObject(broker2).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + assertThat(brokerEntry2.getJsonObject(CpuCapacity.KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); + assertThat(brokerEntry2.getString(InboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(inboundNetworkOverride0))); } @ParallelTest @@ -293,16 +308,20 @@ public void testBrokerCapacityGeneratedCpu() { .endSpec() .build(); Map storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), new 
PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build()).build()); - Map resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("500m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build()); + Map resources = Map.of("brokers", + new ResourceRequirementsBuilder() + .withRequests(Map.of(ResourceType.CPU.value(), new Quantity("500m"))) + .withLimits(Map.of(ResourceType.CPU.value(), new Quantity("0.5"))) + .build()); CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); JsonObject expectedCpuCapacity = new CpuCapacity(userDefinedCpuCapacityOverride0).getJson(); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { - JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY); - JsonObject cpuCapacity = brokerCapacity.getJsonObject(Capacity.CPU_KEY); + JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + JsonObject cpuCapacity = brokerCapacity.getJsonObject(CpuCapacity.KEY); assertThat(cpuCapacity, is(expectedCpuCapacity)); } } @@ -330,56 +349,56 @@ public void testBrokerCapacitiesWithPools() { CruiseControl cc = createCruiseControl(KAFKA, nodes, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); assertThat(brokerEntries.size(), 
is(6)); // Broker 0 JsonObject brokerEntry = brokerEntries.getJsonObject(0); assertThat(brokerEntry.getInteger("brokerId"), is(0)); - JsonObject brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + JsonObject brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0")); - JsonObject brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + JsonObject brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log0"), is("102400.0")); // Broker 1 brokerEntry = brokerEntries.getJsonObject(1); assertThat(brokerEntry.getInteger("brokerId"), is(1)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log1"), is("102400.0")); // Broker 2 brokerEntry = brokerEntries.getJsonObject(2); assertThat(brokerEntry.getInteger("brokerId"), is(2)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); 
+ brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log2"), is("102400.0")); // Broker 10 brokerEntry = brokerEntries.getJsonObject(3); assertThat(brokerEntry.getInteger("brokerId"), is(10)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log10"), is("1048576.0")); // Broker 11 brokerEntry = brokerEntries.getJsonObject(4); assertThat(brokerEntry.getInteger("brokerId"), is(11)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log11"), is("1048576.0")); // Broker 12 brokerEntry = brokerEntries.getJsonObject(5); assertThat(brokerEntry.getInteger("brokerId"), is(12)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); 
assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log12"), is("1048576.0")); } @@ -791,7 +810,7 @@ public void testCpuCapacityGeneration() { the CPU capacity will be set to DEFAULT_CPU_CORE_CAPACITY */ resources = Map.of("brokers", new ResourceRequirementsBuilder().build()); - verifyBrokerCapacity(storage, resources, brokerCapacityThree, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY); + verifyBrokerCapacity(storage, resources, brokerCapacityThree, CPU.getDefaultResourceCapacity(), CPU.getDefaultResourceCapacity(), CPU.getDefaultResourceCapacity()); } private void verifyBrokerCapacity(Map storage, @@ -812,11 +831,11 @@ private void verifyBrokerCapacity(Map storage, ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); - assertThat(brokerEntries.getJsonObject(0).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerOneCpuValue))); - assertThat(brokerEntries.getJsonObject(1).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerTwoCpuValue))); - assertThat(brokerEntries.getJsonObject(2).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerThreeCpuValue))); + 
assertThat(brokerEntries.getJsonObject(0).getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY).getString("num.cores"), is(Matchers.equalTo(brokerOneCpuValue))); + assertThat(brokerEntries.getJsonObject(1).getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY).getString("num.cores"), is(Matchers.equalTo(brokerTwoCpuValue))); + assertThat(brokerEntries.getJsonObject(2).getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY).getString("num.cores"), is(Matchers.equalTo(brokerThreeCpuValue))); } @ParallelTest diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java new file mode 100644 index 00000000000..6dfb028b607 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java @@ -0,0 +1,213 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import io.strimzi.api.kafka.model.kafka.JbodStorageBuilder; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityBuilder; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverrideBuilder; +import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals; +import io.strimzi.operator.common.model.resourcerequirements.ResourceType; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.filterResourceGoalsWithoutCapacityConfig; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.CPU_CAPACITY_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.CPU_USAGE_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_INBOUND_CAPACITY_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_OUTBOUND_CAPACITY_GOAL; +import static 
io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.POTENTIAL_NETWORK_OUTAGE_GOAL; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +public class CruiseControlConfigurationTest { + + private static final String DEFAULT_CPU_CAPACITY = "1000m"; + private static final String DEFAULT_NETWORK_CAPACITY = "10000KiB/s"; + private static final Set NODES = Set.of( + new NodeRef("my-cluster-brokers-0", 0, "brokers", false, true), + new NodeRef("my-cluster-brokers-1", 1, "brokers", false, true), + new NodeRef("my-cluster-brokers-2", 2, "brokers", false, true) + ); + private static final Map STORAGE = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()).build()); + private static final List GOAL_LIST = List.of( + NETWORK_INBOUND_CAPACITY_GOAL.toString(), + NETWORK_OUTBOUND_CAPACITY_GOAL.toString(), + CPU_CAPACITY_GOAL.toString(), + POTENTIAL_NETWORK_OUTAGE_GOAL.toString(), + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL.toString(), + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL.toString(), + CPU_USAGE_DISTRIBUTION_GOAL.toString(), + LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString()); + + private static KafkaSpec createKafkaSpec(BrokerCapacity brokerCapacity) { + return new KafkaBuilder() + .withNewSpec() + .withNewCruiseControl() + .withBrokerCapacity(brokerCapacity) + .endCruiseControl() + .endSpec() + .build() + .getSpec(); + } + + private static BrokerCapacity createBrokerCapacity(String cpuCapacity, + String inboundNetworkCapacity, + String outboundNetworkCapacity) { + return new BrokerCapacityBuilder() + .withCpu(cpuCapacity) + .withInboundNetwork(inboundNetworkCapacity) + .withOutboundNetwork(outboundNetworkCapacity) + .build(); + } + + private static 
BrokerCapacityOverride createBrokerCapacityOverride(List brokerIds, + String cpuCapacity, + String inboundNetworkCapacity, + String outboundNetworkCapacity) { + return new BrokerCapacityOverrideBuilder() + .addAllToBrokers(brokerIds) + .withCpu(cpuCapacity) + .withInboundNetwork(inboundNetworkCapacity) + .withOutboundNetwork(outboundNetworkCapacity) + .build(); + } + + private static ResourceRequirements createCpuResourceRequirement(String request, String limit) { + return new ResourceRequirementsBuilder() + .addToRequests(ResourceType.CPU.value(), new Quantity(request)) + .addToLimits(ResourceType.CPU.value(), new Quantity(limit)) + .build(); + } + + private void assertGoalsPresent(List actual, CruiseControlGoals... expected) { + for (CruiseControlGoals goal : expected) { + assertThat("Expected goal to be present: " + goal, actual.contains(goal.toString()), is(true)); + } + } + + private void assertGoalsAbsent(List actual, CruiseControlGoals... expected) { + for (CruiseControlGoals goal : expected) { + assertThat("Expected goal to be absent: " + goal, actual.contains(goal.toString()), is(false)); + } + } + + @Test + public void testFilterResourceGoalsWithoutCapacityConfig() { + // The capacities of resources are all properly configured - no goals should be filtered. + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = null; + Map kafkaBrokerResources = Map.of("pool1", new ResourceRequirementsBuilder().build()); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertThat(filteredGoals, containsInAnyOrder(GOAL_LIST.toArray())); + + // Default CPU capacity not specified; Filter CPU goals. 
+ bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + assertGoalsPresent(filteredGoals, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + + // CPU resource limits == requests; Don't filter CPU goals. + kafkaBrokerResources = Map.of("pool1", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsPresent(filteredGoals, + CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + + // CPU resource limits != requests; Filter CPU goals. 
+ kafkaBrokerResources = Map.of("pool1", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, "2000m")); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + assertGoalsPresent(filteredGoals, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + + // CPU override capacities provided; Don't filter CPU goals. + bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, null, null); + bc.setOverrides(List.of(bco)); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsPresent(filteredGoals, + CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + + // Default inbound network capacity not specified; Filter inbound network related goals. 
+ bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, null, DEFAULT_NETWORK_CAPACITY); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, LEADER_BYTES_IN_DISTRIBUTION_GOAL); + assertGoalsPresent(filteredGoals, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, + POTENTIAL_NETWORK_OUTAGE_GOAL, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + + // Inbound network override capacities not homogeneous; Filter LeaderBytesInDistributionGoal goal. + bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco0 = createBrokerCapacityOverride(List.of(0), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco1 = createBrokerCapacityOverride(List.of(1), DEFAULT_CPU_CAPACITY, "5000KiB/s", DEFAULT_NETWORK_CAPACITY); + bc.setOverrides(List.of(bco0, bco1)); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, LEADER_BYTES_IN_DISTRIBUTION_GOAL); + assertGoalsPresent(filteredGoals, + CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + + // Homogeneous inbound network override capacities provided; Don't filter inbound network goals.
+ bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + bc.setOverrides(List.of(bco)); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsPresent(filteredGoals, + CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, LEADER_BYTES_IN_DISTRIBUTION_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + + // Default outbound network capacity not specified; Filter outbound network related goals. + bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, null); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + assertGoalsPresent(filteredGoals, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + + // Outbound network override capacities provided; Don't filter outbound network goals. 
+ bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + bc.setOverrides(List.of(bco)); + capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsPresent(filteredGoals, + CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java index ed808f0ca24..25cb3c1eac7 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java @@ -244,8 +244,8 @@ public void reconcileEnabledCruiseControl(boolean topicOperatorEnabled, boolean assertThat(deployCaptor.getValue(), is(notNullValue())); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION), is("0")); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION), is("0")); - assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is("67b9cda0")); - assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("3a5e63e7")); + 
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is("e104ddb6")); + assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("e8a2dfec")); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_SERVER_CERT_HASH), is("4d715cdd")); if (topicOperatorEnabled && apiUsersEnabled) { assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_AUTH_HASH), is("8c2972b2")); diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java new file mode 100644 index 00000000000..bcc1451481c --- /dev/null +++ b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java @@ -0,0 +1,30 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.common.model.resourcerequirements; + +/** + * Enum representing the types of resource requirements used in Kubernetes container specifications. + * + *

+ * <p>In Kubernetes, containers can specify both requests and limits
+ * such as CPU and memory. These values influence scheduling and runtime resource enforcement:</p>
+ *
+ * <ul>
+ *   <li>{@link #REQUEST} - The minimum amount of a resource that the container is guaranteed.</li>
+ *   <li>{@link #LIMIT} - The maximum amount of a resource that the container is allowed to use.</li>
+ * </ul>
+ * + * @see Kubernetes: Managing Resources for Containers + */ +public enum ResourceRequirementsType { + /** + * Represents the resource request value. + */ + REQUEST, + + /** + * Represents the resource limit value. + */ + LIMIT; +} diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java new file mode 100644 index 00000000000..c9f00b01ecd --- /dev/null +++ b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.common.model.resourcerequirements; + +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; + +import java.util.Map; + +import static io.strimzi.operator.common.model.resourcerequirements.ResourceRequirementsType.LIMIT; +import static io.strimzi.operator.common.model.resourcerequirements.ResourceRequirementsType.REQUEST; +import static io.strimzi.operator.common.model.resourcerequirements.ResourceType.CPU; +import static io.strimzi.operator.common.model.resourcerequirements.ResourceType.MEMORY; + +/** + * Utility methods for working with the resources section of custom resources. + */ +public class ResourceRequirementsUtils { + + private static Quantity getQuantity(ResourceRequirements resources, + ResourceRequirementsType requirementType, + ResourceType resourceType) { + if (resources == null || resourceType == null) { + return null; + } + + Map resourceRequirement = switch (requirementType) { + case REQUEST -> resources.getRequests(); + case LIMIT -> resources.getLimits(); + }; + + return resourceRequirement != null ? 
resourceRequirement.get(resourceType.value()) : null; + } + + /** + * Retrieves the CPU resource request quantity from the given resource requirements. + * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the requested CPU quantity, or {@code null} if not specified + */ + public static Quantity getCpuRequest(ResourceRequirements resources) { + return getQuantity(resources, REQUEST, CPU); + } + + /** + * Retrieves the CPU resource limit quantity from the given resource requirements. + * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the CPU limit quantity, or {@code null} if not specified + */ + public static Quantity getCpuLimit(ResourceRequirements resources) { + return getQuantity(resources, LIMIT, CPU); + } + + /** + * Retrieves the memory resource request quantity from the given resource requirements. + * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the requested memory quantity, or {@code null} if not specified + */ + public static Quantity getMemoryRequest(ResourceRequirements resources) { + return getQuantity(resources, REQUEST, MEMORY); + } + + /** + * Retrieves the memory resource limit quantity from the given resource requirements. 
+ * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the memory limit quantity, or {@code null} if not specified + */ + public static Quantity getMemoryLimit(ResourceRequirements resources) { + return getQuantity(resources, LIMIT, MEMORY); + } +} diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java new file mode 100644 index 00000000000..124f06a2816 --- /dev/null +++ b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java @@ -0,0 +1,35 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.common.model.resourcerequirements; + +/** + * Enum representing Kubernetes resource types used in container resource specifications. + */ +public enum ResourceType { + /** + * Represents the memory resource type (e.g., "memory"). + */ + MEMORY("memory"), + + /** + * Represents the CPU resource type (e.g., "cpu"). + */ + CPU("cpu"); + + private final String value; + + ResourceType(String value) { + this.value = value; + } + + /** + * Returns the string value of the resource type. 
+ * + * @return the resource type name as used in Kubernetes, e.g., "cpu" or "memory" + */ + public String value() { + return value; + } +} From 510a8076a04a92a94cb7e0d803d63181ab066c79 Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Thu, 12 Jun 2025 17:37:14 -0400 Subject: [PATCH 2/4] Addressing feedback - js Signed-off-by: Kyle Liberti --- .../operator/cluster/model/CruiseControl.java | 6 +- .../cruisecontrol/CapacityConfiguration.java | 20 +-- .../model/cruisecontrol/CpuCapacity.java | 50 ------- .../cruisecontrol/ResourceCapacityType.java | 22 ++- .../ResourceRequirementsUtils.java | 137 ++++++++++++++++++ .../cluster/model/CruiseControlTest.java | 2 +- .../CruiseControlConfigurationTest.java | 2 +- .../assembly/CruiseControlReconcilerTest.java | 2 +- .../ResourceRequirementsType.java | 30 ---- .../ResourceRequirementsUtils.java | 76 ---------- .../resourcerequirements/ResourceType.java | 35 ----- 11 files changed, 171 insertions(+), 211 deletions(-) create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java delete mode 100644 operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java delete mode 100644 operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java delete mode 100644 operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java index 0476537b8dd..b70debfa7f3 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java @@ -199,7 +199,7 @@ public static CruiseControl fromCrd( KafkaConfiguration kafkaConfiguration = new 
KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet()); result.capacityConfiguration = new CapacityConfiguration(reconciliation, kafkaCr.getSpec(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); - result.updateConfigurationWithDefaults(ccSpec, result.capacityConfiguration, kafkaConfiguration); + result.updateConfigurationWithDefaults(ccSpec, kafkaConfiguration); CruiseControlConfiguration ccConfiguration = result.configuration; result.sslEnabled = ccConfiguration.isApiSslEnabled(); @@ -244,9 +244,7 @@ public static CruiseControl fromCrd( } } - private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, - CapacityConfiguration capacityConfiguration, - KafkaConfiguration kafkaConfiguration) { + private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) { Map defaultCruiseControlProperties = generateCruiseControlDefaultPropertiesMap(capacityConfiguration); if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) { defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR)); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java index c33a4a68b6d..236e5e3914e 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java @@ -27,8 +27,8 @@ import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; /** - * Uses information in a Kafka Custom Resource to generate a capacity configuration file to be used for - * Cruise Control's 
Broker Capacity File Resolver. + * Uses information in a `Kafka` and `KafkaNodePool` custom resources to generate a capacity configuration file to + * be used for Cruise Control's Broker Capacity File Resolver. * * * For example, it takes a Kafka Custom Resource like the following: @@ -144,11 +144,11 @@ public class CapacityConfiguration { /** * Constructor * - * @param reconciliation Reconciliation marker - * @param spec Spec of the Kafka custom resource - * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster - * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools - * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools + * @param reconciliation Reconciliation marker. + * @param spec Spec of the Kafka custom resource. + * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster. + * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools. + * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools. 
*/ public CapacityConfiguration( Reconciliation reconciliation, @@ -208,9 +208,9 @@ private void generateCapacityEntries(CruiseControlSpec spec, CapacityEntry capacityEntry = new CapacityEntry(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork); capacityEntries.put(node.nodeId(), capacityEntry); } - isCpuConfigured = CPU.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerResources); - isInboundNetworkConfigured = INBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerResources); - isOutboundNetworkConfigured = OUTBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerResources); + isCpuConfigured = CPU.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerNodes, kafkaBrokerResources); + isInboundNetworkConfigured = INBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerNodes, kafkaBrokerResources); + isOutboundNetworkConfigured = OUTBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerNodes, kafkaBrokerResources); } /** diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java index 90012c50fd5..5a853853a2c 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java @@ -4,16 +4,12 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; -import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; import io.strimzi.operator.cluster.model.Quantities; -import io.strimzi.operator.common.model.resourcerequirements.ResourceRequirementsUtils; import io.vertx.core.json.JsonObject; -import java.util.Map; - 
import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; /** @@ -59,52 +55,6 @@ private static String milliCpuToCpu(int milliCPU) { return String.valueOf(milliCPU / 1000.0); } - /** - * Checks whether all Kafka broker pods have their CPU resource requests equal to their CPU limits. - * - * @param kafkaBrokerResources a map of broker pod names to their {@link ResourceRequirements} - * @return {@code true} if all brokers have matching CPU requests and limits; {@code false} otherwise - */ - public static boolean cpuRequestsMatchLimits(Map kafkaBrokerResources) { - if (kafkaBrokerResources == null) { - return false; - } - for (ResourceRequirements resourceRequirements : kafkaBrokerResources.values()) { - Quantity request = ResourceRequirementsUtils.getCpuRequest(resourceRequirements); - Quantity limit = ResourceRequirementsUtils.getCpuLimit(resourceRequirements); - if (request == null || limit == null || request.compareTo(limit) != 0) { - return false; - } - } - return true; - } - - /** - * Derives the CPU capacity from Kubernetes {@link ResourceRequirements}. - *

- * This method first attempts to extract the CPU request value from the resource requirements. - * If the request is defined, it is converted from millicores to cores and returned as a {@link String}. - * If no request is present, it falls back to the CPU limit (if available) and performs the same conversion. - * If neither a request nor a limit is specified, {@code null} is returned. - * - * @param resourceRequirements The Kubernetes resource requirements containing CPU requests and/or limits. - * @return The CPU capacity in cores as a {@link String}, or {@code null} if no CPU values are defined. - */ - public static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { - Quantity request = ResourceRequirementsUtils.getCpuRequest(resourceRequirements); - Quantity limit = ResourceRequirementsUtils.getCpuLimit(resourceRequirements); - - if (request != null) { - int milliCpus = Quantities.parseCpuAsMilliCpus(request.toString()); - return CpuCapacity.milliCpuToCpu(milliCpus); - } else if (limit != null) { - int milliCpus = Quantities.parseCpuAsMilliCpus(limit.toString()); - return CpuCapacity.milliCpuToCpu(milliCpus); - } else { - return null; - } - } - /** * Returns capacity value as a JsonObject. 
* diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java index 2fef3ec526e..0a3123cd4af 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java @@ -7,9 +7,11 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.operator.cluster.model.NodeRef; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import static io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity.DEFAULT_CPU_CORE_CAPACITY; @@ -108,7 +110,7 @@ public String processResourceCapacity(BrokerCapacity brokerCapacity, } if (this == CPU) { - String cpuBasedOnRequirements = CpuCapacity.getCpuBasedOnRequirements(resourceRequirements); + String cpuBasedOnRequirements = ResourceRequirementsUtils.getCpuBasedOnRequirements(resourceRequirements); if (cpuBasedOnRequirements != null) { return cpuBasedOnRequirements; } @@ -129,10 +131,13 @@ public String processResourceCapacity(BrokerCapacity brokerCapacity, * * * @param brokerCapacity The brokerCapacity section of the Cruise Control specification from the Kafka custom resource. + * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster. * @param kafkaBrokerResources A map of broker IDs to their Kubernetes resource requirements. 
* @return {@code true} if the capacity for this resource type is considered configured, otherwise {@code false.} */ - public boolean isCapacityConfigured(BrokerCapacity brokerCapacity, Map kafkaBrokerResources) { + public boolean isCapacityConfigured(BrokerCapacity brokerCapacity, + Set kafkaBrokerNodes, + Map kafkaBrokerResources) { if (brokerCapacity != null) { if (getGeneralResourceCapacity(brokerCapacity) != null) { return true; @@ -144,7 +149,18 @@ public boolean isCapacityConfigured(BrokerCapacity brokerCapacity, Map kafkaBrokerResources.containsKey(node.poolName())) + && ResourceRequirementsUtils.cpuRequestsMatchLimits(kafkaBrokerResources)) { return true; } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java new file mode 100644 index 00000000000..08f9a0344e1 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java @@ -0,0 +1,137 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; + +import java.util.Map; + +/** + * Utility methods for working with resource request and limit values from the resource requirements section + * of Strimzi custom resources. Used for evaluating whether Cruise Control capacity settings for CPU are properly + * configured. + */ +public class ResourceRequirementsUtils { + + /** + * Enum representing resource types used in the resource requirements section of Strimzi custom resources. + */ + public enum ResourceType { + /** + * Represents the memory resource type (e.g., "memory"). 
+ */ + MEMORY("memory"), + + /** + * Represents the CPU resource type (e.g., "cpu"). + */ + CPU("cpu"); + + private final String value; + + ResourceType(String value) { + this.value = value; + } + + /** + * Returns the string value of the resource type. + * + * @return the resource type name. + */ + public String value() { + return value; + } + } + + /** + * Enum representing resource requirement types used in the resource requirements section Strimzi custom resources. + */ + private enum ResourceRequirementsType { + /** + * Represents the resource request value. + */ + REQUEST, + + /** + * Represents the resource limit value. + */ + LIMIT; + } + + private static Quantity getQuantity(ResourceRequirements resources, + ResourceRequirementsType requirementType, + ResourceRequirementsUtils.ResourceType resourceType) { + if (resources == null || resourceType == null) { + return null; + } + + Map resourceRequirement = switch (requirementType) { + case REQUEST -> resources.getRequests(); + case LIMIT -> resources.getLimits(); + }; + + return resourceRequirement != null ? resourceRequirement.get(resourceType.value()) : null; + } + + /** + * Retrieves the CPU resource request quantity from the given resource requirements. + * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the requested CPU quantity, or {@code null} if not specified + */ + private static Quantity getCpuRequest(ResourceRequirements resources) { + return getQuantity(resources, ResourceRequirementsType.REQUEST, ResourceType.CPU); + } + + /** + * Retrieves the CPU resource limit quantity from the given resource requirements. 
+ * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the CPU limit quantity, or {@code null} if not specified + */ + private static Quantity getCpuLimit(ResourceRequirements resources) { + return getQuantity(resources, ResourceRequirementsType.LIMIT, ResourceType.CPU); + } + + /** + * Checks whether all Kafka broker pods have their CPU resource requests equal to their CPU limits. + * + * @param kafkaBrokerResources a map of broker pod names to their {@link ResourceRequirements} + * @return {@code true} if all brokers have matching CPU requests and limits; {@code false} otherwise + */ + protected static boolean cpuRequestsMatchLimits(Map kafkaBrokerResources) { + if (kafkaBrokerResources == null) { + return false; + } + for (ResourceRequirements resourceRequirements : kafkaBrokerResources.values()) { + Quantity request = getCpuRequest(resourceRequirements); + Quantity limit = getCpuLimit(resourceRequirements); + if (request == null || limit == null || request.compareTo(limit) != 0) { + return false; + } + } + return true; + } + + /** + * Derives the CPU capacity from the resource requirements section of Strimzi custom resource. + * + * @param resourceRequirements The Strimzi custom resource requirements containing CPU requests and/or limits. + * @return The CPU capacity as a {@link String}, or {@code null} if no CPU values are defined. 
+ */ + protected static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { + Quantity request = getCpuRequest(resourceRequirements); + Quantity limit = getCpuLimit(resourceRequirements); + + if (request != null) { + return request.toString(); + } else if (limit != null) { + return limit.toString(); + } else { + return null; + } + } +} \ No newline at end of file diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java index f03399ae94f..51536003b71 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java @@ -62,6 +62,7 @@ import io.strimzi.operator.cluster.model.cruisecontrol.InboundNetworkCapacity; import io.strimzi.operator.cluster.model.cruisecontrol.NetworkCapacity; import io.strimzi.operator.cluster.model.cruisecontrol.OutboundNetworkCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.ResourceRequirementsUtils.ResourceType; import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel; import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.Reconciliation; @@ -69,7 +70,6 @@ import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; -import io.strimzi.operator.common.model.resourcerequirements.ResourceType; import io.strimzi.platform.KubernetesVersion; import io.strimzi.plugin.security.profiles.impl.RestrictedPodSecurityProvider; import io.strimzi.test.TestUtils; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java 
b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java index 6dfb028b607..8adacb88262 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java @@ -17,8 +17,8 @@ import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverrideBuilder; import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.cluster.model.cruisecontrol.ResourceRequirementsUtils.ResourceType; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals; -import io.strimzi.operator.common.model.resourcerequirements.ResourceType; import org.junit.jupiter.api.Test; import java.util.List; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java index 25cb3c1eac7..954989b6f55 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java @@ -244,7 +244,7 @@ public void reconcileEnabledCruiseControl(boolean topicOperatorEnabled, boolean assertThat(deployCaptor.getValue(), is(notNullValue())); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION), is("0")); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION), is("0")); - 
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is("e104ddb6")); + assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is("8943477b")); assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("e8a2dfec")); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_SERVER_CERT_HASH), is("4d715cdd")); if (topicOperatorEnabled && apiUsersEnabled) { diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java deleted file mode 100644 index bcc1451481c..00000000000 --- a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsType.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.common.model.resourcerequirements; - -/** - * Enum representing the types of resource requirements used in Kubernetes container specifications. - * - *

- * <p>In Kubernetes, containers can specify both requests and limits
- * such as CPU and memory. These values influence scheduling and runtime resource enforcement:</p>
- *
- * <ul>
- *   <li>{@link #REQUEST} - The minimum amount of a resource that the container is guaranteed.</li>
- *   <li>{@link #LIMIT} - The maximum amount of a resource that the container is allowed to use.</li>
- * </ul>
- * - * @see Kubernetes: Managing Resources for Containers - */ -public enum ResourceRequirementsType { - /** - * Represents the resource request value. - */ - REQUEST, - - /** - * Represents the resource limit value. - */ - LIMIT; -} diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java deleted file mode 100644 index c9f00b01ecd..00000000000 --- a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceRequirementsUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.common.model.resourcerequirements; - -import io.fabric8.kubernetes.api.model.Quantity; -import io.fabric8.kubernetes.api.model.ResourceRequirements; - -import java.util.Map; - -import static io.strimzi.operator.common.model.resourcerequirements.ResourceRequirementsType.LIMIT; -import static io.strimzi.operator.common.model.resourcerequirements.ResourceRequirementsType.REQUEST; -import static io.strimzi.operator.common.model.resourcerequirements.ResourceType.CPU; -import static io.strimzi.operator.common.model.resourcerequirements.ResourceType.MEMORY; - -/** - * Utility methods for working with the resources section of custom resources. - */ -public class ResourceRequirementsUtils { - - private static Quantity getQuantity(ResourceRequirements resources, - ResourceRequirementsType requirementType, - ResourceType resourceType) { - if (resources == null || resourceType == null) { - return null; - } - - Map resourceRequirement = switch (requirementType) { - case REQUEST -> resources.getRequests(); - case LIMIT -> resources.getLimits(); - }; - - return resourceRequirement != null ? 
resourceRequirement.get(resourceType.value()) : null; - } - - /** - * Retrieves the CPU resource request quantity from the given resource requirements. - * - * @param resources the resource requirements containing CPU and memory requests and limits - * @return the requested CPU quantity, or {@code null} if not specified - */ - public static Quantity getCpuRequest(ResourceRequirements resources) { - return getQuantity(resources, REQUEST, CPU); - } - - /** - * Retrieves the CPU resource limit quantity from the given resource requirements. - * - * @param resources the resource requirements containing CPU and memory requests and limits - * @return the CPU limit quantity, or {@code null} if not specified - */ - public static Quantity getCpuLimit(ResourceRequirements resources) { - return getQuantity(resources, LIMIT, CPU); - } - - /** - * Retrieves the memory resource request quantity from the given resource requirements. - * - * @param resources the resource requirements containing CPU and memory requests and limits - * @return the requested memory quantity, or {@code null} if not specified - */ - public static Quantity getMemoryRequest(ResourceRequirements resources) { - return getQuantity(resources, REQUEST, MEMORY); - } - - /** - * Retrieves the memory resource limit quantity from the given resource requirements. 
- * - * @param resources the resource requirements containing CPU and memory requests and limits - * @return the memory limit quantity, or {@code null} if not specified - */ - public static Quantity getMemoryLimit(ResourceRequirements resources) { - return getQuantity(resources, LIMIT, MEMORY); - } -} diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java b/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java deleted file mode 100644 index 124f06a2816..00000000000 --- a/operator-common/src/main/java/io/strimzi/operator/common/model/resourcerequirements/ResourceType.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.common.model.resourcerequirements; - -/** - * Enum representing Kubernetes resource types used in container resource specifications. - */ -public enum ResourceType { - /** - * Represents the memory resource type (e.g., "memory"). - */ - MEMORY("memory"), - - /** - * Represents the CPU resource type (e.g., "cpu"). - */ - CPU("cpu"); - - private final String value; - - ResourceType(String value) { - this.value = value; - } - - /** - * Returns the string value of the resource type. 
- * - * @return the resource type name as used in Kubernetes, e.g., "cpu" or "memory" - */ - public String value() { - return value; - } -} From ac6af2f4d530646b73aab0f76785e5aa760e63e9 Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Fri, 13 Jun 2025 12:09:46 -0400 Subject: [PATCH 3/4] Addressing feedback - ts Signed-off-by: Kyle Liberti --- .../CruiseControlConfigurationTest.java | 164 +++++++++--------- 1 file changed, 85 insertions(+), 79 deletions(-) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java index 8adacb88262..027cbfd07fa 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java @@ -111,103 +111,109 @@ private void assertGoalsAbsent(List actual, CruiseControlGoals... expect } @Test - public void testFilterResourceGoalsWithoutCapacityConfig() { - // The capacities of resources are all properly configured - no goals should be filtered. 
+ public void shouldNotFilterGoalsWhenAllCapacitiesAreConfigured() { BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); - BrokerCapacityOverride bco = null; - Map kafkaBrokerResources = Map.of("pool1", new ResourceRequirementsBuilder().build()); + Map kafkaBrokerResources = Map.of("brokers", new ResourceRequirementsBuilder().build()); CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); assertThat(filteredGoals, containsInAnyOrder(GOAL_LIST.toArray())); + } - // Default CPU capacity not specified; Filter CPU goals. - bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + @Test + public void shouldFilterCpuGoalsWhenDefaultCapacityIsMissing() { + BrokerCapacity bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", new ResourceRequirementsBuilder().build()); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); assertGoalsAbsent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); - assertGoalsPresent(filteredGoals, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); - - // CPU resource limits == requests; Don't filter CPU goals. 
- kafkaBrokerResources = Map.of("pool1", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); - assertGoalsPresent(filteredGoals, - CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); - - // CPU resource limits != requests; Filter CPU goals. - kafkaBrokerResources = Map.of("pool1", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, "2000m")); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + } + + @Test + public void shouldNotFilterCpuGoalsWhenRequestsEqualLimits() { + BrokerCapacity bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsPresent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterCpuGoalsWhenRequestsDoNotEqualLimits() { + BrokerCapacity bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, "2000m")); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, 
STORAGE, kafkaBrokerResources); + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); assertGoalsAbsent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); - assertGoalsPresent(filteredGoals, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + } - // CPU override capacities provided; Don't filter CPU goals. - bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, null, null); + @Test + public void shouldNotFilterCpuGoalsWhenOverridesAreDefined() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, null, null); bc.setOverrides(List.of(bco)); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); - assertGoalsPresent(filteredGoals, - CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); - - // Default inbound network capacity not specified; Filter inbound network related goals. 
- bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, null, DEFAULT_NETWORK_CAPACITY); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, "2000m")); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsPresent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterInboundNetworkGoalsWhenDefaultCapacityIsMissing() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, null, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); assertGoalsAbsent(filteredGoals, NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, LEADER_BYTES_IN_DISTRIBUTION_GOAL); - assertGoalsPresent(filteredGoals, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, - POTENTIAL_NETWORK_OUTAGE_GOAL, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } - // Inbound network override capacities not homogenous; Filter LeaderBytesInDistributionGoa goal. 
- bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + @Test + public void shouldFilterLeaderBytesInGoalWhenOverridesAreHeterogeneous() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); BrokerCapacityOverride bco0 = createBrokerCapacityOverride(List.of(0), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); BrokerCapacityOverride bco1 = createBrokerCapacityOverride(List.of(1), DEFAULT_CPU_CAPACITY, "5000KiB/s", DEFAULT_NETWORK_CAPACITY); bc.setOverrides(List.of(bco0, bco1)); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); assertGoalsAbsent(filteredGoals, LEADER_BYTES_IN_DISTRIBUTION_GOAL); - assertGoalsPresent(filteredGoals, - CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + } - // Homogeneous inbound network override capacities provided; Don't filter inbound network goals. 
- bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + @Test + public void shouldNotFilterInboundNetworkGoalsWhenOverridesAreHomogeneous() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); bc.setOverrides(List.of(bco)); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); - assertGoalsPresent(filteredGoals, - CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, LEADER_BYTES_IN_DISTRIBUTION_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); - - // Default outbound network capacity not specified; Filter outbound network related goals. - bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, null); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); - assertGoalsAbsent(filteredGoals, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); - assertGoalsPresent(filteredGoals, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); - - // Outbound network override capacities provided; Don't filter outbound network goals. 
- bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsPresent(filteredGoals, LEADER_BYTES_IN_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterOutboundNetworkGoalsWhenDefaultCapacityIsMissing() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, null); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsAbsent(filteredGoals, NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + } + + @Test + public void shouldNotFilterOutboundNetworkGoalsWhenOverridesAreDefined() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); bc.setOverrides(List.of(bco)); - capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); - filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); - assertGoalsPresent(filteredGoals, - CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL, - NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, - NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, 
NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsPresent(filteredGoals, NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); } } From 582c6bd752e7a44145c6769ce44e48fb5ebb185b Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Mon, 16 Jun 2025 11:45:19 -0400 Subject: [PATCH 4/4] Addressing feedback - js Signed-off-by: Kyle Liberti --- CHANGELOG.md | 2 +- .../model/cruisecontrol/ResourceCapacityType.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f3aad63dd0..10e2859f0d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,6 @@ ## 0.47.0 -* Disable CC resource goals when resource capacities are not set. * Adding progress tracking for Cruise Control rebalances * Add support for Kafka 3.9.1 * Fixed MirrorMaker 2 client rack init container override being ignored. @@ -10,6 +9,7 @@ ### Major changes, deprecations and removals +* Disable CC resource goals when resource capacities are not set. * Strimzi 0.47.0 (and any of its patch releases) is the last Strimzi version with support for Kubernetes 1.25 and 1.26. From Strimzi 0.48.0 on, we will support only Kubernetes 1.27 and newer. 
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java index 0a3123cd4af..3fd09ad7542 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java @@ -155,8 +155,15 @@ public boolean isCapacityConfigured(BrokerCapacity brokerCapacity, * (1) The resource requirements are defined for every broker in the cluster. * (2) The CPU requests == limits for every broker in the cluster. * - * This ensures that each broker's CPU capacity has been explicitly defined as a guaranteed resource, - * enabling more accurate rebalancing decisions based on CPU availability. + * Strimzi Cruise Control uses the CPU request value as the broker's capacity since it's the guaranteed CPU allocation. + * However, if the CPU limit is higher than the request, the broker may use more CPU than its configured capacity under load. + * This can lead Cruise Control to make poor balancing decisions. For example, a broker with 1000m request + * and 2000m limit could temporarily operate at double its defined CPU capacity. In this case, Cruise Control, + * thinking 1000m is 100% of the broker's CPU capacity, could shift double that CPU load to another broker - + * effectively transferring double the expected load and potentially overloading the target broker. + * + * Requiring request == limit ensures predictable, enforceable CPU limits across all brokers, resulting in more + * accurate rebalancing decisions. */ if (this == CPU && kafkaBrokerNodes.stream().allMatch(node -> kafkaBrokerResources.containsKey(node.poolName()))