
Commit 2333cae

Merge pull request #686 from nightmeng/dev/fix-cpu-admit-failed.hints
feat(qrm): optimize admit logic
2 parents 8f6cbb5 + 58d14b3

12 files changed: +133 −108 lines

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go

Lines changed: 1 addition & 20 deletions
@@ -19,7 +19,6 @@ package dynamicpolicy
 import (
 	"context"
 	"fmt"
-	"math"
 	"sync"
 	"time"

@@ -222,8 +221,6 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
 		consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
 	}

-	state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)
-
 	if err := policyImplement.cleanPools(); err != nil {
 		return false, agent.ComponentStub{}, fmt.Errorf("cleanPools failed with error: %v", err)
 	}

@@ -1181,28 +1178,12 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource
 		return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err)
 	}

-	shareCoresAllocated := reqFloat64
-	podEntries := p.state.GetPodEntries()
-	for podUid, podEntry := range podEntries {
-		if podEntry.IsPoolEntry() {
-			continue
-		}
-		if podUid == req.PodUid {
-			continue
-		}
-		for _, allocation := range podEntry {
-			// shareCoresAllocated should involve both main and sidecar containers
-			if state.CheckShared(allocation) && !state.CheckNUMABinding(allocation) {
-				shareCoresAllocated += p.getContainerRequestedCores(allocation)
-			}
-		}
-	}
+	shareCoresAllocatedInt := state.GetNonBindingSharedRequestedQuantityFromPodEntries(p.state.GetPodEntries(), map[string]float64{req.PodUid: reqFloat64}, p.getContainerRequestedCores)

 	machineState := p.state.GetMachineState()
 	pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
 		state.CheckDedicated, state.CheckNUMABinding)

-	shareCoresAllocatedInt := int(math.Ceil(shareCoresAllocated))
 	general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
 	if shareCoresAllocatedInt > pooledCPUs.Size() {
 		general.Warningf("[checkNormalShareCoresCpuResource] no enough cpu resource for normal share cores pod: %s/%s, container: %s (request: %.02f, node allocated: %d, node allocatable: %d)",
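The new state.GetNonBindingSharedRequestedQuantityFromPodEntries call collapses the deleted loop and its math.Ceil rounding into one shared helper; the map argument substitutes the incoming request for the pod being admitted. Below is a minimal sketch of that idea, using simplified stand-in types rather than the real state.PodEntries/AllocationInfo:

// Sketch only: allocationInfo, podEntries, and the helper below are
// simplified stand-ins for the katalyst state types, not the real API.
package main

import (
	"fmt"
	"math"
)

type allocationInfo struct {
	Shared      bool
	NUMABinding bool
	Request     float64
}

type podEntries map[string]map[string]*allocationInfo // podUID -> containerName -> info

type getRequestedCoresFunc func(*allocationInfo) float64

func getNonBindingSharedRequestedQuantity(entries podEntries,
	override map[string]float64, getCores getRequestedCoresFunc,
) int {
	var total float64
	for _, v := range override {
		total += v // count the incoming/updated requests first
	}
	for podUID, containers := range entries {
		if _, ok := override[podUID]; ok {
			continue // this pod's state entry is superseded by the override
		}
		for _, info := range containers {
			// both main and sidecar containers count toward the shared pool
			if info.Shared && !info.NUMABinding {
				total += getCores(info)
			}
		}
	}
	return int(math.Ceil(total))
}

func main() {
	entries := podEntries{
		"pod-a": {"main": {Shared: true, Request: 2.5}},
		"pod-b": {"main": {Shared: true, Request: 1.2}},
	}
	getCores := func(a *allocationInfo) float64 { return a.Request }
	// pod-b is being resized: its stale state entry is replaced by the new request.
	fmt.Println(getNonBindingSharedRequestedQuantity(entries, map[string]float64{"pod-b": 4}, getCores)) // 7
}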

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go

Lines changed: 1 addition & 1 deletion
@@ -582,7 +582,7 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
 		}
 		newEntries[podUID][containerName] = allocationInfo.Clone()
 		// adapt to old checkpoint without RequestQuantity property
-		newEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo)
+		newEntries[podUID][containerName].RequestQuantity = p.getContainerRequestedCores(allocationInfo)

 		switch allocationInfo.QoSLevel {
 		case consts.PodAnnotationQoSLevelDedicatedCores:
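This file, and the handlers below, switch from the package-level getter (installed once via state.SetContainerRequestedCores, deleted above) to the policy's own method, so the dependency is explicit in each call. A sketch of the injection pattern with illustrative stand-in types, not the real katalyst signatures:

// Sketch only: trimmed stand-ins for the katalyst types.
package main

import "fmt"

type AllocationInfo struct{ RequestQuantity float64 }

// GetContainerRequestedCoresFunc mirrors the function type the state
// package exports so helpers can receive the getter explicitly.
type GetContainerRequestedCoresFunc func(*AllocationInfo) float64

type DynamicPolicy struct{}

func (p *DynamicPolicy) getContainerRequestedCores(a *AllocationInfo) float64 {
	return a.RequestQuantity
}

// countRequested no longer reaches into a package-level global; the
// dependency is visible in the signature, which also makes it easy to test.
func countRequested(infos []*AllocationInfo, getCores GetContainerRequestedCoresFunc) float64 {
	var sum float64
	for _, info := range infos {
		sum += getCores(info)
	}
	return sum
}

func main() {
	p := &DynamicPolicy{}
	infos := []*AllocationInfo{{RequestQuantity: 2}, {RequestQuantity: 3.5}}
	fmt.Println(countRequested(infos, p.getContainerRequestedCores)) // 5.5
}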

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go

Lines changed: 7 additions & 7 deletions
@@ -726,15 +726,15 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig
 				return fmt.Errorf("pool %s cross NUMA: %+v", poolName, poolsQuantityMap[poolName])
 			}
 		} else if incrByReq {
-			err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap)
+			err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap, p.getContainerRequestedCores)
 			if err != nil {
 				return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap failed with error: %v", err)
 			}
 		}
 	} else {
 		// else we do sum(containers req) for each pool to get pools ratio
 		var err error
-		poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, allocationInfos)
+		poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, allocationInfos, p.getContainerRequestedCores)
 		if err != nil {
 			return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err)
 		}

@@ -745,14 +745,14 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig
 				allocationInfos[0].PodNamespace, allocationInfos[0].PodName, allocationInfos[0].ContainerName)
 		}
 		// if advisor is disabled, qrm can re-calc the pool size exactly. we don't need to adjust the pool size.
-		err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap)
+		err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap, p.getContainerRequestedCores)
 		if err != nil {
 			return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap failed with error: %v", err)
 		}
 	}
 }

-isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, allocationInfos)
+isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, allocationInfos, p.getContainerRequestedCores)
 err := p.adjustPoolsAndIsolatedEntries(poolsQuantityMap, isolatedQuantityMap,
 	entries, machineState)
 if err != nil {

@@ -867,12 +867,12 @@ func (p *DynamicPolicy) adjustAllocationEntries() error {
 		poolsQuantityMap = machine.ParseCPUAssignmentQuantityMap(poolsCPUSetMap)
 	} else {
 		var err error
-		poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, nil)
+		poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, nil, p.getContainerRequestedCores)
 		if err != nil {
 			return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err)
 		}
 	}
-	isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, nil)
+	isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, nil, p.getContainerRequestedCores)

 	err := p.adjustPoolsAndIsolatedEntries(poolsQuantityMap, isolatedQuantityMap, entries, machineState)
 	if err != nil {

@@ -1095,7 +1095,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine

 	newPodEntries[podUID][containerName] = allocationInfo.Clone()
 	// adapt to old checkpoint without RequestQuantity property
-	newPodEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo)
+	newPodEntries[podUID][containerName].RequestQuantity = p.getContainerRequestedCores(allocationInfo)
 	switch allocationInfo.QoSLevel {
 	case apiconsts.PodAnnotationQoSLevelDedicatedCores:
 		newPodEntries[podUID][containerName].OwnerPoolName = allocationInfo.GetPoolName()
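The pool-accounting helpers (CountAllocationInfosToPoolsQuantityMap, GetSharedQuantityMapFromPodEntries, GetIsolatedQuantityMapFromPodEntries) now all take the getter as a parameter. A hypothetical sketch of what such an accumulator might reduce to; the real helpers work on richer types and also validate pool names:

// Sketch only: a guess at the shape of a pools-quantity accumulator,
// with invented stand-in types; not the katalyst implementation.
package main

import (
	"fmt"
	"math"
)

type allocationInfo struct {
	OwnerPoolName string
	Request       float64
}

type getCoresFunc func(*allocationInfo) float64

// countToPoolsQuantityMap adds each container's requested cores to its
// owner pool's running total, using the injected getter.
func countToPoolsQuantityMap(infos []*allocationInfo, pools map[string]float64, getCores getCoresFunc) {
	for _, info := range infos {
		pools[info.OwnerPoolName] += getCores(info)
	}
}

func main() {
	pools := map[string]float64{}
	infos := []*allocationInfo{
		{OwnerPoolName: "share", Request: 1.5},
		{OwnerPoolName: "share", Request: 2},
		{OwnerPoolName: "batch", Request: 0.7},
	}
	countToPoolsQuantityMap(infos, pools, func(a *allocationInfo) float64 { return a.Request })
	for name, quantity := range pools {
		// pool sizes are eventually rounded up to whole CPUs
		fmt.Printf("%s: %d\n", name, int(math.Ceil(quantity)))
	}
}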

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go

Lines changed: 40 additions & 18 deletions
@@ -41,6 +41,8 @@ import (
 	qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos"
 )

+var errNoAvailableCPUHints = fmt.Errorf("no available cpu hints")
+
 type memBWHintUpdate struct {
 	updatedPreferrence bool
 	leftAllocatable    int

@@ -73,7 +75,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
 			"podName":       req.PodName,
 			"containerName": req.ContainerName,
 		})...)
-		return nil, fmt.Errorf("no enough cpu resource")
+		return nil, errNoAvailableCPUHints
 	}
 }

@@ -209,12 +211,6 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 	}
 	sort.Ints(numaNodes)

-	hints := map[string]*pluginapi.ListOfTopologyHints{
-		string(v1.ResourceCPU): {
-			Hints: []*pluginapi.TopologyHint{},
-		},
-	}
-
 	minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
 	if err != nil {
 		return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)

@@ -262,6 +258,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 	}

 	preferredHintIndexes := []int{}
+	var availableNumaHints []*pluginapi.TopologyHint
 	machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) {
 		maskCount := mask.Count()
 		if maskCount < minNUMAsCountNeeded {

@@ -292,18 +289,27 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 		}

 		preferred := maskCount == minNUMAsCountNeeded
-		hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{
+		availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{
 			Nodes:     machine.MaskToUInt64Array(mask),
 			Preferred: preferred,
 		})

 		if preferred {
-			preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1)
+			preferredHintIndexes = append(preferredHintIndexes, len(availableNumaHints)-1)
 		}
 	})

+	// NOTE: because grpc is inability to distinguish between an empty array and nil,
+	// we return an error instead of an empty array.
+	// we should resolve this issue if we need manage multi resource in one plugin.
+	if len(availableNumaHints) == 0 {
+		general.Warningf("calculateHints got no available cpu hints for pod: %s/%s, container: %s",
+			req.PodNamespace, req.PodName, req.ContainerName)
+		return nil, errNoAvailableCPUHints
+	}
+
 	if numaBound > machine.MBWNUMAsPoint {
-		numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer)
+		numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer, p.getContainerRequestedCores)

 		general.InfoS("getNUMAAllocatedMemBW",
 			"podNamespace", req.PodNamespace,

@@ -314,15 +320,21 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 			general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err)
 			_ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw)
 		} else {
-			p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints,
+			p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints,
 				reqInt, numaAllocatedMemBW, req, numaExclusive)
 		}
 	}

+	hints := map[string]*pluginapi.ListOfTopologyHints{
+		string(v1.ResourceCPU): {
+			Hints: availableNumaHints,
+		},
+	}
+
 	return hints, nil
 }

-func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer) (map[int]int, error) {
+func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer, getContainerRequestedCores state.GetContainerRequestedCoresFunc) (map[int]int, error) {
 	numaAllocatedMemBW := make(map[int]int)
 	podUIDToMemBWReq := make(map[string]int)
 	podUIDToBindingNUMAs := make(map[string]sets.Int)

@@ -350,7 +362,7 @@ func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserve
 			Name:        allocationInfo.PodName,
 			Labels:      allocationInfo.Labels,
 			Annotations: allocationInfo.Annotations,
-		}, int(math.Ceil(state.GetContainerRequestedCores()(allocationInfo))))
+		}, int(math.Ceil(getContainerRequestedCores(allocationInfo))))
 		if err != nil {
 			return nil, fmt.Errorf("GetContainerMemoryBandwidthRequest for pod: %s/%s, container: %s failed with error: %v",
 				allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, err)
@@ -633,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
 			general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA",
 				req.PodNamespace, req.PodName, req.ContainerName)
 			var calculateErr error
-			hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations)
+			hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
 			if calculateErr != nil {
 				general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA",
 					req.PodNamespace, req.PodName, req.ContainerName)

@@ -642,15 +654,15 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
 		} else {
 			general.Errorf("pod: %s/%s, container: %s request inplace update resize, but no enough resource for it in current NUMA",
 				req.PodNamespace, req.PodName, req.ContainerName)
-			return nil, fmt.Errorf("inplace update resize scale out failed with no enough resource")
+			return nil, errNoAvailableCPUHints
 		}
 	} else {
 		general.Infof("pod: %s/%s, container: %s request inplace update resize, there is enough resource for it in current NUMA",
 			req.PodNamespace, req.PodName, req.ContainerName)
 	}
 } else if hints == nil {
 	var calculateErr error
-	hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations)
+	hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
 	if calculateErr != nil {
 		return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr)
 	}

@@ -780,12 +792,13 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi

 func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries,
 	machineState state.NUMANodeMap,
-	reqAnnotations map[string]string,
+	req *pluginapi.ResourceRequest,
 ) (map[string]*pluginapi.ListOfTopologyHints, error) {
 	nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size()
 	nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
-	nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries)
+	nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores)

+	reqAnnotations := req.Annotations
 	numaNodes := p.filterNUMANodesByNonBindingSharedRequestedQuantity(nonBindingSharedRequestedQuantity,
 		nonBindingNUMAsCPUQuantity, nonBindingNUMAs, machineState,
 		machineState.GetFilteredNUMASetWithAnnotations(state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt())

@@ -826,6 +839,15 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
 		p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
 	}

+	// NOTE: because grpc is inability to distinguish between an empty array and nil,
+	// we return an error instead of an empty array.
+	// we should resolve this issue if we need manage multi resource in one plugin.
+	if len(hints[string(v1.ResourceCPU)].Hints) == 0 {
+		general.Warningf("calculateHints got no available memory hints for snb pod: %s/%s, container: %s",
+			req.PodNamespace, req.PodName, req.ContainerName)
+		return nil, errNoAvailableCPUHints
+	}
+
 	return hints, nil
 }

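The core behavioral change in this file: since gRPC cannot distinguish a nil hints slice from an empty one after (de)serialization, the plugin now returns the errNoAvailableCPUHints sentinel instead of an empty hint list. A self-contained sketch of the pattern, with trimmed stand-ins for the pluginapi types:

// Sketch only: TopologyHint and calculateHints below are simplified
// stand-ins, not the pluginapi or DynamicPolicy implementations.
package main

import (
	"errors"
	"fmt"
)

var errNoAvailableCPUHints = errors.New("no available cpu hints")

type TopologyHint struct {
	Nodes     []uint64
	Preferred bool
}

func calculateHints(candidates [][]uint64, fits func([]uint64) bool) ([]*TopologyHint, error) {
	var hints []*TopologyHint
	for _, nodes := range candidates {
		if fits(nodes) {
			hints = append(hints, &TopologyHint{Nodes: nodes, Preferred: len(nodes) == 1})
		}
	}
	// Returning a typed error keeps "no feasible placement" distinguishable
	// from "any placement is fine" on the far side of the gRPC boundary.
	if len(hints) == 0 {
		return nil, errNoAvailableCPUHints
	}
	return hints, nil
}

func main() {
	_, err := calculateHints([][]uint64{{0}, {1}}, func([]uint64) bool { return false })
	fmt.Println(errors.Is(err, errNoAvailableCPUHints)) // true
}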

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go

Lines changed: 3 additions & 8 deletions
@@ -112,8 +112,6 @@ func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, st
 		podDebugAnnoKeys: []string{podDebugAnnoKey},
 	}

-	state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)
-
 	// register allocation behaviors for pods with different QoS level
 	policyImplement.allocationHandlers = map[string]util.AllocationHandler{
 		consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresAllocationHandler,

@@ -4923,7 +4921,6 @@ func Test_getNUMAAllocatedMemBW(t *testing.T) {
 	as := require.New(t)

 	policyImplement := &DynamicPolicy{}
-	state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)

 	testName := "test"
 	highDensityCPUTopology, err := machine.GenerateDummyCPUTopology(384, 2, 12)

@@ -5168,7 +5165,7 @@
 		curTT := tt
 		t.Run(tt.name, func(t *testing.T) {
 			t.Parallel()
-			got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer)
+			got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer, policyImplement.getContainerRequestedCores)
 			if (err != nil) != curTT.wantErr {
 				t.Errorf("getNUMAAllocatedMemBW() error = %v, wantErr %v", err, curTT.wantErr)
 				return

@@ -5294,10 +5291,8 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) {
 	}

 	// pod aggregated size is 8, the new container request is 4, 8 + 4 > 11 (share-NUMA0 size)
-	res, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq)
-	as.Nil(err)
-	as.NotNil(res.ResourceHints[string(v1.ResourceCPU)])
-	as.Equal(0, len(res.ResourceHints[string(v1.ResourceCPU)].Hints))
+	_, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq)
+	as.ErrorContains(err, errNoAvailableCPUHints.Error())

 	// reallocate sidecar
 	_, err = dynamicPolicy.Allocate(context.Background(), sidecarReq)
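A hedged aside on the assertion style: ErrorContains matches on the message, which survives the fmt.Errorf("...: %v", err) wrapping used in the handlers above, whereas errors.Is would need the %w verb to preserve the error chain. A minimal illustration:

// Sketch only: demonstrates why message matching is the safer check here.
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errNoAvailableCPUHints = errors.New("no available cpu hints")

func main() {
	// %v flattens the cause to text, so errors.Is no longer matches...
	wrapped := fmt.Errorf("admit failed: %v", errNoAvailableCPUHints)
	fmt.Println(errors.Is(wrapped, errNoAvailableCPUHints)) // false
	// ...but a substring check on the message still does, which is why the
	// test asserts with ErrorContains rather than ErrorIs.
	fmt.Println(strings.Contains(wrapped.Error(), errNoAvailableCPUHints.Error())) // true
}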
