diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 166f5c241ae..12d78c99219 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -49,6 +49,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
         Duration::from_secs(30)
     };
 
+/// That's 80% of a pipeline's full capacity (`PIPELINE_FULL_CAPACITY` is 4,000m).
+const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
+
 #[derive(Debug, Clone, Default, Serialize)]
 pub struct IndexingSchedulerState {
     pub num_applied_physical_indexing_plan: usize,
@@ -152,7 +155,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
         const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
         NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
     } else {
-        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+        NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / 4).unwrap()
     }
 }
 
@@ -220,8 +223,12 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
             source_uid,
             source_type: SourceToScheduleType::NonSharded {
                 num_pipelines: source_config.num_pipelines.get() as u32,
-                // FIXME
-                load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
+                // FIXME:
+                // - implementing adaptive load carries the risk of generating
+                //   rebalancing storms for sources like Kafka
+                // - this is coupled with the scheduling logic, which lacks the
+                //   notion of a pipeline
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
                     .unwrap(),
             },
             params_fingerprint,
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index ca5ac4d1fc7..eb7875097e0 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -757,8 +757,8 @@ mod tests {
         convert_scheduling_solution_to_physical_plan_single_node_single_source,
     };
     use crate::indexing_plan::PhysicalIndexingPlan;
-    use crate::indexing_scheduler::get_shard_locality_metrics;
     use crate::indexing_scheduler::scheduling::assign_shards;
+    use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
     use crate::model::ShardLocations;
 
     fn source_id() -> SourceUid {
@@ -939,6 +939,146 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_build_physical_plan_with_pipeline_limit() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let source_uid0 = source_id();
+        let source_uid1 = source_id();
+        let source_0 = SourceToSchedule {
+            source_uid: source_uid0.clone(),
+            source_type: SourceToScheduleType::Sharded {
+                shard_ids: (0..16).map(ShardId::from).collect(),
+                load_per_shard: NonZeroU32::new(800).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let source_1 = SourceToSchedule {
+            source_uid: source_uid1.clone(),
+            source_type: SourceToScheduleType::NonSharded {
+                num_pipelines: 4,
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &[source_0, source_1],
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
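+        // source_0 totals 16 shards x 800m = 12,800m, i.e. 4 pipelines at
+        // MAX_LOAD_PER_PIPELINE (3,200m); source_1 asks for 4 pipelines at
+        // that same load. With at most 3 pipelines per indexer and per
+        // source, each source must be split across the two indexers.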
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);
+
+        let node1_plan = indexing_plan.indexer(&indexer1).unwrap();
+        let node2_plan = indexing_plan.indexer(&indexer2).unwrap();
+
+        let source_0_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        let source_0_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        assert!(source_0_on_node1 <= 3);
+        assert!(source_0_on_node2 <= 3);
+        assert_eq!(source_0_on_node1 + source_0_on_node2, 4);
+
+        let source_1_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        let source_1_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        assert!(source_1_on_node1 <= 3);
+        assert!(source_1_on_node2 <= 3);
+        assert_eq!(source_1_on_node1 + source_1_on_node2, 4);
+    }
+
+    #[test]
+    fn test_build_physical_plan_second_iteration() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let indexer3 = "indexer3".to_string();
+        let mut sources = Vec::new();
+        for _ in 0..10 {
+            sources.push(SourceToSchedule {
+                source_uid: source_id(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 4,
+                    load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+                },
+                params_fingerprint: 0,
+            });
+        }
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3);
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&1));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 4);
+        }
+
+        for source in &mut sources {
+            if let SourceToScheduleType::NonSharded { num_pipelines, .. } = &mut source.source_type
+            {
+                *num_pipelines = 5;
+            }
+        }
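+
+        // Bump each source from 4 to 5 pipelines and reschedule, passing the
+        // previous plan so that the scheduler can keep existing placements
+        // where possible.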
+        let new_indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            Some(&indexing_plan),
+            &shard_locations,
+        );
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = new_indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&2));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 5);
+        }
+    }
+
     fn make_indexing_tasks(
         source_uid: &SourceUid,
         shards: &[(PipelineUid, &[ShardId])],
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index 698e8b4fb30..def8503c6b2 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -19,6 +19,7 @@ use std::collections::btree_map::Entry;
 use quickwit_proto::indexing::CpuCapacity;
 
 use super::scheduling_logic_model::*;
+use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
 use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
 
 // ------------------------------------------------------------------------------------
@@ -288,7 +289,12 @@ fn attempt_place_unassigned_shards(
             })
             .collect();
         placements.sort();
-        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
+        place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            &mut solution,
+        )?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
@@ -323,7 +329,12 @@ fn place_unassigned_shards_with_affinity(
             })
             .collect();
         placements.sort();
-        let _ = place_unassigned_shards_single_source(source, &placements, solution);
+        let _ = place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            solution,
+        );
     }
 }
 
@@ -393,17 +404,30 @@ struct NotEnoughCapacity;
 fn place_unassigned_shards_single_source(
     source: &Source,
     sorted_candidates: &[PlacementCandidate],
+    num_indexers: usize,
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
+    // To ensure that merges can keep up, we try not to assign more than 3
+    // pipelines per indexer for a source (unless there aren't enough indexers).
+    let target_limit_num_shards_per_indexer_per_source =
+        3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get();
+    let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
+        .max(num_shards.div_ceil(num_indexers as u32));
     for PlacementCandidate {
         indexer_ord,
         available_capacity,
+        current_num_shards,
         ..
     } in sorted_candidates
     {
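+        // Cap the placement on this candidate by both its remaining CPU
+        // capacity and the per-indexer-per-source limit, counting the shards
+        // of this source it was already assigned.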
-        let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
-        let num_shards_to_place = num_placable_shards.min(num_shards);
+        let num_placable_shards_for_available_capacity =
+            available_capacity.cpu_millis() / source.load_per_shard;
+        let num_placable_shards_for_limit =
+            limit_num_shards_per_indexer_per_source.saturating_sub(*current_num_shards);
+        let num_shards_to_place = num_shards
+            .min(num_placable_shards_for_available_capacity)
+            .min(num_placable_shards_for_limit);
         // Update the solution, the shard load, and the number of shards to place.
         solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
@@ -630,6 +654,27 @@ mod tests {
         assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
     }
 
+    #[test]
+    fn test_placement_limit_with_affinity() {
+        let mut problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]);
+        let max_load_per_pipeline = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap();
+        problem.add_source(4, max_load_per_pipeline);
+        problem.add_source(4, max_load_per_pipeline);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 0);
+        problem.inc_affinity(1, 0);
+        let mut solution = problem.new_solution();
+        place_unassigned_shards_with_affinity(&problem, &mut solution);
+        assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
+        assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
+        assert_eq!(solution.indexer_assignments[1].num_shards(0), 3);
+        // The last shard of source 1 was not placed: indexer 0 had reached the
+        // per-source limit, and source 1 has no affinity with indexer 1.
+        assert_eq!(solution.indexer_assignments[1].num_shards(1), 0);
+    }
+
     #[test]
     fn test_place_unassigned_shards_reach_capacity() {
         let mut problem =