Commit 630c781

Add limit of three pipelines per node
1 parent 6944ad4 commit 630c781

3 files changed: +203, -8 lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 10 additions & 3 deletions
@@ -49,6 +49,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
         Duration::from_secs(30)
     };
 
+/// That's 80% of a pipeline capacity
+const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
+
 #[derive(Debug, Clone, Default, Serialize)]
 pub struct IndexingSchedulerState {
     pub num_applied_physical_indexing_plan: usize,
@@ -152,7 +155,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
         const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
         NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
     } else {
-        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+        NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / 4).unwrap()
     }
 }
 
@@ -220,8 +223,12 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
             source_uid,
             source_type: SourceToScheduleType::NonSharded {
                 num_pipelines: source_config.num_pipelines.get() as u32,
-                // FIXME
-                load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
+                // FIXME:
+                // - implementing adaptative load contains the risk of generating
+                //   rebalancing storms for sources like Kafka
+                // - this is coupled with the scheduling logic that misses the notion of
+                //   pipeline
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
                     .unwrap(),
             },
             params_fingerprint,
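
Note on the new constant: the doc comment implies a full pipeline is budgeted at 4,000 mCPU, so MAX_LOAD_PER_PIPELINE (3,200 mCPU) is 80% of that budget, and the fallback load for a shard with no usable throughput measurement (the else branch above) becomes a quarter of it, 800 mCPU. A minimal standalone sketch of that arithmetic, assuming only what the diff shows; fallback_load_per_shard is an illustrative name, not part of the codebase:

use quickwit_proto::indexing::CpuCapacity;

// 80% of a full pipeline: 3_200 mCPU out of an implied 4_000 mCPU budget.
const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);

// Illustrative helper: the default load assumed for a shard when no
// throughput measurement is available, i.e. MAX_LOAD_PER_PIPELINE / 4 = 800 mCPU.
fn fallback_load_per_shard() -> u32 {
    MAX_LOAD_PER_PIPELINE.cpu_millis() / 4
}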

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 141 additions & 1 deletion
@@ -757,8 +757,8 @@ mod tests {
         convert_scheduling_solution_to_physical_plan_single_node_single_source,
     };
     use crate::indexing_plan::PhysicalIndexingPlan;
-    use crate::indexing_scheduler::get_shard_locality_metrics;
     use crate::indexing_scheduler::scheduling::assign_shards;
+    use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
     use crate::model::ShardLocations;
 
     fn source_id() -> SourceUid {
@@ -939,6 +939,146 @@
         }
     }
 
+    #[test]
+    fn test_build_physical_plan_with_pipeline_limit() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let source_uid0 = source_id();
+        let source_uid1 = source_id();
+        let source_0 = SourceToSchedule {
+            source_uid: source_uid0.clone(),
+            source_type: SourceToScheduleType::Sharded {
+                shard_ids: (0..16).map(ShardId::from).collect(),
+                load_per_shard: NonZeroU32::new(800).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let source_1 = SourceToSchedule {
+            source_uid: source_uid1.clone(),
+            source_type: SourceToScheduleType::NonSharded {
+                num_pipelines: 4,
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &[source_0, source_1],
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);
+
+        let node1_plan = indexing_plan.indexer(&indexer1).unwrap();
+        let node2_plan = indexing_plan.indexer(&indexer2).unwrap();
+
+        let source_0_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        let source_0_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        assert!(source_0_on_node1 <= 3);
+        assert!(source_0_on_node2 <= 3);
+        assert_eq!(source_0_on_node1 + source_0_on_node2, 4);
+
+        let source_1_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        let source_1_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        assert!(source_1_on_node1 <= 3);
+        assert!(source_1_on_node2 <= 3);
+        assert_eq!(source_1_on_node1 + source_1_on_node2, 4);
+    }
+
+    #[test]
+    fn test_build_physical_plan_second_iteration() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let indexer3 = "indexer3".to_string();
+        let mut sources = Vec::new();
+        for _ in 0..10 {
+            sources.push(SourceToSchedule {
+                source_uid: source_id(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 4,
+                    load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+                },
+                params_fingerprint: 0,
+            });
+        }
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3);
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&1));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 4);
+        }
+
+        for source in &mut sources {
+            if let SourceToScheduleType::NonSharded { num_pipelines, .. } = &mut source.source_type
+            {
+                *num_pipelines = 5;
+            }
+        }
+
+        let new_indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            Some(&indexing_plan),
+            &shard_locations,
+        );
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = new_indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&2));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 5);
+        }
+    }
+
     fn make_indexing_tasks(
         source_uid: &SourceUid,
         shards: &[(PipelineUid, &[ShardId])],
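
A quick check of the numbers asserted in test_build_physical_plan_with_pipeline_limit, under the assumption that the scheduler packs shards into pipelines of at most MAX_LOAD_PER_PIPELINE (3,200 mCPU); pipelines_needed below is purely illustrative and not part of the codebase:

// Purely illustrative: the arithmetic behind the `<= 3` and `== 4` assertions.
fn pipelines_needed(num_shards: u32, load_per_shard_mcpu: u32) -> u32 {
    const MAX_LOAD_PER_PIPELINE_MCPU: u32 = 3_200;
    // source_0: 16 shards * 800 mCPU = 12_800 mCPU -> ceil(12_800 / 3_200) = 4 pipelines.
    // source_1: explicitly configured as 4 pipelines at full per-pipeline load.
    (num_shards * load_per_shard_mcpu).div_ceil(MAX_LOAD_PER_PIPELINE_MCPU)
}

With the new cap of 3 pipelines per indexer and per source, each source's 4 pipelines have to be spread across both indexers (3 + 1 or 2 + 2), which is exactly what the assertions verify.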

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 52 additions & 4 deletions
@@ -19,6 +19,7 @@ use std::collections::btree_map::Entry;
 use quickwit_proto::indexing::CpuCapacity;
 
 use super::scheduling_logic_model::*;
+use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
 use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
 
 // ------------------------------------------------------------------------------------
@@ -288,7 +289,12 @@ fn attempt_place_unassigned_shards(
             })
             .collect();
         placements.sort();
-        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
+        place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            &mut solution,
+        )?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
@@ -323,7 +329,12 @@ fn place_unassigned_shards_with_affinity(
             })
             .collect();
         placements.sort();
-        let _ = place_unassigned_shards_single_source(source, &placements, solution);
+        let _ = place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            solution,
+        );
     }
 }
 
@@ -393,17 +404,33 @@ struct NotEnoughCapacity;
 fn place_unassigned_shards_single_source(
     source: &Source,
     sorted_candidates: &[PlacementCandidate],
+    num_indexers: usize,
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
+    // To ensure that merges can keep up, we try not to assign more than 3
+    // pipelines per indexer for a source (except if there aren't enough nodes).
+    let target_limit_num_shards_per_indexer_per_source =
+        3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get();
+    let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
+        .max(num_shards.div_ceil(num_indexers as u32));
     for PlacementCandidate {
         indexer_ord,
         available_capacity,
         ..
     } in sorted_candidates
     {
-        let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
-        let num_shards_to_place = num_placable_shards.min(num_shards);
+        let current_num_shards_for_indexer_and_source = *solution.indexer_assignments[*indexer_ord]
+            .num_shards_per_source
+            .get(&source.source_ord)
+            .unwrap_or(&0);
+        let num_placable_shards_for_available_capacity =
+            available_capacity.cpu_millis() / source.load_per_shard;
+        let num_placable_shards_for_limit = limit_num_shards_per_indexer_per_source
+            .saturating_sub(current_num_shards_for_indexer_and_source);
+        let num_shards_to_place = num_shards
+            .min(num_placable_shards_for_available_capacity)
+            .min(num_placable_shards_for_limit);
         // Update the solution, the shard load, and the number of shards to place.
         solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
@@ -630,6 +657,27 @@ mod tests {
         assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
     }
 
+    #[test]
+    fn test_placement_limit_with_affinity() {
+        let mut problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]);
+        let max_load_per_pipeline = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap();
+        problem.add_source(4, max_load_per_pipeline);
+        problem.add_source(4, max_load_per_pipeline);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 0);
+        problem.inc_affinity(1, 0);
+        let mut solution = problem.new_solution();
+        place_unassigned_shards_with_affinity(&problem, &mut solution);
+        assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
+        assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
+        assert_eq!(solution.indexer_assignments[1].num_shards(0), 3);
+        // one shard was not placed because indexer 0 was full and it had no
+        // affinity with indexer 1
+        assert_eq!(solution.indexer_assignments[1].num_shards(1), 0);
+    }
+
     #[test]
     fn test_place_unassigned_shards_reach_capacity() {
         let mut problem =
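
To make the cap concrete, here is a standalone sketch of the limit computed in place_unassigned_shards_single_source, with the constant inlined; limit_per_indexer is an illustrative name, not part of the codebase:

// Illustrative standalone version of the per-indexer, per-source shard cap.
fn limit_per_indexer(load_per_shard_mcpu: u32, num_shards: u32, num_indexers: u32) -> u32 {
    const MAX_LOAD_PER_PIPELINE_MCPU: u32 = 3_200;
    // Three pipelines' worth of shards per indexer and per source...
    let target = 3 * MAX_LOAD_PER_PIPELINE_MCPU / load_per_shard_mcpu;
    // ...relaxed when there are too few indexers to place every shard otherwise.
    target.max(num_shards.div_ceil(num_indexers))
}

A non-sharded source is modeled with a per-pipeline load equal to MAX_LOAD_PER_PIPELINE, so its cap works out to 3 * 3_200 / 3_200 = 3 pipelines per indexer; the sharded source used in the tests (800 mCPU per shard) is capped at 3 * 3_200 / 800 = 12 shards per indexer, i.e. roughly three pipelines' worth.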
