From fa8d5d95570bb144fc6e6862221dee733dea38b0 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Thu, 10 Jul 2025 11:08:14 +0200
Subject: [PATCH 1/6] Add test showing the fragmentation

---
 .../scheduling/scheduling_logic.rs | 31 +++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index a8a5c4f64c0..6de4c9c0e06 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -807,4 +807,35 @@ mod tests {
 
         assert_eq!(solution.capacity_scaling_iterations, 1);
     }
+
+    #[test]
+    fn test_shard_fragmentation_when_iterating() {
+        // Create a problem where affinity constraints cause suboptimal placement
+        // requiring iterative scaling despite initial capacity scaling.
+        let mut problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]);
+        problem.add_source(1, NonZeroU32::new(1000).unwrap());
+        problem.add_source(1, NonZeroU32::new(1000).unwrap());
+        problem.add_source(1, NonZeroU32::new(1000).unwrap());
+        let empty_solution = problem.new_solution();
+        let first_solution = solve(problem, empty_solution);
+
+        let mut updated_problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]);
+        updated_problem.add_source(2, NonZeroU32::new(1000).unwrap());
+        updated_problem.add_source(2, NonZeroU32::new(1000).unwrap());
+        updated_problem.add_source(2, NonZeroU32::new(1000).unwrap());
+
+        let second_solution = solve(updated_problem, first_solution);
+
+        for source in 0..2 {
+            let num_shards_per_indexer = second_solution
+                .indexer_assignments
+                .iter()
+                .map(|indexer_assignment| indexer_assignment.num_shards(source))
+                .collect_vec();
+            assert!(num_shards_per_indexer.contains(&2));
+            assert!(num_shards_per_indexer.contains(&0));
+        }
+    }
 }

From 1c4c73edaa807e3b99d1d8a1ebcac333f75496f6 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Thu, 10 Jul 2025 11:08:14 +0200
Subject: [PATCH 2/6] Add placement affinity on the number of existing shards

---
 .../scheduling/scheduling_logic.rs | 128 +++++++++++-------
 1 file changed, 76 insertions(+), 52 deletions(-)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index 6de4c9c0e06..c043f44a5ad 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -12,11 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::cmp::Reverse;
+use std::cmp::{Ordering, Reverse};
 use std::collections::BTreeMap;
 use std::collections::btree_map::Entry;
 
-use itertools::Itertools;
 use quickwit_proto::indexing::CpuCapacity;
 
 use super::scheduling_logic_model::*;
@@ -229,6 +228,36 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
 // If this algorithm fails to place all remaining shards, we inflate
 // the node capacities by 20% in the scheduling problem and start from the beginning.
 
+#[derive(Debug, PartialEq, Eq, Ord)]
+struct PlacementCandidate {
+    indexer_ord: IndexerOrd,
+    current_num_shards: u32,
+    available_capacity: CpuCapacity,
+    affinity: u32,
+}
+
+impl PartialOrd for PlacementCandidate {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Higher affinity is better
+        match self.affinity.cmp(&other.affinity) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // If tie, pick the node with shards already assigned first
+        match self.current_num_shards.cmp(&other.current_num_shards) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // If tie, pick the node with the highest available capacity
+        match self.available_capacity.cmp(&other.available_capacity) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // Final tie-breaker: indexer ID for deterministic ordering
+        Some(self.indexer_ord.cmp(&other.indexer_ord).reverse())
+    }
+}
+
 fn attempt_place_unassigned_shards(
     unassigned_shards: &[Source],
     problem: &SchedulingProblem,
@@ -236,14 +265,24 @@ fn attempt_place_unassigned_shards(
 ) -> Result<SchedulingSolution, NotEnoughCapacity> {
     let mut solution = partial_solution.clone();
     for source in unassigned_shards {
-        let indexers_with_most_available_capacity =
-            compute_indexer_available_capacity(problem, &solution)
-                .sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
-        place_unassigned_shards_single_source(
-            source,
-            indexers_with_most_available_capacity,
-            &mut solution,
-        )?;
+        let mut placements: Vec<PlacementCandidate> = solution
+            .indexer_assignments
+            .iter()
+            .map(|indexer_assignment: &IndexerAssignment| {
+                let available_capacity = indexer_assignment.indexer_available_capacity(problem);
+                assert!(available_capacity >= 0i32);
+                let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
+                let current_num_shards = indexer_assignment.num_shards(source.source_ord);
+                PlacementCandidate {
+                    affinity: 0,
+                    current_num_shards,
+                    available_capacity,
+                    indexer_ord: indexer_assignment.indexer_ord,
+                }
+            })
+            .collect();
+        placements.sort();
+        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
@@ -259,27 +298,26 @@ fn place_unassigned_shards_with_affinity(
         Reverse(load)
     });
     for source in &unassigned_shards {
-        // List of indexer with a non-null affinity and some available capacity, sorted by
-        // (affinity, available capacity) in that order.
-        let indexers_with_affinity_and_available_capacity = source
+        let mut placements: Vec<PlacementCandidate> = source
             .affinities
             .iter()
             .filter(|&(_, &affinity)| affinity != 0u32)
-            .map(|(&indexer_ord, affinity)| {
+            .map(|(&indexer_ord, &affinity)| {
                 let available_capacity =
                     solution.indexer_assignments[indexer_ord].indexer_available_capacity(problem);
-                let capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
-                (indexer_ord, affinity, capacity)
-            })
-            .sorted_by_key(|(indexer_ord, affinity, capacity)| {
-                Reverse((*affinity, *capacity, *indexer_ord))
+                let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
+                let current_num_shards =
+                    solution.indexer_assignments[indexer_ord].num_shards(source.source_ord);
+                PlacementCandidate {
+                    affinity,
+                    current_num_shards,
+                    available_capacity,
+                    indexer_ord,
+                }
             })
-            .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity));
-        let _ = place_unassigned_shards_single_source(
-            source,
-            indexers_with_affinity_and_available_capacity,
-            solution,
-        );
+            .collect();
+        placements.sort();
+        let _ = place_unassigned_shards_single_source(source, &placements, solution);
     }
 }
 
@@ -350,22 +388,27 @@ struct NotEnoughCapacity;
 /// amongst the node with their given node capacity.
 fn place_unassigned_shards_single_source(
     source: &Source,
-    mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
+    sorted_candidates: &[PlacementCandidate],
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
-    while num_shards > 0 {
-        let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
-            return Err(NotEnoughCapacity);
-        };
+    for PlacementCandidate {
+        indexer_ord,
+        available_capacity,
+        ..
+    } in sorted_candidates
+    {
         let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
         let num_shards_to_place = num_placable_shards.min(num_shards);
         // Update the solution, the shard load, and the number of shards to place.
-        solution.indexer_assignments[indexer_ord]
+        solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
         num_shards -= num_shards_to_place;
+        if num_shards == 0 {
+            return Ok(());
+        }
     }
-    Ok(())
+    Err(NotEnoughCapacity)
 }
 
 /// Compute the sources/shards that have not been assigned to any indexer yet.
@@ -394,30 +437,11 @@ fn compute_unassigned_sources(
     unassigned_sources.into_values().collect()
 }
 
-/// Builds a BinaryHeap with the different indexer capacities.
-///
-/// Panics if one of the indexer is over-assigned.
-fn compute_indexer_available_capacity<'a>(
-    problem: &'a SchedulingProblem,
-    solution: &'a SchedulingSolution,
-) -> impl Iterator<Item = (IndexerOrd, CpuCapacity)> + 'a {
-    solution
-        .indexer_assignments
-        .iter()
-        .map(|indexer_assignment| {
-            let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem);
-            assert!(available_capacity >= 0i32);
-            (
-                indexer_assignment.indexer_ord,
-                CpuCapacity::from_cpu_millis(available_capacity as u32),
-            )
-        })
-}
-
 #[cfg(test)]
 mod tests {
     use std::num::NonZeroU32;
 
+    use itertools::Itertools;
     use proptest::prelude::*;
     use quickwit_proto::indexing::mcpu;
 

From 4a48107a92134fd07a6d1574971507a771a6bcab Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Thu, 10 Jul 2025 11:08:14 +0200
Subject: [PATCH 3/6] Fix ordering implem

---
 .../scheduling/scheduling_logic.rs | 20 ++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index c043f44a5ad..7d742650f8a 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -228,7 +228,7 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
 // If this algorithm fails to place all remaining shards, we inflate
 // the node capacities by 20% in the scheduling problem and start from the beginning.
 
-#[derive(Debug, PartialEq, Eq, Ord)]
+#[derive(Debug, PartialEq, Eq)]
 struct PlacementCandidate {
     indexer_ord: IndexerOrd,
     current_num_shards: u32,
@@ -236,25 +236,31 @@ struct PlacementCandidate {
     affinity: u32,
 }
 
-impl PartialOrd for PlacementCandidate {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+impl Ord for PlacementCandidate {
+    fn cmp(&self, other: &Self) -> Ordering {
         // Higher affinity is better
         match self.affinity.cmp(&other.affinity) {
             Ordering::Equal => {}
-            ordering => return Some(ordering.reverse()),
+            ordering => return ordering.reverse(),
         }
         // If tie, pick the node with shards already assigned first
         match self.current_num_shards.cmp(&other.current_num_shards) {
             Ordering::Equal => {}
-            ordering => return Some(ordering.reverse()),
+            ordering => return ordering.reverse(),
         }
         // If tie, pick the node with the highest available capacity
         match self.available_capacity.cmp(&other.available_capacity) {
             Ordering::Equal => {}
-            ordering => return Some(ordering.reverse()),
+            ordering => return ordering.reverse(),
         }
         // Final tie-breaker: indexer ID for deterministic ordering
-        Some(self.indexer_ord.cmp(&other.indexer_ord).reverse())
+        self.indexer_ord.cmp(&other.indexer_ord).reverse()
+    }
+}
+
+impl PartialOrd for PlacementCandidate {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
     }
 }

From 9c9218573822bdec6c7d70c4057137cff3a97197 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Thu, 10 Jul 2025 11:08:15 +0200
Subject: [PATCH 4/6] Limit to 3 pipelines per node per source (#5792)

* Add limit of three pipelines per node

* Small code simplification
---
 .../src/indexing_scheduler/mod.rs             |  13 +-
 .../src/indexing_scheduler/scheduling/mod.rs  | 142 +++++++++++++++++-
 .../scheduling/scheduling_logic.rs            |  53 ++++++-
 3 files changed, 200 insertions(+), 8 deletions(-)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 5d51dcac13f..81191574118 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -49,6 +49,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
         Duration::from_secs(30)
     };
 
+/// That's 80% of a pipeline capacity
+const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
+
 #[derive(Debug, Clone, Default, Serialize)]
 pub struct IndexingSchedulerState {
     pub num_applied_physical_indexing_plan: usize,
@@ -152,7 +155,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
         const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
         NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
     } else {
-        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+        NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / 4).unwrap()
    }
 }
 
@@ -220,8 +223,12 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
                 source_uid,
                 source_type: SourceToScheduleType::NonSharded {
                     num_pipelines: source_config.num_pipelines.get() as u32,
-                    // FIXME
-                    load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
+                    // FIXME:
+                    // - implementing adaptive load carries the risk of generating
+                    //   rebalancing storms for sources like Kafka
+                    // - this is coupled with the scheduling logic that misses the notion of
+                    //   pipeline
+                    load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
                         .unwrap(),
                 },
                 params_fingerprint,
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index ca5ac4d1fc7..eb7875097e0 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -757,8 +757,8 @@ mod tests {
         convert_scheduling_solution_to_physical_plan_single_node_single_source,
     };
     use crate::indexing_plan::PhysicalIndexingPlan;
-    use crate::indexing_scheduler::get_shard_locality_metrics;
     use crate::indexing_scheduler::scheduling::assign_shards;
+    use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
     use crate::model::ShardLocations;
 
     fn source_id() -> SourceUid {
@@ -939,6 +939,146 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_build_physical_plan_with_pipeline_limit() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let source_uid0 = source_id();
+        let source_uid1 = source_id();
+        let source_0 = SourceToSchedule {
+            source_uid: source_uid0.clone(),
+            source_type: SourceToScheduleType::Sharded {
+                shard_ids: (0..16).map(ShardId::from).collect(),
+                load_per_shard: NonZeroU32::new(800).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let source_1 = SourceToSchedule {
+            source_uid: source_uid1.clone(),
+            source_type: SourceToScheduleType::NonSharded {
+                num_pipelines: 4,
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &[source_0, source_1],
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);
+
+        let node1_plan = indexing_plan.indexer(&indexer1).unwrap();
+        let node2_plan = indexing_plan.indexer(&indexer2).unwrap();
+
+        let source_0_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        let source_0_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        assert!(source_0_on_node1 <= 3);
+        assert!(source_0_on_node2 <= 3);
+        assert_eq!(source_0_on_node1 + source_0_on_node2, 4);
+
+        let source_1_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        let source_1_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        assert!(source_1_on_node1 <= 3);
+        assert!(source_1_on_node2 <= 3);
+        assert_eq!(source_1_on_node1 + source_1_on_node2, 4);
+    }
+
+    #[test]
+    fn test_build_physical_plan_second_iteration() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let indexer3 = "indexer3".to_string();
+        let mut sources = Vec::new();
+        for _ in 0..10 {
+            sources.push(SourceToSchedule {
+                source_uid: source_id(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 4,
+                    load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+                },
+                params_fingerprint: 0,
+            });
+        }
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3);
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&1));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 4);
+        }
+
+        for source in &mut sources {
+            if let SourceToScheduleType::NonSharded { num_pipelines, .. } = &mut source.source_type
+            {
+                *num_pipelines = 5;
+            }
+        }
+
+        let new_indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            Some(&indexing_plan),
+            &shard_locations,
+        );
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = new_indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&2));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 5);
+        }
+    }
+
     fn make_indexing_tasks(
         source_uid: &SourceUid,
         shards: &[(PipelineUid, &[ShardId])],
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index 7d742650f8a..86b1db26f14 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -19,6 +19,7 @@ use std::collections::btree_map::Entry;
 
 use quickwit_proto::indexing::CpuCapacity;
 
 use super::scheduling_logic_model::*;
+use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
 use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
 
 // ------------------------------------------------------------------------------------
@@ -288,7 +289,12 @@ fn attempt_place_unassigned_shards(
             })
             .collect();
         placements.sort();
-        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
+        place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            &mut solution,
+        )?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
@@ -323,7 +329,12 @@ fn place_unassigned_shards_with_affinity(
             })
             .collect();
         placements.sort();
-        let _ = place_unassigned_shards_single_source(source, &placements, solution);
+        let _ = place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            solution,
+        );
     }
 }
 
@@ -395,17 +406,30 @@ struct NotEnoughCapacity;
 fn place_unassigned_shards_single_source(
     source: &Source,
     sorted_candidates: &[PlacementCandidate],
+    num_indexers: usize,
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
+    // To ensure that merges can keep up, we try not to assign more than 3
+    // pipelines per indexer for a source (except if there aren't enough nodes).
+    let target_limit_num_shards_per_indexer_per_source =
+        3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get();
+    let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
+        .max(num_shards.div_ceil(num_indexers as u32));
     for PlacementCandidate {
         indexer_ord,
         available_capacity,
+        current_num_shards,
         ..
     } in sorted_candidates
     {
-        let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
-        let num_shards_to_place = num_placable_shards.min(num_shards);
+        let num_placable_shards_for_available_capacity =
+            available_capacity.cpu_millis() / source.load_per_shard;
+        let num_placable_shards_for_limit =
+            limit_num_shards_per_indexer_per_source.saturating_sub(*current_num_shards);
+        let num_shards_to_place = num_shards
+            .min(num_placable_shards_for_available_capacity)
+            .min(num_placable_shards_for_limit);
         // Update the solution, the shard load, and the number of shards to place.
         solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
@@ -632,6 +656,27 @@ mod tests {
         assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
     }
 
+    #[test]
+    fn test_placement_limit_with_affinity() {
+        let mut problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]);
+        let max_load_per_pipeline = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap();
+        problem.add_source(4, max_load_per_pipeline);
+        problem.add_source(4, max_load_per_pipeline);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 0);
+        problem.inc_affinity(1, 0);
+        let mut solution = problem.new_solution();
+        place_unassigned_shards_with_affinity(&problem, &mut solution);
+        assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
+        assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
+        assert_eq!(solution.indexer_assignments[1].num_shards(0), 3);
+        // one shard was not placed because indexer 0 was full and it had no
+        // affinity with indexer 1
+        assert_eq!(solution.indexer_assignments[1].num_shards(1), 0);
+    }
+
     #[test]
     fn test_place_unassigned_shards_reach_capacity() {
         let mut problem =

From 20c95a5279fd2a7f55177978260b564c4804cde2 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Thu, 10 Jul 2025 11:08:15 +0200
Subject: [PATCH 5/6] Treat nodes with at least 1 shard equally

---
 .../src/indexing_scheduler/scheduling/scheduling_logic.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index 86b1db26f14..bb668d53204 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -245,7 +245,9 @@ impl Ord for PlacementCandidate {
             ordering => return ordering.reverse(),
         }
         // If tie, pick the node with shards already assigned first
-        match self.current_num_shards.cmp(&other.current_num_shards) {
+        let current_shard_presence = self.current_num_shards.clamp(0, 1);
+        let other_shard_presence = other.current_num_shards.clamp(0, 1);
+        match current_shard_presence.cmp(&other_shard_presence) {
             Ordering::Equal => {}
             ordering => return ordering.reverse(),
         }

From acdb18ad5f48cf488207da8cf5bb19c804174078 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Thu, 10 Jul 2025 11:08:15 +0200
Subject: [PATCH 6/6] Add small clarification to scheduling README

---
 .../src/indexing_scheduler/scheduling/README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md
index f4ec64010a9..58ff4ae62d0 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md
@@ -15,8 +15,9 @@ We also want to observe some interesting properties such as:
 To simplify the logic and make it easier to test it, we first abstract this in the
 following optimization problem.
 
 In Quickwit, we have two types of source:
-- The push api source: they have a given (changing) set of shards associated to them.
-  A shard is rate-limited to ensure their throughput is lower than `5MB/s` worth of
+- The push api source: indexes have a given (changing) set of shards associated to them.
+  Shards are stored on indexer nodes and are spread randomly across them. A shard is
+  rate-limited to ensure its throughput is lower than `5MB/s` worth of
   uncompressed data. This guarantees that a given shard can be indexed by a single
   indexing pipeline.