Skip to content

Commit 8baa632

Browse files
committed
Limit to 3 pipelines per node per source
1 parent b62e363 commit 8baa632

File tree

1 file changed

+23
-4
lines changed

1 file changed

+23
-4
lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ fn attempt_place_unassigned_shards(
242242
place_unassigned_shards_single_source(
243243
source,
244244
indexers_with_most_available_capacity,
245+
problem.num_indexers(),
245246
&mut solution,
246247
)?;
247248
}
@@ -278,6 +279,7 @@ fn place_unassigned_shards_with_affinity(
278279
let _ = place_unassigned_shards_single_source(
279280
source,
280281
indexers_with_affinity_and_available_capacity,
282+
problem.num_indexers(),
281283
solution,
282284
);
283285
}
@@ -349,15 +351,28 @@ struct NotEnoughCapacity;
349351
fn place_unassigned_shards_single_source(
350352
source: &Source,
351353
mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
354+
num_indexers: usize,
352355
solution: &mut SchedulingSolution,
353356
) -> Result<(), NotEnoughCapacity> {
354357
let mut num_shards = source.num_shards;
358+
// To ensure that merges can keep up, try not to assign more than 3
359+
// shards per indexer for a source (except if there aren't enough nodes)
360+
let limit_num_shards_per_indexer_per_source = 3.max(num_shards.div_ceil(num_indexers as u32));
355361
while num_shards > 0 {
356362
let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
357363
return Err(NotEnoughCapacity);
358364
};
359-
let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
360-
let num_shards_to_place = num_placable_shards.min(num_shards);
365+
let current_num_shards_for_indexer_and_source = *solution.indexer_assignments[indexer_ord]
366+
.num_shards_per_source
367+
.get(&source.source_ord)
368+
.unwrap_or(&0);
369+
let num_placable_shards_for_available_capacity =
370+
available_capacity.cpu_millis() / source.load_per_shard;
371+
let num_placable_shards_for_limit = limit_num_shards_per_indexer_per_source
372+
.saturating_sub(current_num_shards_for_indexer_and_source);
373+
let num_shards_to_place = num_shards
374+
.min(num_placable_shards_for_available_capacity)
375+
.min(num_placable_shards_for_limit);
361376
// Update the solution, the shard load, and the number of shards to place.
362377
solution.indexer_assignments[indexer_ord]
363378
.add_shards(source.source_ord, num_shards_to_place);
@@ -593,11 +608,15 @@ mod tests {
593608
problem.add_source(4, NonZeroU32::new(1_000).unwrap());
594609
problem.add_source(4, NonZeroU32::new(1_000).unwrap());
595610
problem.inc_affinity(0, 1);
611+
problem.inc_affinity(0, 1);
612+
problem.inc_affinity(0, 0);
596613
problem.inc_affinity(1, 0);
597614
let mut solution = problem.new_solution();
598615
place_unassigned_shards_with_affinity(&problem, &mut solution);
599-
assert_eq!(solution.indexer_assignments[0].num_shards(1), 4);
600-
assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
616+
assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
617+
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
618+
assert_eq!(solution.indexer_assignments[1].num_shards(0), 3);
619+
assert_eq!(solution.indexer_assignments[1].num_shards(1), 0);
601620
}
602621

603622
#[test]

0 commit comments

Comments
 (0)