Skip to content

Commit 194ff6b

Browse files
committed
feat(qrm): allocate overlapping reclaim cpus reversely
1 parent 83a599e commit 194ff6b

File tree

4 files changed

+103
-4
lines changed

4 files changed

+103
-4
lines changed

pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator/cpu_assignment.go

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,24 @@ func (a *cpuAccumulator) freeCPUsInNUMANode(numaID int) []int {
6464
return a.cpuDetails.CPUsInNUMANodes(numaID).ToSliceInt()
6565
}
6666

67-
// freeCoresInNUMANode returns free core ids in specified NUMA
67+
// freeCPUsInNUMANodeReversely returns free cpu ids in specified NUMA,
68+
// and those returned cpu slices have already been sorted reversely
69+
func (a *cpuAccumulator) freeCPUsInNUMANodeReversely(numaID int) []int {
70+
return a.cpuDetails.CPUsInNUMANodes(numaID).ToSliceIntReversely()
71+
}
72+
73+
// freeCoresInNUMANode returns free core ids in specified NUMA,
74+
// and those returned cpu slices have already been sorted
6875
func (a *cpuAccumulator) freeCoresInNUMANode(numaID int) []int {
6976
return a.cpuDetails.CoresInNUMANodes(numaID).Filter(a.isCoreFree).ToSliceInt()
7077
}
7178

79+
// freeCoresInNUMANodeReversely returns free core ids in specified NUMA,
80+
// and those returned cpu slices have already been sorted reversely
81+
func (a *cpuAccumulator) freeCoresInNUMANodeReversely(numaID int) []int {
82+
return a.cpuDetails.CoresInNUMANodes(numaID).Filter(a.isCoreFree).ToSliceIntReversely()
83+
}
84+
7285
// isSocketFree returns true if the supplied socket is fully available
7386
func (a *cpuAccumulator) isSocketFree(socketID int) bool {
7487
return a.cpuDetails.CPUsInSockets(socketID).Size() == a.getTopology().CPUsPerSocket()
@@ -301,7 +314,7 @@ successful:
301314
}
302315

303316
// TakeHTByNUMABalance tries to make the allocated cpu spread on different
304-
// sockets, and it uses cpu HT as the basic allocation unit
317+
// NUMAs, and it uses cpu HT as the basic allocation unit
305318
func TakeHTByNUMABalance(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
306319
cpuRequirement int,
307320
) (machine.CPUSet, machine.CPUSet, error) {
@@ -338,3 +351,55 @@ failed:
338351
successful:
339352
return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
340353
}
354+
355+
// TakeByNUMABalanceReversely tries to make the allocated cpu reversely spread on different
356+
// NUMAs, and it uses cpu Cores as the basic allocation unit
357+
func TakeByNUMABalanceReversely(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
358+
cpuRequirement int,
359+
) (machine.CPUSet, machine.CPUSet, error) {
360+
var err error
361+
acc := newCPUAccumulator(info, availableCPUs, cpuRequirement)
362+
if acc.isSatisfied() {
363+
goto successful
364+
}
365+
366+
for {
367+
if acc.isFailed() {
368+
err = fmt.Errorf("not enough cpus available to satisfy request")
369+
goto failed
370+
}
371+
372+
numaLoop:
373+
for _, s := range info.CPUDetails.NUMANodes().ToSliceInt() {
374+
if acc.needs(acc.getTopology().CPUsPerCore()) && len(acc.freeCores()) > 0 {
375+
for _, c := range acc.freeCoresInNUMANodeReversely(s) {
376+
acc.take(acc.getDetails().CPUsInCores(c))
377+
if acc.isSatisfied() {
378+
goto successful
379+
} else {
380+
continue numaLoop
381+
}
382+
}
383+
continue
384+
}
385+
386+
for _, c := range acc.freeCPUsInNUMANodeReversely(s) {
387+
if acc.needs(1) {
388+
acc.take(machine.NewCPUSet(c))
389+
}
390+
if acc.isSatisfied() {
391+
goto successful
392+
} else {
393+
break
394+
}
395+
}
396+
}
397+
}
398+
failed:
399+
if err == nil {
400+
err = errors.New("failed to allocate cpus")
401+
}
402+
return availableCPUs, availableCPUs, err
403+
successful:
404+
return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
405+
}

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1241,8 +1241,10 @@ func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]ma
12411241

12421242
req := int(math.Ceil(float64(cset.Size()) * ratio))
12431243

1244+
// if p.state.GetAllowSharedCoresOverlapReclaimedCores() == false, we will take cpus for reclaim pool lastly,
1245+
// else we should take cpus for reclaim pool reversely, overlapping with the share type pool, to avoid the cpuset changing abruptly
12441246
var tErr error
1245-
overlapCPUs, _, tErr := calculator.TakeByNUMABalance(p.machineInfo, cset, req)
1247+
overlapCPUs, _, tErr := calculator.TakeByNUMABalanceReversely(p.machineInfo, cset, req)
12461248
if tErr != nil {
12471249
err = fmt.Errorf("take overlapCPUs from: %s to %s by ratio: %.4f failed with err: %v",
12481250
poolName, state.PoolNameReclaim, ratio, tErr)

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2809,6 +2809,23 @@ func TestGetResourcesAllocation(t *testing.T) {
28092809
}
28102810

28112811
dynamicPolicy.state.SetAllowSharedCoresOverlapReclaimedCores(true)
2812+
dynamicPolicy.dynamicConfig.GetDynamicConfiguration().EnableReclaim = true
2813+
dynamicPolicy.state.SetAllocationInfo(state.PoolNameReclaim, "", &state.AllocationInfo{
2814+
PodUid: state.PoolNameReclaim,
2815+
OwnerPoolName: state.PoolNameReclaim,
2816+
AllocationResult: machine.MustParse("1,3,4-5"),
2817+
OriginalAllocationResult: machine.MustParse("1,3,4-5"),
2818+
TopologyAwareAssignments: map[int]machine.CPUSet{
2819+
0: machine.NewCPUSet(1),
2820+
1: machine.NewCPUSet(3),
2821+
2: machine.NewCPUSet(4, 5),
2822+
},
2823+
OriginalTopologyAwareAssignments: map[int]machine.CPUSet{
2824+
0: machine.NewCPUSet(1),
2825+
1: machine.NewCPUSet(3),
2826+
2: machine.NewCPUSet(4, 5),
2827+
},
2828+
})
28122829
_, err = dynamicPolicy.Allocate(context.Background(), req)
28132830
as.Nil(err)
28142831

@@ -2833,8 +2850,12 @@ func TestGetResourcesAllocation(t *testing.T) {
28332850
IsNodeResource: false,
28342851
IsScalarResource: true,
28352852
AllocatedQuantity: 14,
2836-
AllocationResult: machine.NewCPUSet(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 15).String(),
2853+
AllocationResult: machine.NewCPUSet(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15).String(),
28372854
}, resp3.PodResources[req.PodUid].ContainerResources[testName].ResourceAllocation[string(v1.ResourceCPU)])
2855+
2856+
reclaimEntry := dynamicPolicy.state.GetAllocationInfo(state.PoolNameReclaim, "")
2857+
as.NotNil(reclaimEntry)
2858+
as.Equal(6, reclaimEntry.AllocationResult.Size()) // ceil("14 * (4 / 10)") == 6
28382859
}
28392860

28402861
func TestAllocateByQoSAwareServerListAndWatchResp(t *testing.T) {

pkg/util/machine/cpuset.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,17 @@ func (s CPUSet) ToSliceInt() []int {
220220
return result
221221
}
222222

223+
// ToSliceIntReversely returns a reversely ordered slice of int that contains
224+
// all elements from this set
225+
func (s CPUSet) ToSliceIntReversely() []int {
226+
result := []int{}
227+
for cpu := range s.elems {
228+
result = append(result, cpu)
229+
}
230+
sort.Sort(sort.Reverse(sort.IntSlice(result)))
231+
return result
232+
}
233+
223234
// ToSliceInt64 returns an ordered slice of int64 that contains
224235
// all elements from this set
225236
func (s CPUSet) ToSliceInt64() []int64 {

0 commit comments

Comments
 (0)