12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: cmp:: Reverse ;
15
+ use std:: cmp:: { Ordering , Reverse } ;
16
16
use std:: collections:: BTreeMap ;
17
17
use std:: collections:: btree_map:: Entry ;
18
18
19
- use itertools:: Itertools ;
20
19
use quickwit_proto:: indexing:: CpuCapacity ;
21
20
22
21
use super :: scheduling_logic_model:: * ;
@@ -229,21 +228,61 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
229
228
// If this algorithm fails to place all remaining shards, we inflate
230
229
// the node capacities by 20% in the scheduling problem and start from the beginning.
231
230
231
+ #[ derive( Debug , PartialEq , Eq , Ord ) ]
232
+ struct PlacementCandidate {
233
+ indexer_ord : IndexerOrd ,
234
+ current_num_shards : u32 ,
235
+ available_capacity : CpuCapacity ,
236
+ affinity : u32 ,
237
+ }
238
+
239
+ impl PartialOrd for PlacementCandidate {
240
+ fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
241
+ // Higher affinity is better
242
+ match self . affinity . cmp ( & other. affinity ) {
243
+ Ordering :: Equal => { }
244
+ ordering => return Some ( ordering. reverse ( ) ) ,
245
+ }
246
+ // If tie, pick the node with shards already assigned first
247
+ match self . current_num_shards . cmp ( & other. current_num_shards ) {
248
+ Ordering :: Equal => { }
249
+ ordering => return Some ( ordering. reverse ( ) ) ,
250
+ }
251
+ // If tie, pick the node with the highest available capacity
252
+ match self . available_capacity . cmp ( & other. available_capacity ) {
253
+ Ordering :: Equal => { }
254
+ ordering => return Some ( ordering. reverse ( ) ) ,
255
+ }
256
+ // Final tie-breaker: indexer ID for deterministic ordering
257
+ Some ( self . indexer_ord . cmp ( & other. indexer_ord ) . reverse ( ) )
258
+ }
259
+ }
260
+
232
261
fn attempt_place_unassigned_shards (
233
262
unassigned_shards : & [ Source ] ,
234
263
problem : & SchedulingProblem ,
235
264
partial_solution : & SchedulingSolution ,
236
265
) -> Result < SchedulingSolution , NotEnoughCapacity > {
237
266
let mut solution = partial_solution. clone ( ) ;
238
267
for source in unassigned_shards {
239
- let indexers_with_most_available_capacity =
240
- compute_indexer_available_capacity ( problem, & solution)
241
- . sorted_by_key ( |( indexer_ord, capacity) | Reverse ( ( * capacity, * indexer_ord) ) ) ;
242
- place_unassigned_shards_single_source (
243
- source,
244
- indexers_with_most_available_capacity,
245
- & mut solution,
246
- ) ?;
268
+ let mut placements: Vec < PlacementCandidate > = solution
269
+ . indexer_assignments
270
+ . iter ( )
271
+ . map ( |indexer_assignment : & IndexerAssignment | {
272
+ let available_capacity = indexer_assignment. indexer_available_capacity ( problem) ;
273
+ assert ! ( available_capacity >= 0i32 ) ;
274
+ let available_capacity = CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ;
275
+ let current_num_shards = indexer_assignment. num_shards ( source. source_ord ) ;
276
+ PlacementCandidate {
277
+ affinity : 0 ,
278
+ current_num_shards,
279
+ available_capacity,
280
+ indexer_ord : indexer_assignment. indexer_ord ,
281
+ }
282
+ } )
283
+ . collect ( ) ;
284
+ placements. sort ( ) ;
285
+ place_unassigned_shards_single_source ( source, & placements, & mut solution) ?;
247
286
}
248
287
assert_place_unassigned_shards_post_condition ( problem, & solution) ;
249
288
Ok ( solution)
@@ -259,27 +298,26 @@ fn place_unassigned_shards_with_affinity(
259
298
Reverse ( load)
260
299
} ) ;
261
300
for source in & unassigned_shards {
262
- // List of indexer with a non-null affinity and some available capacity, sorted by
263
- // (affinity, available capacity) in that order.
264
- let indexers_with_affinity_and_available_capacity = source
301
+ let mut placements: Vec < PlacementCandidate > = source
265
302
. affinities
266
303
. iter ( )
267
304
. filter ( |& ( _, & affinity) | affinity != 0u32 )
268
- . map ( |( & indexer_ord, affinity) | {
305
+ . map ( |( & indexer_ord, & affinity) | {
269
306
let available_capacity =
270
307
solution. indexer_assignments [ indexer_ord] . indexer_available_capacity ( problem) ;
271
- let capacity = CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ;
272
- ( indexer_ord, affinity, capacity)
273
- } )
274
- . sorted_by_key ( |( indexer_ord, affinity, capacity) | {
275
- Reverse ( ( * affinity, * capacity, * indexer_ord) )
308
+ let available_capacity = CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ;
309
+ let current_num_shards =
310
+ solution. indexer_assignments [ indexer_ord] . num_shards ( source. source_ord ) ;
311
+ PlacementCandidate {
312
+ affinity,
313
+ current_num_shards,
314
+ available_capacity,
315
+ indexer_ord,
316
+ }
276
317
} )
277
- . map ( |( indexer_ord, _, capacity) | ( indexer_ord, capacity) ) ;
278
- let _ = place_unassigned_shards_single_source (
279
- source,
280
- indexers_with_affinity_and_available_capacity,
281
- solution,
282
- ) ;
318
+ . collect ( ) ;
319
+ placements. sort ( ) ;
320
+ let _ = place_unassigned_shards_single_source ( source, & placements, solution) ;
283
321
}
284
322
}
285
323
@@ -350,22 +388,27 @@ struct NotEnoughCapacity;
350
388
/// amongst the node with their given node capacity.
351
389
fn place_unassigned_shards_single_source (
352
390
source : & Source ,
353
- mut indexer_with_capacities : impl Iterator < Item = ( IndexerOrd , CpuCapacity ) > ,
391
+ sorted_candidates : & [ PlacementCandidate ] ,
354
392
solution : & mut SchedulingSolution ,
355
393
) -> Result < ( ) , NotEnoughCapacity > {
356
394
let mut num_shards = source. num_shards ;
357
- while num_shards > 0 {
358
- let Some ( ( indexer_ord, available_capacity) ) = indexer_with_capacities. next ( ) else {
359
- return Err ( NotEnoughCapacity ) ;
360
- } ;
395
+ for PlacementCandidate {
396
+ indexer_ord,
397
+ available_capacity,
398
+ ..
399
+ } in sorted_candidates
400
+ {
361
401
let num_placable_shards = available_capacity. cpu_millis ( ) / source. load_per_shard ;
362
402
let num_shards_to_place = num_placable_shards. min ( num_shards) ;
363
403
// Update the solution, the shard load, and the number of shards to place.
364
- solution. indexer_assignments [ indexer_ord]
404
+ solution. indexer_assignments [ * indexer_ord]
365
405
. add_shards ( source. source_ord , num_shards_to_place) ;
366
406
num_shards -= num_shards_to_place;
407
+ if num_shards == 0 {
408
+ return Ok ( ( ) ) ;
409
+ }
367
410
}
368
- Ok ( ( ) )
411
+ Err ( NotEnoughCapacity )
369
412
}
370
413
371
414
/// Compute the sources/shards that have not been assigned to any indexer yet.
@@ -394,30 +437,11 @@ fn compute_unassigned_sources(
394
437
unassigned_sources. into_values ( ) . collect ( )
395
438
}
396
439
397
- /// Builds a BinaryHeap with the different indexer capacities.
398
- ///
399
- /// Panics if one of the indexer is over-assigned.
400
- fn compute_indexer_available_capacity < ' a > (
401
- problem : & ' a SchedulingProblem ,
402
- solution : & ' a SchedulingSolution ,
403
- ) -> impl Iterator < Item = ( IndexerOrd , CpuCapacity ) > + ' a {
404
- solution
405
- . indexer_assignments
406
- . iter ( )
407
- . map ( |indexer_assignment| {
408
- let available_capacity: i32 = indexer_assignment. indexer_available_capacity ( problem) ;
409
- assert ! ( available_capacity >= 0i32 ) ;
410
- (
411
- indexer_assignment. indexer_ord ,
412
- CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ,
413
- )
414
- } )
415
- }
416
-
417
440
#[ cfg( test) ]
418
441
mod tests {
419
442
use std:: num:: NonZeroU32 ;
420
443
444
+ use itertools:: Itertools ;
421
445
use proptest:: prelude:: * ;
422
446
use quickwit_proto:: indexing:: mcpu;
423
447
0 commit comments