Commit 1c4c73e

Add placement affinity on the number of existing shards
1 parent fa8d5d9 commit 1c4c73e

File tree

1 file changed: +76 -52 lines changed


quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 76 additions & 52 deletions
@@ -12,11 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::cmp::Reverse;
+use std::cmp::{Ordering, Reverse};
 use std::collections::BTreeMap;
 use std::collections::btree_map::Entry;

-use itertools::Itertools;
 use quickwit_proto::indexing::CpuCapacity;

 use super::scheduling_logic_model::*;
@@ -229,21 +228,61 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
 // If this algorithm fails to place all remaining shards, we inflate
 // the node capacities by 20% in the scheduling problem and start from the beginning.

+#[derive(Debug, PartialEq, Eq, Ord)]
+struct PlacementCandidate {
+    indexer_ord: IndexerOrd,
+    current_num_shards: u32,
+    available_capacity: CpuCapacity,
+    affinity: u32,
+}
+
+impl PartialOrd for PlacementCandidate {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Higher affinity is better
+        match self.affinity.cmp(&other.affinity) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // If tie, pick the node with shards already assigned first
+        match self.current_num_shards.cmp(&other.current_num_shards) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // If tie, pick the node with the highest available capacity
+        match self.available_capacity.cmp(&other.available_capacity) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // Final tie-breaker: indexer ID for deterministic ordering
+        Some(self.indexer_ord.cmp(&other.indexer_ord).reverse())
+    }
+}
+
 fn attempt_place_unassigned_shards(
     unassigned_shards: &[Source],
     problem: &SchedulingProblem,
     partial_solution: &SchedulingSolution,
 ) -> Result<SchedulingSolution, NotEnoughCapacity> {
     let mut solution = partial_solution.clone();
     for source in unassigned_shards {
-        let indexers_with_most_available_capacity =
-            compute_indexer_available_capacity(problem, &solution)
-                .sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
-        place_unassigned_shards_single_source(
-            source,
-            indexers_with_most_available_capacity,
-            &mut solution,
-        )?;
+        let mut placements: Vec<PlacementCandidate> = solution
+            .indexer_assignments
+            .iter()
+            .map(|indexer_assignment: &IndexerAssignment| {
+                let available_capacity = indexer_assignment.indexer_available_capacity(problem);
+                assert!(available_capacity >= 0i32);
+                let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
+                let current_num_shards = indexer_assignment.num_shards(source.source_ord);
+                PlacementCandidate {
+                    affinity: 0,
+                    current_num_shards,
+                    available_capacity,
+                    indexer_ord: indexer_assignment.indexer_ord,
+                }
+            })
+            .collect();
+        placements.sort();
+        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
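
For reference, a minimal, self-contained sketch of the ordering this comparator aims for: highest affinity first, then most shards of the source already assigned, then most available capacity, with the indexer ordinal as the last tie-break. The CandidateSketch struct and its plain integer fields are hypothetical stand-ins, not quickwit's IndexerOrd and CpuCapacity types.

use std::cmp::Reverse;

// Hypothetical stand-in for a placement candidate (plain integers instead of quickwit types).
#[derive(Debug, Clone, PartialEq, Eq)]
struct CandidateSketch {
    indexer_ord: usize,
    current_num_shards: u32,
    available_cpu_millis: u32,
    affinity: u32,
}

fn sort_best_first(candidates: &mut [CandidateSketch]) {
    // Mirror the tie-break chain: affinity, then shards already assigned, then available
    // capacity, all descending, with the indexer ordinal as the final tie-break.
    candidates.sort_by_key(|c| {
        Reverse((c.affinity, c.current_num_shards, c.available_cpu_millis, c.indexer_ord))
    });
}

fn main() {
    let mut candidates = vec![
        CandidateSketch { indexer_ord: 0, current_num_shards: 0, available_cpu_millis: 4_000, affinity: 0 },
        CandidateSketch { indexer_ord: 1, current_num_shards: 2, available_cpu_millis: 1_000, affinity: 0 },
        CandidateSketch { indexer_ord: 2, current_num_shards: 0, available_cpu_millis: 8_000, affinity: 1 },
    ];
    sort_best_first(&mut candidates);
    // Indexer 2 wins on affinity; indexer 1 beats indexer 0 because it already hosts shards
    // of the source, even though indexer 0 has more spare capacity.
    assert_eq!(
        candidates.iter().map(|c| c.indexer_ord).collect::<Vec<_>>(),
        vec![2, 1, 0]
    );
    println!("{candidates:#?}");
}

Because the best candidate sorts first, a plain ascending sort followed by an in-order scan is enough for the placement loop.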
@@ -259,27 +298,26 @@ fn place_unassigned_shards_with_affinity(
         Reverse(load)
     });
     for source in &unassigned_shards {
-        // List of indexer with a non-null affinity and some available capacity, sorted by
-        // (affinity, available capacity) in that order.
-        let indexers_with_affinity_and_available_capacity = source
+        let mut placements: Vec<PlacementCandidate> = source
            .affinities
            .iter()
            .filter(|&(_, &affinity)| affinity != 0u32)
-            .map(|(&indexer_ord, affinity)| {
+            .map(|(&indexer_ord, &affinity)| {
                let available_capacity =
                    solution.indexer_assignments[indexer_ord].indexer_available_capacity(problem);
-                let capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
-                (indexer_ord, affinity, capacity)
-            })
-            .sorted_by_key(|(indexer_ord, affinity, capacity)| {
-                Reverse((*affinity, *capacity, *indexer_ord))
+                let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
+                let current_num_shards =
+                    solution.indexer_assignments[indexer_ord].num_shards(source.source_ord);
+                PlacementCandidate {
+                    affinity,
+                    current_num_shards,
+                    available_capacity,
+                    indexer_ord,
+                }
            })
-            .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity));
-        let _ = place_unassigned_shards_single_source(
-            source,
-            indexers_with_affinity_and_available_capacity,
-            solution,
-        );
+            .collect();
+        placements.sort();
+        let _ = place_unassigned_shards_single_source(source, &placements, solution);
    }
 }

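The affinity pass above differs from attempt_place_unassigned_shards only in how candidates are built: it starts from the source's affinities map, keeps only indexers with a non-zero affinity, and fills in the same PlacementCandidate fields. A minimal sketch of that filtering step, where the AffinityCandidate struct, the plain BTreeMap<usize, u32> standing in for source.affinities, and the spare_cpu_millis lookup are all hypothetical stand-ins:

use std::collections::BTreeMap;

// Hypothetical stand-in for one affinity-based candidate (plain integers, no quickwit types).
#[derive(Debug)]
struct AffinityCandidate {
    indexer_ord: usize,
    affinity: u32,
    available_cpu_millis: u32,
}

// Keep only indexers with a non-zero affinity for the source, as the affinity pass does.
fn candidates_with_affinity(
    affinities: &BTreeMap<usize, u32>,
    spare_cpu_millis: &dyn Fn(usize) -> u32,
) -> Vec<AffinityCandidate> {
    affinities
        .iter()
        .filter(|&(_, &affinity)| affinity != 0u32)
        .map(|(&indexer_ord, &affinity)| AffinityCandidate {
            indexer_ord,
            affinity,
            available_cpu_millis: spare_cpu_millis(indexer_ord),
        })
        .collect()
}

fn main() {
    let affinities = BTreeMap::from([(0, 0u32), (1, 3u32), (2, 1u32)]);
    let spare = |indexer_ord: usize| 4_000u32 - 1_000 * indexer_ord as u32;
    let candidates = candidates_with_affinity(&affinities, &spare);
    // Indexer 0 is dropped because its affinity is zero.
    assert_eq!(candidates.len(), 2);
    println!("{candidates:#?}");
}
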
@@ -350,22 +388,27 @@ struct NotEnoughCapacity;
 /// amongst the node with their given node capacity.
 fn place_unassigned_shards_single_source(
     source: &Source,
-    mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
+    sorted_candidates: &[PlacementCandidate],
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
-    while num_shards > 0 {
-        let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
-            return Err(NotEnoughCapacity);
-        };
+    for PlacementCandidate {
+        indexer_ord,
+        available_capacity,
+        ..
+    } in sorted_candidates
+    {
         let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
         let num_shards_to_place = num_placable_shards.min(num_shards);
         // Update the solution, the shard load, and the number of shards to place.
-        solution.indexer_assignments[indexer_ord]
+        solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
         num_shards -= num_shards_to_place;
+        if num_shards == 0 {
+            return Ok(());
+        }
     }
-    Ok(())
+    Err(NotEnoughCapacity)
 }

 /// Compute the sources/shards that have not been assigned to any indexer yet.
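
With candidates pre-sorted, placement becomes a greedy fill: each candidate takes as many shards as its spare capacity allows, and NotEnoughCapacity is now returned only after every candidate has been tried. A minimal sketch of that control flow, again with plain integers instead of quickwit's types; NotEnoughCapacitySketch is a hypothetical stand-in for the real error type.

// Hypothetical, simplified mirror of the greedy placement loop.
#[derive(Debug, PartialEq)]
struct NotEnoughCapacitySketch;

fn greedy_place(
    mut num_shards: u32,
    load_per_shard: u32,
    spare_cpu_millis_best_first: &[u32],
) -> Result<Vec<u32>, NotEnoughCapacitySketch> {
    let mut placed: Vec<u32> = vec![0; spare_cpu_millis_best_first.len()];
    for (indexer_ord, spare) in spare_cpu_millis_best_first.iter().enumerate() {
        // Each candidate takes as many shards as its spare capacity allows.
        let num_placable_shards = spare / load_per_shard;
        let num_shards_to_place = num_placable_shards.min(num_shards);
        placed[indexer_ord] += num_shards_to_place;
        num_shards -= num_shards_to_place;
        if num_shards == 0 {
            return Ok(placed);
        }
    }
    // Candidates exhausted with shards left over: the caller inflates capacities and retries.
    Err(NotEnoughCapacitySketch)
}

fn main() {
    // 5 shards at 1000 mCPU each over candidates with 2500, 2000, and 1000 mCPU spare.
    assert_eq!(greedy_place(5, 1_000, &[2_500, 2_000, 1_000]), Ok(vec![2, 2, 1]));
    // 7 shards do not fit, so the sketch reports NotEnoughCapacitySketch, like the real function.
    assert_eq!(greedy_place(7, 1_000, &[2_500, 2_000, 1_000]), Err(NotEnoughCapacitySketch));
}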
@@ -394,30 +437,11 @@ fn compute_unassigned_sources(
     unassigned_sources.into_values().collect()
 }

-/// Builds a BinaryHeap with the different indexer capacities.
-///
-/// Panics if one of the indexer is over-assigned.
-fn compute_indexer_available_capacity<'a>(
-    problem: &'a SchedulingProblem,
-    solution: &'a SchedulingSolution,
-) -> impl Iterator<Item = (IndexerOrd, CpuCapacity)> + 'a {
-    solution
-        .indexer_assignments
-        .iter()
-        .map(|indexer_assignment| {
-            let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem);
-            assert!(available_capacity >= 0i32);
-            (
-                indexer_assignment.indexer_ord,
-                CpuCapacity::from_cpu_millis(available_capacity as u32),
-            )
-        })
-}
-
 #[cfg(test)]
 mod tests {
     use std::num::NonZeroU32;

+    use itertools::Itertools;
     use proptest::prelude::*;
     use quickwit_proto::indexing::mcpu;
