diff --git a/CHANGELOG.md b/CHANGELOG.md index c8b3c27959e..9f3aad63dd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.47.0 +* Disable CC resource goals when resource capacities are not set. * Adding progress tracking for Cruise Control rebalances * Add support for Kafka 3.9.1 * Fixed MirrorMaker 2 client rack init container override being ignored. diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java index 8b965894d9c..b70debfa7f3 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java @@ -35,7 +35,7 @@ import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlTemplate; import io.strimzi.certs.CertAndKey; import io.strimzi.operator.cluster.ClusterOperatorConfig; -import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; +import io.strimzi.operator.cluster.model.cruisecontrol.CapacityConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.HashLoginServiceApiCredentials; import io.strimzi.operator.cluster.model.logging.LoggingModel; @@ -65,6 +65,7 @@ import static io.strimzi.api.kafka.model.common.template.DeploymentStrategy.ROLLING_UPDATE; import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS; import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.CRUISE_CONTROL_GOALS; +import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.generateCruiseControlDefaultPropertiesMap; import static java.lang.String.format; /** @@ -111,7 +112,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup private boolean authEnabled; private HashLoginServiceApiCredentials apiCredentials; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method - protected Capacity capacity; + protected CapacityConfiguration capacityConfiguration; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method private MetricsModel metrics; private LoggingModel logging; @@ -197,6 +198,7 @@ public static CruiseControl fromCrd( result.image = image; KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet()); + result.capacityConfiguration = new CapacityConfiguration(reconciliation, kafkaCr.getSpec(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); result.updateConfigurationWithDefaults(ccSpec, kafkaConfiguration); CruiseControlConfiguration ccConfiguration = result.configuration; @@ -207,7 +209,6 @@ public static CruiseControl fromCrd( // To avoid illegal storage configurations provided by the user, // we rely on the storage configuration provided by the KafkaAssemblyOperator - result.capacity = new Capacity(reconciliation, kafkaCr.getSpec(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS); result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS); result.gcLoggingEnabled = ccSpec.getJvmOptions() == null ? JvmOptions.DEFAULT_GC_LOGGING_ENABLED : ccSpec.getJvmOptions().isGcLoggingEnabled(); @@ -244,7 +245,7 @@ public static CruiseControl fromCrd( } private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) { - Map defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap()); + Map defaultCruiseControlProperties = generateCruiseControlDefaultPropertiesMap(capacityConfiguration); if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) { defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR)); } @@ -530,7 +531,7 @@ public LoggingModel logging() { public ConfigMap generateConfigMap(MetricsAndLogging metricsAndLogging) { Map configMapData = new HashMap<>(); configMapData.put(SERVER_CONFIG_FILENAME, configuration.asOrderedProperties().asPairs()); - configMapData.put(CAPACITY_CONFIG_FILENAME, capacity.toString()); + configMapData.put(CAPACITY_CONFIG_FILENAME, capacityConfiguration.toString()); configMapData.putAll(ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)); return ConfigMapUtils diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java deleted file mode 100644 index ff45addd045..00000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.model.cruisecontrol; - -/** - * Configures the Kafka broker capacity - */ -public class BrokerCapacity { - // CC allows specifying a generic "default" broker entry in the capacity configuration to apply to all brokers without a specific broker entry. - // CC designates the id of this default broker entry as "-1". - /** - * Default broker ID - */ - public static final int DEFAULT_BROKER_ID = -1; - - /** - * Default outbound network capacity - */ - public static final String DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; - - private static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in number of cores, network throughput is in KiB."; - /** - * Default cpu core capacity - */ - public static final String DEFAULT_CPU_CORE_CAPACITY = "1.0"; - protected static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; - protected static final String DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; - - private int id; - private CpuCapacity cpu; - private DiskCapacity disk; - private String inboundNetwork; - private String outboundNetwork; - private final String doc; - - /** - * Constructor - * - * @param brokerId ID of the broker - * @param cpu CPU capacity - * @param disk Disk capacity - * @param inboundNetwork Inbound network capacity - * @param outboundNetwork Outbound network capacity - */ - public BrokerCapacity(int brokerId, CpuCapacity cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) { - this.id = brokerId; - this.cpu = cpu; - this.disk = disk; - this.inboundNetwork = inboundNetwork; - this.outboundNetwork = outboundNetwork; - this.doc = brokerId == -1 ? DEFAULT_BROKER_DOC : "Capacity for Broker " + brokerId; - } - - /** - * @return Broker ID - */ - protected Integer getId() { - return id; - } - - /** - * @return CPU capacity - */ - public CpuCapacity getCpu() { - return cpu; - } - - /** - * @return Disk capacity - */ - protected DiskCapacity getDisk() { - return disk; - } - - /** - * @return Inbound network capacity - */ - public String getInboundNetwork() { - return inboundNetwork; - } - - /** - * @return Outbound network capacity - */ - public String getOutboundNetwork() { - return outboundNetwork; - } - - protected String getDoc() { - return doc; - } - - protected void setCpu(CpuCapacity cpu) { - this.cpu = cpu; - } - - protected void setInboundNetwork(String inboundNetwork) { - this.inboundNetwork = inboundNetwork; - } - - protected void setOutboundNetwork(String outboundNetwork) { - this.outboundNetwork = outboundNetwork; - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java deleted file mode 100644 index e1bc6e26e26..00000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java +++ /dev/null @@ -1,455 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.model.cruisecontrol; - -import io.fabric8.kubernetes.api.model.Quantity; -import io.fabric8.kubernetes.api.model.ResourceRequirements; -import io.strimzi.api.kafka.model.kafka.EphemeralStorage; -import io.strimzi.api.kafka.model.kafka.JbodStorage; -import io.strimzi.api.kafka.model.kafka.KafkaSpec; -import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; -import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; -import io.strimzi.api.kafka.model.kafka.Storage; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; -import io.strimzi.operator.cluster.model.NodeRef; -import io.strimzi.operator.cluster.model.Quantities; -import io.strimzi.operator.cluster.model.StorageUtils; -import io.strimzi.operator.cluster.model.VolumeUtils; -import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.ReconciliationLogger; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -/** - * Uses information in a Kafka Custom Resource to generate a capacity configuration file to be used for - * Cruise Control's Broker Capacity File Resolver. - * - * - * For example, it takes a Kafka Custom Resource like the following: - * - * spec: - * kafka: - * replicas: 3 - * storage: - * type: jbod - * volumes: - * - id: 0 - * type: persistent-claim - * size: 100Gi - * deleteClaim: false - * - id: 1 - * type: persistent-claim - * size: 200Gi - * deleteClaim: false - * cruiseControl: - * brokerCapacity: - * cpu: "1" - * inboundNetwork: 10000KB/s - * outboundNetwork: 10000KB/s - * overrides: - * - brokers: [0] - * cpu: "2.345" - * outboundNetwork: 40000KB/s - * - brokers: [1, 2] - * cpu: 4000m - * inboundNetwork: 60000KB/s - * outboundNetwork: 20000KB/s - * - * and uses the information to create Cruise Control BrokerCapacityFileResolver config file like the following: - * - * { - * "brokerCapacities":[ - * { - * "brokerId": "-1", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log-1": "100000", - * "/var/lib/kafka1/kafka-log-1": "200000" - * }, - * "CPU": {"num.cores": "1"}, - * "NW_IN": "10000", - * "NW_OUT": "10000" - * }, - * "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." - * }, - * { - * "brokerId": "0", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log0": "100000", - * "/var/lib/kafka1/kafka-log0": "200000" - * }, - * "CPU": {"num.cores": "2.345"}, - * "NW_IN": "10000", - * "NW_OUT": "40000" - * }, - * "doc": "Capacity for Broker 0" - * }, - * { - * "brokerId": "1", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log1": "100000", - * "/var/lib/kafka1/kafka-log1": "200000" - * }, - * "CPU": {"num.cores": "4"}, - * "NW_IN": "60000", - * "NW_OUT": "20000" - * }, - * "doc": "Capacity for Broker 1" - * }, - * "brokerId": "2", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log2": "100000", - * "/var/lib/kafka1/kafka-log2": "200000" - * }, - * "CPU": {"num.cores": "4"}, - * "NW_IN": "60000", - * "NW_OUT": "20000" - * }, - * "doc": "Capacity for Broker 2" - * } - * ] - * } - */ -public class Capacity { - protected static final ReconciliationLogger LOGGER = ReconciliationLogger.create(Capacity.class.getName()); - private final Reconciliation reconciliation; - private final TreeMap capacityEntries; - - /** - * Broker capacities key - */ - public static final String CAPACITIES_KEY = "brokerCapacities"; - - /** - * Capacity key - */ - public static final String CAPACITY_KEY = "capacity"; - - /** - * Disk key - */ - public static final String DISK_KEY = "DISK"; - - /** - * CPU key - */ - public static final String CPU_KEY = "CPU"; - - /** - * Inbound network key - */ - public static final String INBOUND_NETWORK_KEY = "NW_IN"; - - /** - * Outbound network key - */ - public static final String OUTBOUND_NETWORK_KEY = "NW_OUT"; - - /** - * Resource type - */ - public static final String RESOURCE_TYPE = "cpu"; - - private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka"; - private static final String KAFKA_LOG_DIR = "kafka-log"; - private static final String BROKER_ID_KEY = "brokerId"; - private static final String DOC_KEY = "doc"; - - private enum ResourceRequirementType { - REQUEST, - LIMIT; - - private Quantity getQuantity(ResourceRequirements resources) { - Map resourceRequirement = switch (this) { - case REQUEST -> resources.getRequests(); - case LIMIT -> resources.getLimits(); - }; - if (resourceRequirement != null) { - return resourceRequirement.get(RESOURCE_TYPE); - } - return null; - } - } - - /** - * Constructor - * - * @param reconciliation Reconciliation marker - * @param spec Spec of the Kafka custom resource - * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster - * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools - * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools - */ - public Capacity( - Reconciliation reconciliation, - KafkaSpec spec, - Set kafkaBrokerNodes, - Map kafkaStorage, - Map kafkaBrokerResources - ) { - this.reconciliation = reconciliation; - this.capacityEntries = new TreeMap<>(); - - processCapacityEntries(spec.getCruiseControl(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); - } - - private static Integer getResourceRequirement(ResourceRequirements resources, ResourceRequirementType requirementType) { - if (resources != null) { - Quantity quantity = requirementType.getQuantity(resources); - if (quantity != null) { - return Quantities.parseCpuAsMilliCpus(quantity.toString()); - } - } - return null; - } - - private static CpuCapacity getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { - Integer request = getResourceRequirement(resourceRequirements, ResourceRequirementType.REQUEST); - Integer limit = getResourceRequirement(resourceRequirements, ResourceRequirementType.LIMIT); - - if (request != null) { - return new CpuCapacity(CpuCapacity.milliCpuToCpu(request)); - } else if (limit != null) { - return new CpuCapacity(CpuCapacity.milliCpuToCpu(limit)); - } else { - return new CpuCapacity(BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY); - } - } - - /** - * The brokerCapacity overrides per broker take top precedence, then general brokerCapacity configuration, and then the Kafka resource requests, then the Kafka resource limits. - * For example: - * (1) brokerCapacity overrides - * (2) brokerCapacity - * (3) Kafka resource requests - * (4) Kafka resource limits - * When none of Cruise Control CPU capacity configurations mentioned above are configured, CPU capacity will be set to `1`. - * - * @param override brokerCapacity overrides (per broker) - * @param bc brokerCapacity (for all brokers) - * @param resourceRequirements Kafka resource requests and limits (for all brokers) - * @return A {@link CpuCapacity} object containing the specified capacity for a broker - */ - private CpuCapacity processCpu(BrokerCapacityOverride override, io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, ResourceRequirements resourceRequirements) { - if (override != null && override.getCpu() != null) { - return new CpuCapacity(override.getCpu()); - } else if (bc != null && bc.getCpu() != null) { - return new CpuCapacity(bc.getCpu()); - } - return getCpuBasedOnRequirements(resourceRequirements); - } - - private static DiskCapacity processDisk(Storage storage, int brokerId) { - if (storage instanceof JbodStorage) { - return generateJbodDiskCapacity(storage, brokerId); - } else { - return generateDiskCapacity(storage); - } - } - - private static String processInboundNetwork(io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, BrokerCapacityOverride override) { - if (override != null && override.getInboundNetwork() != null) { - return getThroughputInKiB(override.getInboundNetwork()); - } else if (bc != null && bc.getInboundNetwork() != null) { - return getThroughputInKiB(bc.getInboundNetwork()); - } else { - return BrokerCapacity.DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND; - } - } - - private static String processOutboundNetwork(io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, BrokerCapacityOverride override) { - if (override != null && override.getOutboundNetwork() != null) { - return getThroughputInKiB(override.getOutboundNetwork()); - } else if (bc != null && bc.getOutboundNetwork() != null) { - return getThroughputInKiB(bc.getOutboundNetwork()); - } else { - return BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND; - } - } - - /** - * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration - * - * @param storage Storage configuration for Kafka cluster - * @param brokerId Id of the broker - * @return Disk capacity configuration value for broker brokerId - */ - private static DiskCapacity generateJbodDiskCapacity(Storage storage, int brokerId) { - DiskCapacity disks = new DiskCapacity(); - String size = ""; - - for (SingleVolumeStorage volume : ((JbodStorage) storage).getVolumes()) { - String name = VolumeUtils.createVolumePrefix(volume.getId(), true); - String path = KAFKA_MOUNT_PATH + "/" + name + "/" + KAFKA_LOG_DIR + brokerId; - - if (volume instanceof PersistentClaimStorage) { - size = ((PersistentClaimStorage) volume).getSize(); - } else if (volume instanceof EphemeralStorage) { - size = ((EphemeralStorage) volume).getSizeLimit(); - } - disks.add(path, String.valueOf(getSizeInMiB(size))); - } - return disks; - } - - /** - * Generate total disk capacity using the supplied storage configuration - * - * @param storage Storage configuration for Kafka cluster - * @return Disk capacity per broker - */ - private static DiskCapacity generateDiskCapacity(Storage storage) { - if (storage instanceof PersistentClaimStorage) { - return DiskCapacity.of(getSizeInMiB(((PersistentClaimStorage) storage).getSize())); - } else if (storage instanceof EphemeralStorage) { - if (((EphemeralStorage) storage).getSizeLimit() != null) { - return DiskCapacity.of(getSizeInMiB(((EphemeralStorage) storage).getSizeLimit())); - } else { - return DiskCapacity.of(BrokerCapacity.DEFAULT_DISK_CAPACITY_IN_MIB); - } - } else if (storage == null) { - throw new IllegalStateException("The storage declaration is missing"); - } else { - throw new IllegalStateException("The declared storage '" + storage.getType() + "' is not supported"); - } - } - - /* - * Parse a K8S-style representation of a disk size, such as {@code 100Gi}, - * into the equivalent number of mebibytes represented as a String. - * - * @param size The String representation of the volume size. - * @return The equivalent number of mebibytes. - */ - private static String getSizeInMiB(String size) { - if (size == null) { - return BrokerCapacity.DEFAULT_DISK_CAPACITY_IN_MIB; - } - return String.valueOf(StorageUtils.convertTo(size, "Mi")); - } - - /** - * Parse Strimzi representation of throughput, such as {@code 10000KB/s}, - * into the equivalent number of kibibytes represented as a String. - * - * @param throughput The String representation of the throughput. - * @return The equivalent number of kibibytes. - */ - public static String getThroughputInKiB(String throughput) { - String size = throughput.substring(0, throughput.indexOf("B")); - return String.valueOf(StorageUtils.convertTo(size, "Ki")); - } - - private void processCapacityEntries(CruiseControlSpec spec, Set kafkaBrokerNodes, Map kafkaStorage, Map kafkaBrokerResources) { - io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacity = spec.getBrokerCapacity(); - - String inboundNetwork = processInboundNetwork(brokerCapacity, null); - String outboundNetwork = processOutboundNetwork(brokerCapacity, null); - - // Initialize default capacities - for (NodeRef node : kafkaBrokerNodes) { - DiskCapacity disk = processDisk(kafkaStorage.get(node.poolName()), node.nodeId()); - CpuCapacity cpu = processCpu(null, brokerCapacity, kafkaBrokerResources.get(node.poolName())); - - BrokerCapacity broker = new BrokerCapacity(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork); - capacityEntries.put(node.nodeId(), broker); - } - - // Override default capacities - if (brokerCapacity != null) { - List overrides = brokerCapacity.getOverrides(); - // Override broker entries - if (overrides != null) { - if (overrides.isEmpty()) { - LOGGER.warnCr(reconciliation, "Ignoring empty overrides list"); - } else { - // For checking for duplicate brokerIds - Set overrideIds = new HashSet<>(); - for (BrokerCapacityOverride override : overrides) { - List ids = override.getBrokers(); - inboundNetwork = processInboundNetwork(brokerCapacity, override); - outboundNetwork = processOutboundNetwork(brokerCapacity, override); - for (int id : ids) { - if (id == BrokerCapacity.DEFAULT_BROKER_ID) { - LOGGER.warnCr(reconciliation, "Ignoring broker capacity override with illegal broker id -1."); - } else { - if (capacityEntries.containsKey(id)) { - if (overrideIds.add(id)) { - BrokerCapacity brokerCapacityEntry = capacityEntries.get(id); - brokerCapacityEntry.setCpu(processCpu(override, brokerCapacity, kafkaBrokerResources.get(Integer.toString(id)))); - brokerCapacityEntry.setInboundNetwork(inboundNetwork); - brokerCapacityEntry.setOutboundNetwork(outboundNetwork); - } else { - LOGGER.warnCr(reconciliation, "Duplicate broker id {} found in overrides, using first occurrence.", id); - } - } else { - LOGGER.warnCr(reconciliation, "Ignoring broker capacity override for unknown node ID {}", id); - overrideIds.add(id); - } - } - } - } - } - } - } - } - /** - * Generate broker capacity entry for capacity configuration. - * - * @param brokerCapacity Broker capacity object - * @return Broker entry as a JsonObject - */ - private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) { - return new JsonObject() - .put(BROKER_ID_KEY, brokerCapacity.getId()) - .put(CAPACITY_KEY, new JsonObject() - .put(DISK_KEY, brokerCapacity.getDisk().getJson()) - .put(CPU_KEY, brokerCapacity.getCpu().getJson()) - .put(INBOUND_NETWORK_KEY, brokerCapacity.getInboundNetwork()) - .put(OUTBOUND_NETWORK_KEY, brokerCapacity.getOutboundNetwork()) - ) - .put(DOC_KEY, brokerCapacity.getDoc()); - } - - /** - * Generate a capacity configuration for cluster - * - * @return Cruise Control capacity configuration as a JsonObject - */ - public JsonObject generateCapacityConfig() { - JsonArray brokerList = new JsonArray(); - for (BrokerCapacity brokerCapacity : capacityEntries.values()) { - JsonObject brokerEntry = generateBrokerCapacity(brokerCapacity); - brokerList.add(brokerEntry); - } - - JsonObject config = new JsonObject(); - config.put("brokerCapacities", brokerList); - - return config; - } - - @Override - public String toString() { - return generateCapacityConfig().encodePrettily(); - } - - /** - * @return Capacity entries - */ - public TreeMap getCapacityEntries() { - return capacityEntries; - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java new file mode 100644 index 00000000000..236e5e3914e --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java @@ -0,0 +1,313 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; +import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.ReconciliationLogger; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.INBOUND_NETWORK; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; + +/** + * Uses information in a `Kafka` and `KafkaNodePool` custom resources to generate a capacity configuration file to + * be used for Cruise Control's Broker Capacity File Resolver. + * + * + * For example, it takes a Kafka Custom Resource like the following: + * + * spec: + * kafka: + * replicas: 3 + * storage: + * type: jbod + * volumes: + * - id: 0 + * type: persistent-claim + * size: 100Gi + * deleteClaim: false + * - id: 1 + * type: persistent-claim + * size: 200Gi + * deleteClaim: false + * cruiseControl: + * brokerCapacity: + * cpu: "1" + * inboundNetwork: 10000KB/s + * outboundNetwork: 10000KB/s + * overrides: + * - brokers: [0] + * cpu: "2.345" + * outboundNetwork: 40000KB/s + * - brokers: [1, 2] + * cpu: 4000m + * inboundNetwork: 60000KB/s + * outboundNetwork: 20000KB/s + * + * and uses the information to create Cruise Control BrokerCapacityFileResolver config file like the following: + * + * { + * "brokerCapacities":[ + * { + * "brokerId": "-1", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log-1": "100000", + * "/var/lib/kafka1/kafka-log-1": "200000" + * }, + * "CPU": {"num.cores": "1"}, + * "NW_IN": "10000", + * "NW_OUT": "10000" + * }, + * "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." + * }, + * { + * "brokerId": "0", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log0": "100000", + * "/var/lib/kafka1/kafka-log0": "200000" + * }, + * "CPU": {"num.cores": "2.345"}, + * "NW_IN": "10000", + * "NW_OUT": "40000" + * }, + * "doc": "Capacity for Broker 0" + * }, + * { + * "brokerId": "1", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log1": "100000", + * "/var/lib/kafka1/kafka-log1": "200000" + * }, + * "CPU": {"num.cores": "4"}, + * "NW_IN": "60000", + * "NW_OUT": "20000" + * }, + * "doc": "Capacity for Broker 1" + * }, + * "brokerId": "2", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log2": "100000", + * "/var/lib/kafka1/kafka-log2": "200000" + * }, + * "CPU": {"num.cores": "4"}, + * "NW_IN": "60000", + * "NW_OUT": "20000" + * }, + * "doc": "Capacity for Broker 2" + * } + * ] + * } + */ +public class CapacityConfiguration { + /** + * Broker capacities key + */ + public static final String CAPACITIES_KEY = "brokerCapacities"; + /** + * Capacity key + */ + public static final String CAPACITY_KEY = "capacity"; + + private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(CapacityConfiguration.class.getName()); + + private static final String BROKER_ID_KEY = "brokerId"; + private static final String DOC_KEY = "doc"; + + private final Reconciliation reconciliation; + private final TreeMap capacityEntries; + + private boolean isCpuConfigured; + private boolean isInboundNetworkConfigured; + private boolean isOutboundNetworkConfigured; + + /** + * Constructor + * + * @param reconciliation Reconciliation marker. + * @param spec Spec of the Kafka custom resource. + * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster. + * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools. + * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools. + */ + public CapacityConfiguration( + Reconciliation reconciliation, + KafkaSpec spec, + Set kafkaBrokerNodes, + Map kafkaStorage, + Map kafkaBrokerResources + ) { + this.reconciliation = reconciliation; + this.capacityEntries = new TreeMap<>(); + + generateCapacityEntries(spec.getCruiseControl(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); + } + + private Map processBrokerCapacityOverrides(Set kafkaBrokerNodes, BrokerCapacity brokerCapacity) { + Map overrideMap = new HashMap<>(); + List overrides = null; + if (brokerCapacity != null) { + overrides = brokerCapacity.getOverrides(); + } + if (overrides != null) { + if (overrides.isEmpty()) { + LOGGER.warnCr(reconciliation, "Ignoring empty overrides list"); + } else { + for (BrokerCapacityOverride override : overrides) { + List ids = override.getBrokers(); + for (int id : ids) { + if (overrideMap.containsKey(id)) { + LOGGER.warnCr(reconciliation, "Duplicate broker id {} found in overrides, using first occurrence.", id); + } else if (kafkaBrokerNodes.stream().noneMatch(node -> node.nodeId() == id)) { + LOGGER.warnCr(reconciliation, "Ignoring broker capacity override for unknown node ID {}", id); + } else { + overrideMap.put(id, override); + } + } + } + } + } + return overrideMap; + } + + private void generateCapacityEntries(CruiseControlSpec spec, + Set kafkaBrokerNodes, + Map kafkaStorage, + Map kafkaBrokerResources) { + BrokerCapacity generalBrokerCapacity = spec.getBrokerCapacity(); + Map brokerCapacityOverrideMap = processBrokerCapacityOverrides(kafkaBrokerNodes, generalBrokerCapacity); + + for (NodeRef node : kafkaBrokerNodes) { + BrokerCapacityOverride brokerCapacityOverride = brokerCapacityOverrideMap.get(node.nodeId()); + + DiskCapacity disk = new DiskCapacity(kafkaStorage.get(node.poolName()), node.nodeId()); + CpuCapacity cpu = new CpuCapacity(generalBrokerCapacity, brokerCapacityOverride, kafkaBrokerResources.get(node.poolName())); + InboundNetworkCapacity inboundNetwork = new InboundNetworkCapacity(generalBrokerCapacity, brokerCapacityOverride); + OutboundNetworkCapacity outboundNetwork = new OutboundNetworkCapacity(generalBrokerCapacity, brokerCapacityOverride); + + CapacityEntry capacityEntry = new CapacityEntry(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork); + capacityEntries.put(node.nodeId(), capacityEntry); + } + isCpuConfigured = CPU.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerNodes, kafkaBrokerResources); + isInboundNetworkConfigured = INBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerNodes, kafkaBrokerResources); + isOutboundNetworkConfigured = OUTBOUND_NETWORK.isCapacityConfigured(generalBrokerCapacity, kafkaBrokerNodes, kafkaBrokerResources); + } + + /** + * Indicates whether the CPU capacity settings were explicitly configured by the user. + * + * @return {@code true} if CPU capacity is user-configured; {@code false} otherwise. + */ + public boolean isCpuConfigured() { + return this.isCpuConfigured; + } + + /** + * Indicates whether the inbound network capacity settings were explicitly configured by the user. + * + * @return {@code true} if inbound network capacity is user-configured; {@code false} otherwise. + */ + public boolean isInboundNetworkConfigured() { + return this.isInboundNetworkConfigured; + } + + /** + * Indicates whether the outbound network capacity settings were explicitly configured by the user. + * + * @return {@code true} if outbound network capacity is user-configured; {@code false} otherwise. + */ + public boolean isOutboundNetworkConfigured() { + return this.isOutboundNetworkConfigured; + } + + /** + * Checks whether this resource type has its capacityConfiguration homogeneously configured across all brokers. + * @param resource The resource type. + * @return {@code true} if capacityConfiguration is homogeneously configured; {@code false} otherwise. + */ + public boolean isCapacityHomogeneouslyConfigured(String resource) { + return this.getCapacityEntries().values().stream() + .map(entry -> entry.capacity().get(resource)) + .distinct() + .limit(2) + .count() == 1; + } + + /** + * Represents a Cruise Control capacity entry configuration for a Kafka broker. + * + * @param id The broker ID. + * @param capacity The capacity map for the broker capacity entry. + * @param doc A human-readable description of this capacity entry. + */ + public record CapacityEntry( + int id, + Map capacity, + String doc + ) { + private CapacityEntry(int id, CpuCapacity cpu, DiskCapacity disk, InboundNetworkCapacity inboundNetwork, + OutboundNetworkCapacity outboundNetwork) { + this(id, buildCapacityMap(cpu, disk, inboundNetwork, outboundNetwork), "Capacity for Broker " + id); + } + + private static Map buildCapacityMap(CpuCapacity cpu, DiskCapacity disk, InboundNetworkCapacity inboundNetwork, + OutboundNetworkCapacity outboundNetwork) { + Map map = new HashMap<>(); + map.put(DiskCapacity.KEY, disk.getJson()); + map.put(CpuCapacity.KEY, cpu.getJson()); + map.put(InboundNetworkCapacity.KEY, inboundNetwork.toString()); + map.put(OutboundNetworkCapacity.KEY, outboundNetwork.toString()); + return map; + } + } + + /** + * Generate a capacity configuration for cluster. + * + * @return Cruise Control capacity configuration as a String. + */ + @Override + public String toString() { + JsonArray capacityList = new JsonArray(); + for (CapacityEntry capacityEntry : capacityEntries.values()) { + JsonObject capacityJson = new JsonObject(); + capacityEntry.capacity.forEach(capacityJson::put); + + JsonObject capacityEntryJson = new JsonObject() + .put(BROKER_ID_KEY, capacityEntry.id) + .put(CAPACITY_KEY, capacityJson) + .put(DOC_KEY, capacityEntry.doc()); + + capacityList.add(capacityEntryJson); + } + + return new JsonObject().put(CAPACITIES_KEY, capacityList).encodePrettily(); + } + + /** + * @return Capacity entries + */ + public TreeMap getCapacityEntries() { + return capacityEntries; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java index 352b70b3cb3..5a853853a2c 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java @@ -4,16 +4,27 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; import io.strimzi.operator.cluster.model.Quantities; import io.vertx.core.json.JsonObject; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; + /** - * Cruise Control CPU Capacity class + * Cruise Control CPU capacity configuration for broker. */ public class CpuCapacity { + protected static final String DEFAULT_CPU_CORE_CAPACITY = "1.0"; + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "CPU"; + private static final String CORES_KEY = "num.cores"; - private final String cores; + private final JsonObject config = new JsonObject(); /** * Constructor @@ -21,24 +32,40 @@ public class CpuCapacity { * @param cores CPU cores configuration */ public CpuCapacity(String cores) { - this.cores = milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores)); + config.put(CORES_KEY, milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores))); + } + + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * @param resourceRequirements The Kafka resource requests and limits (for all brokers). + */ + public CpuCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride, + ResourceRequirements resourceRequirements) { + this(CPU.processResourceCapacity(brokerCapacity, brokerCapacityOverride, resourceRequirements)); } - protected static String milliCpuToCpu(int milliCPU) { + private static String milliCpuToCpu(int milliCPU) { return String.valueOf(milliCPU / 1000.0); } /** - * Returns CpuCapacity object as a JsonObject + * Returns capacity value as a JsonObject. * - * @return The CpuCapacity object as a JsonObject + * @return The capacity value as a JsonObject. */ public JsonObject getJson() { - return new JsonObject().put(CORES_KEY, this.cores); + return config; } @Override public String toString() { - return this.getJson().toString(); + return config.toString(); } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java index 7f6596656e4..01e82600d9b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfiguration.java @@ -2,7 +2,6 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ - package io.strimzi.operator.cluster.model.cruisecontrol; import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; @@ -12,7 +11,7 @@ import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -65,8 +64,6 @@ public class CruiseControlConfiguration extends AbstractConfiguration { CruiseControlGoals.CPU_CAPACITY_GOAL.toString() ); - private static final String CRUISE_CONTROL_HARD_GOALS = String.join(",", CRUISE_CONTROL_HARD_GOALS_LIST); - protected static final List CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST = List.of( CruiseControlGoals.RACK_AWARENESS_GOAL.toString(), CruiseControlGoals.MIN_TOPIC_LEADERS_PER_BROKER_GOAL.toString(), @@ -81,26 +78,34 @@ public class CruiseControlConfiguration extends AbstractConfiguration { String.join(",", CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST); /** - * Map containing default values for required configuration properties. The map needs to be sorted so that the order + * Generates map containing default values for required configuration properties. The map needs to be sorted so that the order * of the entries in the Cruise Control configuration is deterministic and does not cause unnecessary rolling updates * of Cruise Control deployment. + * + * @param capacityConfiguration The object containing Cruise Control capacity configuration information. + * + * @return Map containing default values for required configuration properties. */ - private static final Map CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP = Collections.unmodifiableSortedMap(new TreeMap<>(Map.ofEntries( - Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)), - Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1"), - Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)), - Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20"), - Map.entry(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1))), - Map.entry(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS), - Map.entry(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS), - Map.entry(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_HARD_GOALS), - Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED)), - Map.entry(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE), - Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)), - Map.entry(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME), - Map.entry(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME), - Map.entry(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME) - ))); + public static Map generateCruiseControlDefaultPropertiesMap(CapacityConfiguration capacityConfiguration) { + TreeMap map = new TreeMap<>(); + map.put(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)); + map.put(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1"); + map.put(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)); + map.put(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20"); + map.put(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1))); + map.put(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS); + map.put(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(), + String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_GOALS_LIST, capacityConfiguration))); + map.put(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(), + String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_HARD_GOALS_LIST, capacityConfiguration))); + map.put(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED)); + map.put(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE); + map.put(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)); + map.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME); + map.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME); + map.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME); + return map; + } private static final List FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(CruiseControlSpec.FORBIDDEN_PREFIXES); private static final List FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(CruiseControlSpec.FORBIDDEN_PREFIX_EXCEPTIONS); @@ -118,10 +123,42 @@ public CruiseControlConfiguration(Reconciliation reconciliation, Iterable getCruiseControlDefaultPropertiesMap() { - return CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP; + /* test */ static List filterResourceGoalsWithoutCapacityConfig(List goalList, + CapacityConfiguration capacityConfiguration) { + List filteredGoalList = new ArrayList<>(goalList); + if (!capacityConfiguration.isInboundNetworkConfigured()) { + filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_CAPACITY_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString()); + } else { + if (!capacityConfiguration.isCapacityHomogeneouslyConfigured(InboundNetworkCapacity.KEY)) { + /* + * The LeaderBytesInDistributionGoal does not account for heterogeneous inbound network capacities. + * See: https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LeaderBytesInDistributionGoal.java + * Therefore, if the INBOUND_NETWORK capacityConfiguration settings are not homogeneously configured, + * we remove this goal from the list. + */ + filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString()); + } + } + if (!capacityConfiguration.isOutboundNetworkConfigured()) { + filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_CAPACITY_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.POTENTIAL_NETWORK_OUTAGE_GOAL.toString()); + } + if (!capacityConfiguration.isCpuConfigured()) { + filteredGoalList.remove(CruiseControlGoals.CPU_CAPACITY_GOAL.toString()); + filteredGoalList.remove(CruiseControlGoals.CPU_USAGE_DISTRIBUTION_GOAL.toString()); + } + + return filteredGoalList; } private boolean isEnabledInConfiguration(String s1, String s2) { diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java index 56eb91757c5..b4996c6cc6a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java @@ -4,50 +4,128 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.strimzi.api.kafka.model.kafka.EphemeralStorage; +import io.strimzi.api.kafka.model.kafka.JbodStorage; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; +import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.operator.cluster.model.StorageUtils; +import io.strimzi.operator.cluster.model.VolumeUtils; import io.vertx.core.json.JsonObject; import java.util.HashMap; import java.util.Map; /** - * Cruise Control disk capacity + * Cruise Control disk capacity configuration for broker. */ public class DiskCapacity { + protected static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "DISK"; + + private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka"; + private static final String KAFKA_LOG_DIR = "kafka-log"; private static final String SINGLE_DISK = ""; - private Map map; + + private final Map config; /** * Constructor + * + * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * @param brokerId Id of the broker. */ - public DiskCapacity() { - map = new HashMap<>(1); + public DiskCapacity(Storage storage, int brokerId) { + if (storage instanceof JbodStorage) { + config = generateJbodDiskConfig(storage, brokerId); + } else { + config = generateNonJbodDiskConfig(storage); + } } - private DiskCapacity(String size) { - this(); - map.put(SINGLE_DISK, size); + /** + * Parse a K8S-style representation of a disk size, such as {@code 100Gi}, + * into the equivalent number of mebibytes represented as a String. + * + * @param size The String representation of the volume size. + * @return The equivalent number of mebibytes. + */ + private static String getSizeInMiB(String size) { + if (size == null) { + return ResourceCapacityType.DISK.getDefaultResourceCapacity(); + } + return String.valueOf(StorageUtils.convertTo(size, "Mi")); } - protected static DiskCapacity of(String size) { - return new DiskCapacity(size); + /** + * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * @param brokerId Id of the broker. + * + * @return Disk capacity configuration value for broker brokerId. + */ + private Map generateJbodDiskConfig(Storage storage, int brokerId) { + String size = ""; + Map diskConfig = new HashMap<>(); + for (SingleVolumeStorage volume : ((JbodStorage) storage).getVolumes()) { + String name = VolumeUtils.createVolumePrefix(volume.getId(), true); + String path = KAFKA_MOUNT_PATH + "/" + name + "/" + KAFKA_LOG_DIR + brokerId; + + if (volume instanceof PersistentClaimStorage) { + size = ((PersistentClaimStorage) volume).getSize(); + } else if (volume instanceof EphemeralStorage) { + size = ((EphemeralStorage) volume).getSizeLimit(); + } + diskConfig.put(path, String.valueOf(getSizeInMiB(size))); + } + return diskConfig; } - protected void add(String path, String size) { - if (path == null || SINGLE_DISK.equals(path)) { - throw new IllegalArgumentException("The disk path cannot be null or empty"); + /** + * Generate total disk capacity using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * + * @return Disk capacity configuration value for broker. + */ + private static Map generateNonJbodDiskConfig(Storage storage) { + String size; + if (storage instanceof PersistentClaimStorage) { + size = getSizeInMiB(((PersistentClaimStorage) storage).getSize()); + } else if (storage instanceof EphemeralStorage) { + if (((EphemeralStorage) storage).getSizeLimit() != null) { + size = getSizeInMiB(((EphemeralStorage) storage).getSizeLimit()); + } else { + size = DEFAULT_DISK_CAPACITY_IN_MIB; + } + } else if (storage == null) { + throw new IllegalStateException("The storage declaration is missing"); + } else { + throw new IllegalStateException("The declared storage '" + storage.getType() + "' is not supported"); } - map.put(path, size); + return Map.of(SINGLE_DISK, size); } protected Object getJson() { - if (map.size() == 1 && map.containsKey(SINGLE_DISK)) { - return map.get(SINGLE_DISK); + if (config.size() == 1 && config.containsKey(SINGLE_DISK)) { + return config.get(SINGLE_DISK); } else { JsonObject disks = new JsonObject(); - for (Map.Entry e : map.entrySet()) { + for (Map.Entry e : config.entrySet()) { disks.put(e.getKey(), e.getValue()); } return disks; } } + + @Override + public String toString() { + return this.getJson().toString(); + } } \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java new file mode 100644 index 00000000000..93c778ff535 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java @@ -0,0 +1,33 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + + +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.INBOUND_NETWORK; + +/** + * Cruise Control inbound network capacity configuration for broker. + */ +public class InboundNetworkCapacity extends NetworkCapacity { + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "NW_IN"; + + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + */ + public InboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) { + super(getThroughputInKiB(INBOUND_NETWORK.processResourceCapacity(brokerCapacity, brokerCapacityOverride, null))); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java new file mode 100644 index 00000000000..126562e689a --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java @@ -0,0 +1,36 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.strimzi.operator.cluster.model.StorageUtils; + +/** + * Cruise Control network capacity configuration for broker. + */ +public class NetworkCapacity { + protected static final String DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000KiB/s"; + protected String config; + + protected NetworkCapacity(String config) { + this.config = config; + } + + /** + * Parse Strimzi representation of throughput, such as {@code 10000KB/s}, + * into the equivalent number of kibibytes represented as a String. + * + * @param throughput The String representation of the throughput. + * @return The equivalent number of kibibytes. + */ + public static String getThroughputInKiB(String throughput) { + String size = throughput.substring(0, throughput.indexOf("B")); + return String.valueOf(StorageUtils.convertTo(size, "Ki")); + } + + @Override + public String toString() { + return config; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java new file mode 100644 index 00000000000..8b9d2ae5387 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java @@ -0,0 +1,33 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + + +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; + +/** + * Cruise Control outbound network capacity configuration for broker. + */ +public class OutboundNetworkCapacity extends NetworkCapacity { + /** + * Key used to identify resource in broker entry in Cruise Control capacity configuration. + */ + public static final String KEY = "NW_OUT"; + + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + */ + public OutboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) { + super(getThroughputInKiB(OUTBOUND_NETWORK.processResourceCapacity(brokerCapacity, brokerCapacityOverride, null))); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java new file mode 100644 index 00000000000..0a3123cd4af --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceCapacityType.java @@ -0,0 +1,169 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.operator.cluster.model.NodeRef; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import static io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity.DEFAULT_CPU_CORE_CAPACITY; +import static io.strimzi.operator.cluster.model.cruisecontrol.DiskCapacity.DEFAULT_DISK_CAPACITY_IN_MIB; +import static io.strimzi.operator.cluster.model.cruisecontrol.NetworkCapacity.DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND; + +/** + * Enum representing the various types of resource capacities used in Cruise Control configuration. + * Each enum constant maps to a specific resource type (e.g. CPU, Disk, Inbound Network, and Outbound Network), + * with methods for retrieving configured values from both default broker capacity and broker-specific overrides. + */ +public enum ResourceCapacityType { + /** + * Disk capacity configuration. + *

+ * Disk capacity configuration is handled automatically by Strimzi, there is no need for cluster or broker-specific overrides. + */ + DISK(null, null, DEFAULT_DISK_CAPACITY_IN_MIB), + + /** + * CPU capacity configuration. + */ + CPU(BrokerCapacity::getCpu, BrokerCapacityOverride::getCpu, DEFAULT_CPU_CORE_CAPACITY), + + /** + * Inbound network capacity configuration. + */ + INBOUND_NETWORK(BrokerCapacity::getInboundNetwork, BrokerCapacityOverride::getInboundNetwork, DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND), + + /** + * Outbound network capacity configuration. + */ + OUTBOUND_NETWORK(BrokerCapacity::getOutboundNetwork, BrokerCapacityOverride::getOutboundNetwork, DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND); + + private final Function generalResourceCapacityGetter; + private final Function overriddenResourceCapacityGetter; + private final String defaultResourceCapacity; + + ResourceCapacityType(Function generalResourceCapacityGetter, + Function overriddenResourceCapacityGetter, + String defaultResourceCapacity + ) { + this.generalResourceCapacityGetter = generalResourceCapacityGetter; + this.overriddenResourceCapacityGetter = overriddenResourceCapacityGetter; + this.defaultResourceCapacity = defaultResourceCapacity; + } + + private String getGeneralResourceCapacity(BrokerCapacity brokerCapacity) { + return generalResourceCapacityGetter.apply(brokerCapacity); + } + + private String getOverriddenResourceCapacity(BrokerCapacityOverride override) { + return overriddenResourceCapacityGetter.apply(override); + } + + /** + * @return The default capacity value for given resource. + */ + public String getDefaultResourceCapacity() { + return defaultResourceCapacity; + } + + /** + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + *

+ * The broker-specific capacity override takes top precedence, then general brokerCapacity configuration, + * and then the Kafka resource requests (only for CPU), then the Kafka resource limits (only for CPU), then resource + * default. + * + * For example: + *

    + *
  • (1) The brokerCapacityOverride for a specific broker. + *
  • (2) The general brokerCapacity configuration. + *
  • (3) Kafka resource requests (CPU specific) + *
  • (4) Kafka resource limits (CPU specific) + *
  • (5) The resource default. + *
+ * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * @param resourceRequirements The Kafka resource requests and limits (for all brokers). + * + * @return The capacity of resource represented as a String. + */ + public String processResourceCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride, + ResourceRequirements resourceRequirements) { + if (brokerCapacityOverride != null && getOverriddenResourceCapacity(brokerCapacityOverride) != null) { + return getOverriddenResourceCapacity(brokerCapacityOverride); + } + + if (brokerCapacity != null && getGeneralResourceCapacity(brokerCapacity) != null) { + return getGeneralResourceCapacity(brokerCapacity); + } + + if (this == CPU) { + String cpuBasedOnRequirements = ResourceRequirementsUtils.getCpuBasedOnRequirements(resourceRequirements); + if (cpuBasedOnRequirements != null) { + return cpuBasedOnRequirements; + } + } + + return getDefaultResourceCapacity(); + } + + /** + * Checks whether this resource type has its capacity properly configured for Cruise Control goals that are + * dependent on that resource usage. + *

+ * The configuration can come from: + *

    + *
  • The default capacity defined in the {@link BrokerCapacity} section of the Cruise Control spec
  • + *
  • Broker-specific overrides, where each override must define a value for this resource type
  • + *
  • For {@link #CPU}, it may also be configured via Kafka custom resource requests and limits. + *
+ * + * @param brokerCapacity The brokerCapacity section of the Cruise Control specification from the Kafka custom resource. + * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster. + * @param kafkaBrokerResources A map of broker IDs to their Kubernetes resource requirements. + * @return {@code true} if the capacity for this resource type is considered configured, otherwise {@code false.} + */ + public boolean isCapacityConfigured(BrokerCapacity brokerCapacity, + Set kafkaBrokerNodes, + Map kafkaBrokerResources) { + if (brokerCapacity != null) { + if (getGeneralResourceCapacity(brokerCapacity) != null) { + return true; + } + + List overrides = brokerCapacity.getOverrides(); + if (overrides != null && overrides.stream().allMatch(o -> getOverriddenResourceCapacity(o) != null)) { + return true; + } + } + + /* + * Although some resource requirements may be configured for some brokers, the CPU capacity for the cluster is + * only considered to be properly configured for CPU-dependent Cruise Control goals if: + * (1) The resource requirements are defined for every broker in the cluster. + * (2) The CPU requests == limits for every broker in the cluster. + * + * This ensures that each broker's CPU capacity has been explicitly defined as a guaranteed resource, + * enabling more accurate rebalancing decisions based on CPU availability. + */ + if (this == CPU + && kafkaBrokerNodes.stream().allMatch(node -> kafkaBrokerResources.containsKey(node.poolName())) + && ResourceRequirementsUtils.cpuRequestsMatchLimits(kafkaBrokerResources)) { + return true; + } + + return false; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java new file mode 100644 index 00000000000..08f9a0344e1 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ResourceRequirementsUtils.java @@ -0,0 +1,137 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; + +import java.util.Map; + +/** + * Utility methods for working with resource request and limit values from the resource requirements section + * of Strimzi custom resources. Used for evaluating whether Cruise Control capacity settings for CPU are properly + * configured. + */ +public class ResourceRequirementsUtils { + + /** + * Enum representing resource types used in the resource requirements section of Strimzi custom resources. + */ + public enum ResourceType { + /** + * Represents the memory resource type (e.g., "memory"). + */ + MEMORY("memory"), + + /** + * Represents the CPU resource type (e.g., "cpu"). + */ + CPU("cpu"); + + private final String value; + + ResourceType(String value) { + this.value = value; + } + + /** + * Returns the string value of the resource type. + * + * @return the resource type name. + */ + public String value() { + return value; + } + } + + /** + * Enum representing resource requirement types used in the resource requirements section Strimzi custom resources. + */ + private enum ResourceRequirementsType { + /** + * Represents the resource request value. + */ + REQUEST, + + /** + * Represents the resource limit value. + */ + LIMIT; + } + + private static Quantity getQuantity(ResourceRequirements resources, + ResourceRequirementsType requirementType, + ResourceRequirementsUtils.ResourceType resourceType) { + if (resources == null || resourceType == null) { + return null; + } + + Map resourceRequirement = switch (requirementType) { + case REQUEST -> resources.getRequests(); + case LIMIT -> resources.getLimits(); + }; + + return resourceRequirement != null ? resourceRequirement.get(resourceType.value()) : null; + } + + /** + * Retrieves the CPU resource request quantity from the given resource requirements. + * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the requested CPU quantity, or {@code null} if not specified + */ + private static Quantity getCpuRequest(ResourceRequirements resources) { + return getQuantity(resources, ResourceRequirementsType.REQUEST, ResourceType.CPU); + } + + /** + * Retrieves the CPU resource limit quantity from the given resource requirements. + * + * @param resources the resource requirements containing CPU and memory requests and limits + * @return the CPU limit quantity, or {@code null} if not specified + */ + private static Quantity getCpuLimit(ResourceRequirements resources) { + return getQuantity(resources, ResourceRequirementsType.LIMIT, ResourceType.CPU); + } + + /** + * Checks whether all Kafka broker pods have their CPU resource requests equal to their CPU limits. + * + * @param kafkaBrokerResources a map of broker pod names to their {@link ResourceRequirements} + * @return {@code true} if all brokers have matching CPU requests and limits; {@code false} otherwise + */ + protected static boolean cpuRequestsMatchLimits(Map kafkaBrokerResources) { + if (kafkaBrokerResources == null) { + return false; + } + for (ResourceRequirements resourceRequirements : kafkaBrokerResources.values()) { + Quantity request = getCpuRequest(resourceRequirements); + Quantity limit = getCpuLimit(resourceRequirements); + if (request == null || limit == null || request.compareTo(limit) != 0) { + return false; + } + } + return true; + } + + /** + * Derives the CPU capacity from the resource requirements section of Strimzi custom resource. + * + * @param resourceRequirements The Strimzi custom resource requirements containing CPU requests and/or limits. + * @return The CPU capacity as a {@link String}, or {@code null} if no CPU values are defined. + */ + protected static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { + Quantity request = getCpuRequest(resourceRequirements); + Quantity limit = getCpuLimit(resourceRequirements); + + if (request != null) { + return request.toString(); + } else if (limit != null) { + return limit.toString(); + } else { + return null; + } + } +} \ No newline at end of file diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java index 260fa716f2e..51536003b71 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java @@ -56,9 +56,13 @@ import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ResourceUtils; -import io.strimzi.operator.cluster.model.cruisecontrol.BrokerCapacity; -import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; +import io.strimzi.operator.cluster.model.cruisecontrol.CapacityConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.DiskCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.InboundNetworkCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.NetworkCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.OutboundNetworkCapacity; +import io.strimzi.operator.cluster.model.cruisecontrol.ResourceRequirementsUtils.ResourceType; import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel; import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.Reconciliation; @@ -87,6 +91,8 @@ import java.util.Set; import static io.strimzi.operator.cluster.model.CruiseControl.API_HEALTHCHECK_PATH; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU; +import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.OUTBOUND_NETWORK; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters.ANOMALY_DETECTION_CONFIG_KEY; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY; import static java.lang.String.format; @@ -171,17 +177,22 @@ public void testBrokerCapacity() { .endSpec() .build(); Map storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), new PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build()).build()); - Map resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build()); + Map resources = + Map.of("brokers", + new ResourceRequirementsBuilder() + .withRequests(Map.of(ResourceType.CPU.value(), new Quantity("400m"))) + .withLimits(Map.of(ResourceType.CPU.value(), new Quantity("0.5"))) + .build()); CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { - JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY); - Object diskCapacity = brokerCapacity.getValue(Capacity.DISK_KEY); - JsonObject cpuCapacity = brokerCapacity.getJsonObject(Capacity.CPU_KEY); + JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + Object diskCapacity = brokerCapacity.getValue(DiskCapacity.KEY); + JsonObject cpuCapacity = brokerCapacity.getJsonObject(CpuCapacity.KEY); assertThat(isJBOD(diskCapacity), is(true)); assertThat(cpuCapacity, is(expectedCpuCapacity)); @@ -226,34 +237,38 @@ public void testBrokerCapacityOverrides() { .endSpec() .build(); Map storage = Map.of("brokers", new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build()); - Map resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build()); + Map resources = + Map.of("brokers", new ResourceRequirementsBuilder() + .withRequests(Map.of(ResourceType.CPU.value(), new Quantity("400m"))) + .withLimits(Map.of(ResourceType.CPU.value(), new Quantity("0.5"))) + .build()); CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { - JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY); - Object diskCapacity = brokerCapacity.getValue(Capacity.DISK_KEY); + JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + Object diskCapacity = brokerCapacity.getValue(DiskCapacity.KEY); assertThat(isJBOD(diskCapacity), is(false)); } - JsonObject brokerEntry0 = brokerEntries.getJsonObject(broker0).getJsonObject(Capacity.CAPACITY_KEY); - assertThat(brokerEntry0.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); - assertThat(brokerEntry0.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); - assertThat(brokerEntry0.getString(Capacity.OUTBOUND_NETWORK_KEY), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); + JsonObject brokerEntry0 = brokerEntries.getJsonObject(broker0).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + assertThat(brokerEntry0.getJsonObject(CpuCapacity.KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); + assertThat(brokerEntry0.getString(InboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(inboundNetworkOverride0))); + assertThat(brokerEntry0.getString(OutboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(OUTBOUND_NETWORK.getDefaultResourceCapacity()))); // When the same broker id is specified in brokers list of multiple overrides, use the value specified in the first override. - JsonObject brokerEntry1 = brokerEntries.getJsonObject(broker1).getJsonObject(Capacity.CAPACITY_KEY); - assertThat(brokerEntry1.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); - assertThat(brokerEntry1.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); - assertThat(brokerEntry1.getString(Capacity.OUTBOUND_NETWORK_KEY), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); - - JsonObject brokerEntry2 = brokerEntries.getJsonObject(broker2).getJsonObject(Capacity.CAPACITY_KEY); - assertThat(brokerEntry2.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); - assertThat(brokerEntry2.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); + JsonObject brokerEntry1 = brokerEntries.getJsonObject(broker1).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + assertThat(brokerEntry1.getJsonObject(CpuCapacity.KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); + assertThat(brokerEntry1.getString(InboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(inboundNetworkOverride0))); + assertThat(brokerEntry1.getString(OutboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(OUTBOUND_NETWORK.getDefaultResourceCapacity()))); + + JsonObject brokerEntry2 = brokerEntries.getJsonObject(broker2).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + assertThat(brokerEntry2.getJsonObject(CpuCapacity.KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson())); + assertThat(brokerEntry2.getString(InboundNetworkCapacity.KEY), is(NetworkCapacity.getThroughputInKiB(inboundNetworkOverride0))); } @ParallelTest @@ -293,16 +308,20 @@ public void testBrokerCapacityGeneratedCpu() { .endSpec() .build(); Map storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), new PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build()).build()); - Map resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("500m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build()); + Map resources = Map.of("brokers", + new ResourceRequirementsBuilder() + .withRequests(Map.of(ResourceType.CPU.value(), new Quantity("500m"))) + .withLimits(Map.of(ResourceType.CPU.value(), new Quantity("0.5"))) + .build()); CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); JsonObject expectedCpuCapacity = new CpuCapacity(userDefinedCpuCapacityOverride0).getJson(); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { - JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY); - JsonObject cpuCapacity = brokerCapacity.getJsonObject(Capacity.CPU_KEY); + JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(CapacityConfiguration.CAPACITY_KEY); + JsonObject cpuCapacity = brokerCapacity.getJsonObject(CpuCapacity.KEY); assertThat(cpuCapacity, is(expectedCpuCapacity)); } } @@ -330,56 +349,56 @@ public void testBrokerCapacitiesWithPools() { CruiseControl cc = createCruiseControl(KAFKA, nodes, storage, resources); ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); assertThat(brokerEntries.size(), is(6)); // Broker 0 JsonObject brokerEntry = brokerEntries.getJsonObject(0); assertThat(brokerEntry.getInteger("brokerId"), is(0)); - JsonObject brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + JsonObject brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0")); - JsonObject brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + JsonObject brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log0"), is("102400.0")); // Broker 1 brokerEntry = brokerEntries.getJsonObject(1); assertThat(brokerEntry.getInteger("brokerId"), is(1)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log1"), is("102400.0")); // Broker 2 brokerEntry = brokerEntries.getJsonObject(2); assertThat(brokerEntry.getInteger("brokerId"), is(2)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log2"), is("102400.0")); // Broker 10 brokerEntry = brokerEntries.getJsonObject(3); assertThat(brokerEntry.getInteger("brokerId"), is(10)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log10"), is("1048576.0")); // Broker 11 brokerEntry = brokerEntries.getJsonObject(4); assertThat(brokerEntry.getInteger("brokerId"), is(11)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log11"), is("1048576.0")); // Broker 12 brokerEntry = brokerEntries.getJsonObject(5); assertThat(brokerEntry.getInteger("brokerId"), is(12)); - brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY); + brokerCpuCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY); assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0")); - brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY); + brokerDiskCapacity = brokerEntry.getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(DiskCapacity.KEY); assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log12"), is("1048576.0")); } @@ -791,7 +810,7 @@ public void testCpuCapacityGeneration() { the CPU capacity will be set to DEFAULT_CPU_CORE_CAPACITY */ resources = Map.of("brokers", new ResourceRequirementsBuilder().build()); - verifyBrokerCapacity(storage, resources, brokerCapacityThree, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY); + verifyBrokerCapacity(storage, resources, brokerCapacityThree, CPU.getDefaultResourceCapacity(), CPU.getDefaultResourceCapacity(), CPU.getDefaultResourceCapacity()); } private void verifyBrokerCapacity(Map storage, @@ -812,11 +831,11 @@ private void verifyBrokerCapacity(Map storage, ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null)); JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME)); - JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY); + JsonArray brokerEntries = capacity.getJsonArray(CapacityConfiguration.CAPACITIES_KEY); - assertThat(brokerEntries.getJsonObject(0).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerOneCpuValue))); - assertThat(brokerEntries.getJsonObject(1).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerTwoCpuValue))); - assertThat(brokerEntries.getJsonObject(2).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerThreeCpuValue))); + assertThat(brokerEntries.getJsonObject(0).getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY).getString("num.cores"), is(Matchers.equalTo(brokerOneCpuValue))); + assertThat(brokerEntries.getJsonObject(1).getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY).getString("num.cores"), is(Matchers.equalTo(brokerTwoCpuValue))); + assertThat(brokerEntries.getJsonObject(2).getJsonObject(CapacityConfiguration.CAPACITY_KEY).getJsonObject(CpuCapacity.KEY).getString("num.cores"), is(Matchers.equalTo(brokerThreeCpuValue))); } @ParallelTest diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java new file mode 100644 index 00000000000..027cbfd07fa --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CruiseControlConfigurationTest.java @@ -0,0 +1,219 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import io.strimzi.api.kafka.model.kafka.JbodStorageBuilder; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityBuilder; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverrideBuilder; +import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.cluster.model.cruisecontrol.ResourceRequirementsUtils.ResourceType; +import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.filterResourceGoalsWithoutCapacityConfig; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.CPU_CAPACITY_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.CPU_USAGE_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_INBOUND_CAPACITY_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_OUTBOUND_CAPACITY_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL; +import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals.POTENTIAL_NETWORK_OUTAGE_GOAL; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +public class CruiseControlConfigurationTest { + + private static final String DEFAULT_CPU_CAPACITY = "1000m"; + private static final String DEFAULT_NETWORK_CAPACITY = "10000KiB/s"; + private static final Set NODES = Set.of( + new NodeRef("my-cluster-brokers-0", 0, "brokers", false, true), + new NodeRef("my-cluster-brokers-1", 1, "brokers", false, true), + new NodeRef("my-cluster-brokers-2", 2, "brokers", false, true) + ); + private static final Map STORAGE = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()).build()); + private static final List GOAL_LIST = List.of( + NETWORK_INBOUND_CAPACITY_GOAL.toString(), + NETWORK_OUTBOUND_CAPACITY_GOAL.toString(), + CPU_CAPACITY_GOAL.toString(), + POTENTIAL_NETWORK_OUTAGE_GOAL.toString(), + NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL.toString(), + NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL.toString(), + CPU_USAGE_DISTRIBUTION_GOAL.toString(), + LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString()); + + private static KafkaSpec createKafkaSpec(BrokerCapacity brokerCapacity) { + return new KafkaBuilder() + .withNewSpec() + .withNewCruiseControl() + .withBrokerCapacity(brokerCapacity) + .endCruiseControl() + .endSpec() + .build() + .getSpec(); + } + + private static BrokerCapacity createBrokerCapacity(String cpuCapacity, + String inboundNetworkCapacity, + String outboundNetworkCapacity) { + return new BrokerCapacityBuilder() + .withCpu(cpuCapacity) + .withInboundNetwork(inboundNetworkCapacity) + .withOutboundNetwork(outboundNetworkCapacity) + .build(); + } + + private static BrokerCapacityOverride createBrokerCapacityOverride(List brokerIds, + String cpuCapacity, + String inboundNetworkCapacity, + String outboundNetworkCapacity) { + return new BrokerCapacityOverrideBuilder() + .addAllToBrokers(brokerIds) + .withCpu(cpuCapacity) + .withInboundNetwork(inboundNetworkCapacity) + .withOutboundNetwork(outboundNetworkCapacity) + .build(); + } + + private static ResourceRequirements createCpuResourceRequirement(String request, String limit) { + return new ResourceRequirementsBuilder() + .addToRequests(ResourceType.CPU.value(), new Quantity(request)) + .addToLimits(ResourceType.CPU.value(), new Quantity(limit)) + .build(); + } + + private void assertGoalsPresent(List actual, CruiseControlGoals... expected) { + for (CruiseControlGoals goal : expected) { + assertThat("Expected goal to be present: " + goal, actual.contains(goal.toString()), is(true)); + } + } + + private void assertGoalsAbsent(List actual, CruiseControlGoals... expected) { + for (CruiseControlGoals goal : expected) { + assertThat("Expected goal to be absent: " + goal, actual.contains(goal.toString()), is(false)); + } + } + + @Test + public void shouldNotFilterGoalsWhenAllCapacitiesAreConfigured() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", new ResourceRequirementsBuilder().build()); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertThat(filteredGoals, containsInAnyOrder(GOAL_LIST.toArray())); + } + + @Test + public void shouldFilterCpuGoalsWhenDefaultCapacityIsMissing() { + BrokerCapacity bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", new ResourceRequirementsBuilder().build()); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } + + @Test + public void shouldNotFilterCpuGoalsWhenRequestsEqualLimits() { + BrokerCapacity bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsPresent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterCpuGoalsWhenRequestsDoNotEqualLimits() { + BrokerCapacity bc = createBrokerCapacity(null, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, "2000m")); + CapacityConfiguration capacityConfiguration = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, capacityConfiguration); + assertGoalsAbsent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } + + @Test + public void shouldNotFilterCpuGoalsWhenOverridesAreDefined() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, null, null); + bc.setOverrides(List.of(bco)); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, "2000m")); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsPresent(filteredGoals, CPU_CAPACITY_GOAL, CPU_USAGE_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterInboundNetworkGoalsWhenDefaultCapacityIsMissing() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, null, DEFAULT_NETWORK_CAPACITY); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsAbsent(filteredGoals, NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_INBOUND_CAPACITY_GOAL, LEADER_BYTES_IN_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterLeaderBytesInGoalWhenOverridesAreHeterogeneous() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco0 = createBrokerCapacityOverride(List.of(0), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco1 = createBrokerCapacityOverride(List.of(1), DEFAULT_CPU_CAPACITY, "5000KiB/s", DEFAULT_NETWORK_CAPACITY); + bc.setOverrides(List.of(bco0, bco1)); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsAbsent(filteredGoals, LEADER_BYTES_IN_DISTRIBUTION_GOAL); + } + + @Test + public void shouldNotFilterInboundNetworkGoalsWhenOverridesAreHomogeneous() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + bc.setOverrides(List.of(bco)); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsPresent(filteredGoals, LEADER_BYTES_IN_DISTRIBUTION_GOAL); + } + + @Test + public void shouldFilterOutboundNetworkGoalsWhenDefaultCapacityIsMissing() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, null); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsAbsent(filteredGoals, NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + } + + @Test + public void shouldNotFilterOutboundNetworkGoalsWhenOverridesAreDefined() { + BrokerCapacity bc = createBrokerCapacity(DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + BrokerCapacityOverride bco = createBrokerCapacityOverride(List.of(0, 1, 2), DEFAULT_CPU_CAPACITY, DEFAULT_NETWORK_CAPACITY, DEFAULT_NETWORK_CAPACITY); + bc.setOverrides(List.of(bco)); + Map kafkaBrokerResources = Map.of("brokers", createCpuResourceRequirement(DEFAULT_CPU_CAPACITY, DEFAULT_CPU_CAPACITY)); + CapacityConfiguration cc = new CapacityConfiguration(null, createKafkaSpec(bc), NODES, STORAGE, kafkaBrokerResources); + + List filteredGoals = filterResourceGoalsWithoutCapacityConfig(GOAL_LIST, cc); + assertGoalsPresent(filteredGoals, NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL, NETWORK_OUTBOUND_CAPACITY_GOAL, POTENTIAL_NETWORK_OUTAGE_GOAL); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java index ed808f0ca24..954989b6f55 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CruiseControlReconcilerTest.java @@ -244,8 +244,8 @@ public void reconcileEnabledCruiseControl(boolean topicOperatorEnabled, boolean assertThat(deployCaptor.getValue(), is(notNullValue())); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION), is("0")); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION), is("0")); - assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is("67b9cda0")); - assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("3a5e63e7")); + assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH), is("8943477b")); + assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("e8a2dfec")); assertThat(deployCaptor.getValue().getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_SERVER_CERT_HASH), is("4d715cdd")); if (topicOperatorEnabled && apiUsersEnabled) { assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_AUTH_HASH), is("8c2972b2"));