diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index 060fb22ccc2..b43f1a24c2e 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -216,12 +216,14 @@ fn attempt_place_unassigned_shards(
 ) -> Result<SchedulingSolution, NotEnoughCapacity> {
     let mut solution = partial_solution.clone();
     for source in unassigned_shards {
-        let indexers_with_most_available_capacity =
-            compute_indexer_available_capacity(problem, &solution)
-                .sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
+        let mut indexers_with_most_available_capacity =
+            compute_indexer_available_capacity(problem, &solution).collect_vec();
+        indexers_with_most_available_capacity
+            .sort_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
         place_unassigned_shards_single_source(
             source,
             indexers_with_most_available_capacity,
+            problem.unscaled_indexer_cpu_capacities(),
             &mut solution,
         )?;
     }
@@ -241,7 +243,7 @@ fn place_unassigned_shards_with_affinity(
     for source in &unassigned_shards {
         // List of indexer with a non-null affinity and some available capacity, sorted by
         // (affinity, available capacity) in that order.
-        let indexers_with_affinity_and_available_capacity = source
+        let indexers_with_available_capacity = source
             .affinities
             .iter()
             .filter(|&(_, &affinity)| affinity != 0u32)
@@ -254,10 +256,12 @@
             .sorted_by_key(|(indexer_ord, affinity, capacity)| {
                 Reverse((*affinity, *capacity, *indexer_ord))
             })
-            .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity));
+            .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity))
+            .collect_vec();
         let _ = place_unassigned_shards_single_source(
             source,
-            indexers_with_affinity_and_available_capacity,
+            indexers_with_available_capacity,
+            problem.unscaled_indexer_cpu_capacities(),
             solution,
         );
     }
@@ -346,16 +350,62 @@ struct NotEnoughCapacity;
 /// amongst the node with their given node capacity.
 fn place_unassigned_shards_single_source(
     source: &Source,
-    mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
+    mut indexer_with_capacities: Vec<(IndexerOrd, CpuCapacity)>,
+    unscaled_capacities: &[CpuCapacity],
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
-    while num_shards > 0 {
-        let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
-            return Err(NotEnoughCapacity);
-        };
-        let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
-        let num_shards_to_place = num_placable_shards.min(num_shards);
+    let mut previous_num_shards = u32::MAX;
+    while previous_num_shards > num_shards {
+        previous_num_shards = num_shards;
+        let indexer_with_capacities_iter = indexer_with_capacities
+            .iter_mut()
+            .map(|(indexer_ord, available_capacity)| (*indexer_ord, available_capacity));
+        place_unassigned_shards_single_source_iteration(
+            source,
+            &mut num_shards,
+            indexer_with_capacities_iter,
+            unscaled_capacities,
+            solution,
+        );
+        if num_shards == 0 {
+            // All shards have been placed.
+            return Ok(());
+        }
+    }
+    // The last placement iteration didn't make progress:
+    // we won't be able to place the remaining shards.
+    Err(NotEnoughCapacity)
+}
+
+/// Places as many shards as possible onto indexers while respecting both the
+/// remaining scaled node capacities and the original unscaled node capacities.
+fn place_unassigned_shards_single_source_iteration<'a>(
+    source: &Source,
+    remaining_shards_to_place: &mut u32,
+    indexer_with_capacities: impl Iterator<Item = (IndexerOrd, &'a mut CpuCapacity)>,
+    unscaled_capacities: &[CpuCapacity],
+    solution: &mut SchedulingSolution,
+) {
+    for (indexer_ord, available_capacity) in indexer_with_capacities {
+        if *remaining_shards_to_place == 0 {
+            return;
+        }
+        let num_placable_shards_into_scaled_capacity =
+            available_capacity.cpu_millis() / source.load_per_shard;
+
+        // We limit each node's shard allocation per iteration to what fits in
+        // its original capacity. This distributes shards more evenly across
+        // nodes when the system capacity is over-subscribed. If a single
+        // shard's load doesn't fit into the original capacity, we still allow
+        // one shard to be placed.
+        let num_placable_shards_into_original_capacity =
+            (unscaled_capacities[indexer_ord].cpu_millis() / source.load_per_shard).max(1);
+
+        let num_shards_to_place = num_placable_shards_into_scaled_capacity
+            .min(num_placable_shards_into_original_capacity)
+            .min(*remaining_shards_to_place);
+
         // Update the solution, the shard load, and the number of shards to place.
         if num_shards_to_place == 0u32 {
             // No need to fill indexer_assignments with empty assignments.
@@ -363,9 +413,10 @@ fn place_unassigned_shards_single_source(
         }
         solution.indexer_assignments[indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
-        num_shards -= num_shards_to_place;
+        *remaining_shards_to_place -= num_shards_to_place;
+        *available_capacity = *available_capacity
+            - CpuCapacity::from_cpu_millis(num_shards_to_place * source.load_per_shard.get());
     }
-    Ok(())
 }
 
 /// Compute the sources/shards that have not been assigned to any indexer yet.
@@ -419,7 +470,7 @@ mod tests {
     use std::num::NonZeroU32;
 
     use proptest::prelude::*;
-    use quickwit_proto::indexing::mcpu;
+    use quickwit_proto::indexing::{PIPELINE_FULL_CAPACITY, mcpu};
 
     use super::*;
 
@@ -783,4 +834,28 @@ mod tests {
             solve(problem, solution);
         }
     }
+
+    #[test]
+    fn test_oversubscribing_sources_get_balanced() {
+        let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
+            mcpu(8000),
+            mcpu(8000),
+            mcpu(8000),
+            mcpu(8000),
+        ]);
+        for _ in 0..12 {
+            problem.add_source(
+                4,
+                NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
+            );
+        }
+
+        let old_solution = problem.new_solution();
+        let solution = solve(problem, old_solution);
+        for assignment in &solution.indexer_assignments {
+            for &num_shards in assignment.num_shards_per_source.values() {
+                assert_eq!(num_shards, 2);
+            }
+        }
+    }
 }
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
index 7401ac7c0aa..4689b151ae8 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
@@ -78,6 +78,7 @@ impl Source {
 pub struct SchedulingProblem {
     sources: Vec<Source>,
     indexer_cpu_capacities: Vec<CpuCapacity>,
+    unscaled_cpu_capacities: Vec<CpuCapacity>,
 }
 
 impl SchedulingProblem {
@@ -97,6 +98,7 @@
         // TODO assert for affinity.
         SchedulingProblem {
             sources: Vec::new(),
+            unscaled_cpu_capacities: indexer_cpu_capacities.clone(),
            indexer_cpu_capacities,
         }
     }
@@ -109,6 +111,11 @@
         self.indexer_cpu_capacities[indexer_ord]
     }
 
+    /// Gets the original cpu capacities before scaling.
+    pub fn unscaled_indexer_cpu_capacities(&self) -> &[CpuCapacity] {
+        &self.unscaled_cpu_capacities
+    }
+
     /// Scales the cpu capacity by the given scaling factor.
     ///
     /// Resulting cpu capacity are ceiled to the next integer millicpus value.
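
The heart of the patch is the per-round cap inside place_unassigned_shards_single_source_iteration: each pass over the indexers gives a node at most as many shards as its original, unscaled capacity can hold, and the outer loop keeps making passes until a full round places nothing. The following is a minimal, self-contained sketch of that loop, using plain millicpu integers in place of CpuCapacity and SchedulingSolution; the function name place_source and the concrete numbers are illustrative assumptions, not values taken from the patch.

/// Standalone model of the iterative placement: capacities and loads are plain
/// millicpu values, `load_per_shard` is assumed non-zero, and the return value
/// is simply the number of shards placed per indexer.
fn place_source(
    num_shards: u32,
    load_per_shard: u32,
    scaled_capacities: &mut [u32], // remaining (possibly scaled-up) capacity per indexer
    unscaled_capacities: &[u32],   // original capacity per indexer, never mutated
) -> Result<Vec<u32>, &'static str> {
    let mut placed = vec![0u32; scaled_capacities.len()];
    let mut remaining = num_shards;
    let mut previous = u32::MAX;
    // Keep looping over all indexers as long as a round makes progress.
    while previous > remaining {
        previous = remaining;
        for (indexer_ord, available) in scaled_capacities.iter_mut().enumerate() {
            if remaining == 0 {
                return Ok(placed);
            }
            // What fits into the remaining scaled capacity...
            let fits_scaled = *available / load_per_shard;
            // ...capped per round by what the *original* capacity would hold,
            // but always at least one shard so oversized shards can still land.
            let fits_original = (unscaled_capacities[indexer_ord] / load_per_shard).max(1);
            let to_place = fits_scaled.min(fits_original).min(remaining);
            placed[indexer_ord] += to_place;
            remaining -= to_place;
            *available -= to_place * load_per_shard;
        }
    }
    Err("not enough capacity")
}

fn main() {
    // Four indexers of 8000 mcpu whose working capacities were (hypothetically)
    // inflated to 48000 mcpu because demand exceeds supply, and one source with
    // four 4000-mcpu shards. Without the per-round cap the first indexer would
    // absorb all four shards; with it, the placement comes out as [2, 2, 0, 0].
    let mut scaled = vec![48_000u32; 4];
    let unscaled = vec![8_000u32; 4];
    let placement = place_source(4, 4_000, &mut scaled, &unscaled);
    assert_eq!(placement, Ok(vec![2, 2, 0, 0]));
    println!("{placement:?}");
}

Since attempt_place_unassigned_shards re-sorts indexers by remaining capacity before placing each source, consecutive sources start on different nodes; combined with the cap, this is what allows test_oversubscribing_sources_get_balanced to expect exactly two shards for every recorded (indexer, source) assignment.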
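On the model side, the change is just a snapshot: SchedulingProblem now clones the capacities it was constructed with, so later capacity scaling cannot erase the original values that the placement cap relies on. Below is a stripped-down illustration of that pattern; the Capacities struct and its scale_capacities method are hypothetical stand-ins, since the diff only shows the doc comment of the real scaling function, not its signature.

/// Keep the capacities passed at construction alongside the working copy.
struct Capacities {
    scaled_mcpu: Vec<u32>,
    unscaled_mcpu: Vec<u32>,
}

impl Capacities {
    fn new(mcpu: Vec<u32>) -> Self {
        Capacities {
            // Snapshot taken before any scaling can touch the values.
            unscaled_mcpu: mcpu.clone(),
            scaled_mcpu: mcpu,
        }
    }

    /// Inflate the working capacities (e.g. when demand exceeds supply),
    /// ceiling to whole millicpus; the snapshot stays untouched.
    fn scale_capacities(&mut self, factor: f32) {
        for capacity in &mut self.scaled_mcpu {
            *capacity = (*capacity as f32 * factor).ceil() as u32;
        }
    }

    fn unscaled(&self) -> &[u32] {
        &self.unscaled_mcpu
    }
}

fn main() {
    let mut capacities = Capacities::new(vec![8_000; 4]);
    capacities.scale_capacities(6.0);
    assert_eq!(capacities.scaled_mcpu, vec![48_000; 4]);
    assert_eq!(capacities.unscaled(), &[8_000; 4]);
}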