@@ -26,6 +26,7 @@ use scheduling_logic_model::{IndexerOrd, SourceOrd};
26
26
use tracing:: { error, warn} ;
27
27
28
28
use crate :: indexing_plan:: PhysicalIndexingPlan ;
29
+ use crate :: indexing_scheduler:: MAX_LOAD_PER_PIPELINE ;
29
30
use crate :: indexing_scheduler:: scheduling:: scheduling_logic_model:: {
30
31
IndexerAssignment , SchedulingProblem , SchedulingSolution ,
31
32
} ;
@@ -44,9 +45,6 @@ use crate::model::ShardLocations;
44
45
/// of 30%. Which translates into an overall load of 60%.
45
46
const CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD : CpuCapacity = CpuCapacity :: from_cpu_millis ( 1_200 ) ;
46
47
47
- /// That's 80% of a period
48
- const MAX_LOAD_PER_PIPELINE : CpuCapacity = CpuCapacity :: from_cpu_millis ( 3_200 ) ;
49
-
50
48
fn populate_problem (
51
49
source : & SourceToSchedule ,
52
50
problem : & mut SchedulingProblem ,
@@ -748,7 +746,7 @@ mod tests {
748
746
749
747
use fnv:: FnvHashMap ;
750
748
use itertools:: Itertools ;
751
- use quickwit_proto:: indexing:: { CpuCapacity , IndexingTask , mcpu} ;
749
+ use quickwit_proto:: indexing:: { CpuCapacity , IndexingTask , PIPELINE_FULL_CAPACITY , mcpu} ;
752
750
use quickwit_proto:: types:: { IndexUid , NodeId , PipelineUid , ShardId , SourceUid } ;
753
751
use rand:: seq:: SliceRandom ;
754
752
@@ -939,6 +937,71 @@ mod tests {
939
937
}
940
938
}
941
939
940
+ #[ test]
941
+ fn test_build_physical_plan_with_pipeline_limit ( ) {
942
+ let indexer1 = "indexer1" . to_string ( ) ;
943
+ let indexer2 = "indexer2" . to_string ( ) ;
944
+ let source_uid0 = source_id ( ) ;
945
+ let source_uid1 = source_id ( ) ;
946
+ let source_0 = SourceToSchedule {
947
+ source_uid : source_uid0. clone ( ) ,
948
+ source_type : SourceToScheduleType :: Sharded {
949
+ shard_ids : ( 0 ..16 ) . map ( ShardId :: from) . collect ( ) ,
950
+ load_per_shard : NonZeroU32 :: new ( 800 ) . unwrap ( ) ,
951
+ } ,
952
+ params_fingerprint : 0 ,
953
+ } ;
954
+ let source_1 = SourceToSchedule {
955
+ source_uid : source_uid1. clone ( ) ,
956
+ source_type : SourceToScheduleType :: NonSharded {
957
+ num_pipelines : 4 ,
958
+ load_per_pipeline : NonZeroU32 :: new ( PIPELINE_FULL_CAPACITY . cpu_millis ( ) ) . unwrap ( ) ,
959
+ } ,
960
+ params_fingerprint : 0 ,
961
+ } ;
962
+ let mut indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ;
963
+ indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) , mcpu ( 16_000 ) ) ;
964
+ indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) , mcpu ( 16_000 ) ) ;
965
+ let shard_locations = ShardLocations :: default ( ) ;
966
+ let indexing_plan = build_physical_indexing_plan (
967
+ & [ source_0, source_1] ,
968
+ & indexer_id_to_cpu_capacities,
969
+ None ,
970
+ & shard_locations,
971
+ ) ;
972
+ assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) , 2 ) ;
973
+
974
+ let node1_plan = indexing_plan. indexer ( & indexer1) . unwrap ( ) ;
975
+ let node2_plan = indexing_plan. indexer ( & indexer2) . unwrap ( ) ;
976
+
977
+ println ! ( "node1_plan: {node1_plan:#?}" ) ;
978
+ println ! ( "node2_plan: {node2_plan:#?}" ) ;
979
+
980
+ let source_0_on_node1 = node1_plan
981
+ . iter ( )
982
+ . filter ( |task| task. source_id == source_uid0. source_id )
983
+ . count ( ) ;
984
+ let source_0_on_node2 = node2_plan
985
+ . iter ( )
986
+ . filter ( |task| task. source_id == source_uid0. source_id )
987
+ . count ( ) ;
988
+ assert ! ( source_0_on_node1 <= 3 ) ;
989
+ assert ! ( source_0_on_node2 <= 3 ) ;
990
+ assert_eq ! ( source_0_on_node1 + source_0_on_node2, 4 ) ;
991
+
992
+ let source_1_on_node1 = node1_plan
993
+ . iter ( )
994
+ . filter ( |task| task. source_id == source_uid1. source_id )
995
+ . count ( ) ;
996
+ let source_1_on_node2 = node2_plan
997
+ . iter ( )
998
+ . filter ( |task| task. source_id == source_uid1. source_id )
999
+ . count ( ) ;
1000
+ assert ! ( source_1_on_node1 <= 3 ) ;
1001
+ assert ! ( source_1_on_node2 <= 3 ) ;
1002
+ assert_eq ! ( source_1_on_node1 + source_1_on_node2, 4 ) ;
1003
+ }
1004
+
942
1005
fn make_indexing_tasks (
943
1006
source_uid : & SourceUid ,
944
1007
shards : & [ ( PipelineUid , & [ ShardId ] ) ] ,
0 commit comments