Skip to content

Commit 2c62834

Browse files
committed
Limit to 3 pipelines per node per source
1 parent dd9db21 commit 2c62834

File tree

1 file changed

+23
-4
lines changed

1 file changed

+23
-4
lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ fn attempt_place_unassigned_shards(
221221
place_unassigned_shards_single_source(
222222
source,
223223
indexers_with_most_available_capacity,
224+
problem.num_indexers(),
224225
&mut solution,
225226
)?;
226227
}
@@ -257,6 +258,7 @@ fn place_unassigned_shards_with_affinity(
257258
let _ = place_unassigned_shards_single_source(
258259
source,
259260
indexers_with_affinity_and_available_capacity,
261+
problem.num_indexers(),
260262
solution,
261263
);
262264
}
@@ -348,15 +350,28 @@ struct NotEnoughCapacity;
348350
fn place_unassigned_shards_single_source(
349351
source: &Source,
350352
mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
353+
num_indexers: usize,
351354
solution: &mut SchedulingSolution,
352355
) -> Result<(), NotEnoughCapacity> {
353356
let mut num_shards = source.num_shards;
357+
// To ensure that merges can keep up, try not to assign more than 3
358+
// shards per indexer for a source (except if there aren't enough nodes)
359+
let limit_num_shards_per_indexer_per_source = 3.max(num_shards.div_ceil(num_indexers as u32));
354360
while num_shards > 0 {
355361
let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
356362
return Err(NotEnoughCapacity);
357363
};
358-
let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
359-
let num_shards_to_place = num_placable_shards.min(num_shards);
364+
let current_num_shards_for_indexer_and_source = *solution.indexer_assignments[indexer_ord]
365+
.num_shards_per_source
366+
.get(&source.source_ord)
367+
.unwrap_or(&0);
368+
let num_placable_shards_for_available_capacity =
369+
available_capacity.cpu_millis() / source.load_per_shard;
370+
let num_placable_shards_for_limit = limit_num_shards_per_indexer_per_source
371+
.saturating_sub(current_num_shards_for_indexer_and_source);
372+
let num_shards_to_place = num_shards
373+
.min(num_placable_shards_for_available_capacity)
374+
.min(num_placable_shards_for_limit);
360375
// Update the solution, the shard load, and the number of shards to place.
361376
if num_shards_to_place == 0u32 {
362377
// No need to fill indexer_assignments with empty assignments.
@@ -596,11 +611,15 @@ mod tests {
596611
problem.add_source(4, NonZeroU32::new(1_000).unwrap());
597612
problem.add_source(4, NonZeroU32::new(1_000).unwrap());
598613
problem.inc_affinity(0, 1);
614+
problem.inc_affinity(0, 1);
615+
problem.inc_affinity(0, 0);
599616
problem.inc_affinity(1, 0);
600617
let mut solution = problem.new_solution();
601618
place_unassigned_shards_with_affinity(&problem, &mut solution);
602-
assert_eq!(solution.indexer_assignments[0].num_shards(1), 4);
603-
assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
619+
assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
620+
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
621+
assert_eq!(solution.indexer_assignments[1].num_shards(0), 3);
622+
assert_eq!(solution.indexer_assignments[1].num_shards(1), 0);
604623
}
605624

606625
#[test]

0 commit comments

Comments
 (0)