
Commit cae1007

Merge pull request #776 from yehlemias/dev/optimize-qrm-remove-pod

optimize qrm remove pod

2 parents 618bb47 + 8ce63db

32 files changed: +508 -343 lines

pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator/cpu_assignment.go

Lines changed: 77 additions & 27 deletions
@@ -42,12 +42,13 @@ type cpuAccumulator struct {
 }

 func newCPUAccumulator(machineInfo *machine.KatalystMachineInfo, availableCPUs machine.CPUSet, numCPUs int) *cpuAccumulator {
-	return &cpuAccumulator{
+	a := &cpuAccumulator{
 		numCPUsNeeded: numCPUs,
 		cpuTopology:   machineInfo.CPUTopology,
 		cpuDetails:    machineInfo.CPUDetails.KeepOnly(availableCPUs),
 		result:        machine.NewCPUSet(),
 	}
+	return a
 }

 func (a *cpuAccumulator) getDetails() machine.CPUDetails {
@@ -268,30 +269,22 @@ func TakeByNUMABalance(info *machine.KatalystMachineInfo, availableCPUs machine.
 ) (machine.CPUSet, machine.CPUSet, error) {
 	var err error
 	acc := newCPUAccumulator(info, availableCPUs, cpuRequirement)
+
 	if acc.isSatisfied() {
 		goto successful
 	}

+	if takeFreeCoresByNumaBalance(acc) {
+		goto successful
+	}
+
 	for {
 		if acc.isFailed() {
 			err = fmt.Errorf("not enough cpus available to satisfy request")
 			goto failed
 		}

-	numaLoop:
 		for _, s := range info.CPUDetails.NUMANodes().ToSliceInt() {
-			if acc.needs(acc.getTopology().CPUsPerCore()) && len(acc.freeCores()) > 0 {
-				for _, c := range acc.freeCoresInNUMANode(s) {
-					acc.take(acc.getDetails().CPUsInCores(c))
-					if acc.isSatisfied() {
-						goto successful
-					} else {
-						continue numaLoop
-					}
-				}
-				continue
-			}
-
 			for _, c := range acc.freeCPUsInNUMANode(s) {
 				if acc.needs(1) {
 					acc.take(machine.NewCPUSet(c))
@@ -359,30 +352,22 @@ func TakeByNUMABalanceReversely(info *machine.KatalystMachineInfo, availableCPUs
 ) (machine.CPUSet, machine.CPUSet, error) {
 	var err error
 	acc := newCPUAccumulator(info, availableCPUs, cpuRequirement)
+
 	if acc.isSatisfied() {
 		goto successful
 	}

+	if takeFreeCoresByNumaBalanceReversely(acc) {
+		goto successful
+	}
+
 	for {
 		if acc.isFailed() {
 			err = fmt.Errorf("not enough cpus available to satisfy request")
 			goto failed
 		}

-	numaLoop:
 		for _, s := range info.CPUDetails.NUMANodes().ToSliceInt() {
-			if acc.needs(acc.getTopology().CPUsPerCore()) && len(acc.freeCores()) > 0 {
-				for _, c := range acc.freeCoresInNUMANodeReversely(s) {
-					acc.take(acc.getDetails().CPUsInCores(c))
-					if acc.isSatisfied() {
-						goto successful
-					} else {
-						continue numaLoop
-					}
-				}
-				continue
-			}
-
 			for _, c := range acc.freeCPUsInNUMANodeReversely(s) {
 				if acc.needs(1) {
 					acc.take(machine.NewCPUSet(c))
@@ -403,3 +388,68 @@ failed:
 successful:
 	return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
 }
+
+func takeFreeCoresByNumaBalance(acc *cpuAccumulator) bool {
+	info := acc.getTopology()
+	if !acc.needs(info.CPUsPerCore()) {
+		return false
+	}
+
+	numaSlice := info.CPUDetails.NUMANodes().ToSliceInt()
+	freeCoresMap := make(map[int][]int)
+	maxFreeCoresCountInNuma := 0
+	for _, s := range numaSlice {
+		freeCores := acc.freeCoresInNUMANode(s)
+		freeCoresMap[s] = freeCores
+		if len(freeCores) > maxFreeCoresCountInNuma {
+			maxFreeCoresCountInNuma = len(freeCores)
+		}
+	}
+
+	for i := 0; i < maxFreeCoresCountInNuma; i++ {
+		for _, s := range numaSlice {
+			if len(freeCoresMap[s]) > i {
+				c := freeCoresMap[s][i]
+				if acc.needs(info.CPUsPerCore()) {
+					acc.take(acc.getDetails().CPUsInCores(c))
+					if acc.isSatisfied() {
+						return true
+					}
+				}
+			}
+		}
+	}
+	return false
+}
+
+func takeFreeCoresByNumaBalanceReversely(acc *cpuAccumulator) bool {
+	info := acc.getTopology()
+	if !acc.needs(info.CPUsPerCore()) {
+		return false
+	}
+
+	numaSlice := info.CPUDetails.NUMANodes().ToSliceInt()
+	reverselyFreeCoresMap := make(map[int][]int)
+	maxFreeCoresCountInNuma := 0
+	for _, s := range numaSlice {
+		freeCores := acc.freeCoresInNUMANodeReversely(s)
+		reverselyFreeCoresMap[s] = freeCores
+		if len(freeCores) > maxFreeCoresCountInNuma {
+			maxFreeCoresCountInNuma = len(freeCores)
+		}
+	}
+	for i := 0; i < maxFreeCoresCountInNuma; i++ {
+		for _, s := range numaSlice {
+			if len(reverselyFreeCoresMap[s]) > i {
+				c := reverselyFreeCoresMap[s][i]
+				if acc.needs(info.CPUsPerCore()) {
+					acc.take(acc.getDetails().CPUsInCores(c))
+					if acc.isSatisfied() {
+						return true
+					}
+				}
+			}
+		}
+	}
+	return false
+}
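
The two new helpers replace the per-pass scanning in the old numaLoop: instead of recomputing free-core sets on every turn of the outer for {} loop, they build each NUMA node's free-core list once, then walk the lists column by column, taking the i-th free core of each node in turn. A minimal, runnable sketch of that round-robin order; pickCores and its map-based types are illustrative stand-ins for cpuAccumulator and machine.CPUDetails, not katalyst-core APIs:

package main

import "fmt"

// pickCores walks per-NUMA free-core lists column by column: pass i takes
// the i-th free core of each node in turn until `need` cores are taken,
// mirroring the loop structure of takeFreeCoresByNumaBalance above.
func pickCores(freeCoresByNUMA map[int][]int, numaOrder []int, need int) []int {
	maxLen := 0
	for _, cores := range freeCoresByNUMA {
		if len(cores) > maxLen {
			maxLen = len(cores)
		}
	}

	taken := make([]int, 0, need)
	for i := 0; i < maxLen && len(taken) < need; i++ {
		for _, numa := range numaOrder {
			if i < len(freeCoresByNUMA[numa]) && len(taken) < need {
				taken = append(taken, freeCoresByNUMA[numa][i])
			}
		}
	}
	return taken
}

func main() {
	free := map[int][]int{0: {0, 1, 2}, 1: {8, 9}}
	// One core per node per pass: cores 0 and 8 first, then core 1, so the
	// result stays balanced instead of draining node 0 before node 1.
	fmt.Println(pickCores(free, []int{0, 1}, 3)) // [0 8 1]
}

Because at most one core is taken from a node before moving to the next, a satisfied request ends up spread evenly across NUMA nodes, which is the balancing property TakeByNUMABalance promises.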

pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_load_test.go

Lines changed: 3 additions & 3 deletions
@@ -386,7 +386,7 @@ func TestThresholdMet(t *testing.T) {

 	for entryName, entries := range tt.podEntries {
 		for subEntryName, entry := range entries {
-			stateImpl.SetAllocationInfo(entryName, subEntryName, entry)
+			stateImpl.SetAllocationInfo(entryName, subEntryName, entry, true)

 			if entries.IsPoolEntry() {
 				continue
@@ -760,7 +760,7 @@ func TestGetTopEvictionPods(t *testing.T) {

 	for entryName, entries := range tt.podEntries {
 		for subEntryName, entry := range entries {
-			stateImpl.SetAllocationInfo(entryName, subEntryName, entry)
+			stateImpl.SetAllocationInfo(entryName, subEntryName, entry, true)

 			if entries.IsPoolEntry() {
 				continue
@@ -1630,7 +1630,7 @@ func TestCPUPressureLoadEviction_collectMetrics(t *testing.T) {
 	now := time.Now()
 	for entryName, entries := range tt.podEntries {
 		for subEntryName, entry := range entries {
-			stateImpl.SetAllocationInfo(entryName, subEntryName, entry)
+			stateImpl.SetAllocationInfo(entryName, subEntryName, entry, true)

 			if entries.IsPoolEntry() {
 				continue

pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_suppression_test.go

Lines changed: 1 addition & 1 deletion
@@ -312,7 +312,7 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {

 	for entryName, entries := range tt.podEntries {
 		for subEntryName, entry := range entries {
-			stateImpl.SetAllocationInfo(entryName, subEntryName, entry)
+			stateImpl.SetAllocationInfo(entryName, subEntryName, entry, true)

 			if entries.IsPoolEntry() {
 				continue

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go

Lines changed: 35 additions & 20 deletions
@@ -464,7 +464,7 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
 			if p.applySidecarAllocationInfoFromMainContainer(allocationInfo, mainContainerAllocationInfo) {
 				general.Infof("pod: %s/%s, container: %s sync allocation info from main container",
 					allocationInfo.PodNamespace, allocationInfo.PodName, containerName)
-				p.state.SetAllocationInfo(podUID, containerName, allocationInfo)
+				p.state.SetAllocationInfo(podUID, containerName, allocationInfo, true)
 				needUpdateMachineState = true
 			}
 		}
@@ -488,11 +488,11 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
 			}

 			allocationInfo.InitTimestamp = time.Now().Format(util.QRMTimeFormat)
-			p.state.SetAllocationInfo(podUID, containerName, allocationInfo)
+			p.state.SetAllocationInfo(podUID, containerName, allocationInfo, true)
 		} else if allocationInfo.RampUp && time.Now().After(initTs.Add(p.transitionPeriod)) {
 			general.Infof("pod: %s/%s, container: %s ramp up finished", allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
 			allocationInfo.RampUp = false
-			p.state.SetAllocationInfo(podUID, containerName, allocationInfo)
+			p.state.SetAllocationInfo(podUID, containerName, allocationInfo, true)

 			if allocationInfo.CheckShared() {
 				allocationInfosJustFinishRampUp = append(allocationInfosJustFinishRampUp, allocationInfo)
@@ -503,7 +503,7 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
 	}

 	if len(allocationInfosJustFinishRampUp) > 0 {
-		if err = p.putAllocationsAndAdjustAllocationEntries(allocationInfosJustFinishRampUp, true); err != nil {
+		if err = p.putAllocationsAndAdjustAllocationEntries(allocationInfosJustFinishRampUp, true, true); err != nil {
 			// not influencing return response to kubelet when putAllocationsAndAdjustAllocationEntries failed
 			general.Errorf("putAllocationsAndAdjustAllocationEntries failed with error: %v", err)
 		}
@@ -517,7 +517,7 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
 		general.Errorf("GetResourcesAllocation GenerateMachineStateFromPodEntries failed with error: %v", err)
 		return nil, fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
 	}
-	p.state.SetMachineState(updatedMachineState)
+	p.state.SetMachineState(updatedMachineState, true)
 }

 podEntries = p.state.GetPodEntries()
@@ -843,14 +843,17 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
 		if err != nil {
 			resp = nil
 			respErr = fmt.Errorf("add container to qos aware server failed with error: %v", err)
-			_ = p.removeContainer(req.PodUid, req.ContainerName)
+			_ = p.removeContainer(req.PodUid, req.ContainerName, false)
 		}
 	} else if respErr != nil {
-		_ = p.removeContainer(req.PodUid, req.ContainerName)
+		_ = p.removeContainer(req.PodUid, req.ContainerName, false)
 		_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw,
 			metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(respErr)},
 			metrics.MetricTag{Key: util.MetricTagNameInplaceUpdateResizing, Val: strconv.FormatBool(util.PodInplaceUpdateResizing(req))})
 	}
+	if err := p.state.StoreState(); err != nil {
+		general.ErrorS(err, "store state failed", "podName", req.PodName, "containerName", req.ContainerName)
+	}

 	p.Unlock()
 	if respErr != nil {
@@ -908,7 +911,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
 	if p.allocationHandlers[qosLevel] == nil {
 		return nil, fmt.Errorf("katalyst QoS level: %s is not supported yet", qosLevel)
 	}
-	return p.allocationHandlers[qosLevel](ctx, req)
+	return p.allocationHandlers[qosLevel](ctx, req, false)
 }

 // AllocateForPod is called during pod admit so that the resource
@@ -961,34 +964,40 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context,
 		}
 	}

-	err = p.removePod(req.PodUid, podEntries)
+	err = p.removePod(req.PodUid, podEntries, false)
 	if err != nil {
 		general.ErrorS(err, "remove pod failed with error", "podUID", req.PodUid)
 		return nil, err
 	}

-	aErr := p.adjustAllocationEntries()
+	aErr := p.adjustAllocationEntries(false)
 	if aErr != nil {
 		general.ErrorS(aErr, "adjustAllocationEntries failed", "podUID", req.PodUid)
 	}
+	if err := p.state.StoreState(); err != nil {
+		general.ErrorS(err, "store state failed", "podUID", req.PodUid)
+	}

 	return &pluginapi.RemovePodResponse{}, nil
 }

-func (p *DynamicPolicy) removePod(podUID string, podEntries state.PodEntries) error {
+func (p *DynamicPolicy) removePod(podUID string, podEntries state.PodEntries, persistCheckpoint bool) error {
 	delete(podEntries, podUID)

 	updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
 	if err != nil {
 		return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
 	}

-	p.state.SetPodEntries(podEntries)
-	p.state.SetMachineState(updatedMachineState)
+	p.state.SetPodEntries(podEntries, false)
+	p.state.SetMachineState(updatedMachineState, false)
+	if persistCheckpoint {
+		return p.state.StoreState()
+	}
 	return nil
 }

-func (p *DynamicPolicy) removeContainer(podUID, containerName string) error {
+func (p *DynamicPolicy) removeContainer(podUID, containerName string, persistCheckpoint bool) error {
 	podEntries := p.state.GetPodEntries()

 	found := false
@@ -1007,8 +1016,11 @@ func (p *DynamicPolicy) removeContainer(podUID, containerName string) error {
 		return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
 	}

-	p.state.SetPodEntries(podEntries)
-	p.state.SetMachineState(updatedMachineState)
+	p.state.SetPodEntries(podEntries, false)
+	p.state.SetMachineState(updatedMachineState, false)
+	if persistCheckpoint {
+		return p.state.StoreState()
+	}
 	return nil
 }

@@ -1074,8 +1086,11 @@ func (p *DynamicPolicy) cleanPools() error {
 			return fmt.Errorf("calculate machineState by podEntries failed with error: %v", err)
 		}

-		p.state.SetPodEntries(podEntries)
-		p.state.SetMachineState(machineState)
+		p.state.SetPodEntries(podEntries, false)
+		p.state.SetMachineState(machineState, false)
+		if err := p.state.StoreState(); err != nil {
+			general.ErrorS(err, "store state failed")
+		}
 	} else {
 		general.Infof("there is no pool to delete")
 	}
@@ -1105,7 +1120,7 @@ func (p *DynamicPolicy) initReservePool() error {
 		TopologyAwareAssignments:         topologyAwareAssignments,
 		OriginalTopologyAwareAssignments: machine.DeepcopyCPUAssignment(topologyAwareAssignments),
 	}
-	p.state.SetAllocationInfo(commonstate.PoolNameReserve, commonstate.FakedContainerName, curReserveAllocationInfo)
+	p.state.SetAllocationInfo(commonstate.PoolNameReserve, commonstate.FakedContainerName, curReserveAllocationInfo, true)

 	return nil
 }
@@ -1173,7 +1188,7 @@ func (p *DynamicPolicy) initReclaimPool() error {
 			TopologyAwareAssignments:         topologyAwareAssignments,
 			OriginalTopologyAwareAssignments: machine.DeepcopyCPUAssignment(topologyAwareAssignments),
 		}
-		p.state.SetAllocationInfo(commonstate.PoolNameReclaim, commonstate.FakedContainerName, curPoolAllocationInfo)
+		p.state.SetAllocationInfo(commonstate.PoolNameReclaim, commonstate.FakedContainerName, curPoolAllocationInfo, true)
 	} else {
 		general.Infof("exist initial %s: %s", commonstate.PoolNameReclaim, reclaimedAllocationInfo.AllocationResult.String())
 	}
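
The thread running through this file's diff is write batching: removePod and removeContainer gain a persistCheckpoint flag, the state setters are called with false, and the public entry points (Allocate, RemovePod) issue a single StoreState after all mutations, logging a storage failure rather than failing the RPC. A self-contained sketch of that pattern, with a simplified store type standing in for the real state implementation:

package main

import "fmt"

// store is an illustrative stand-in for the plugin state: setters only
// mutate memory unless asked to persist, mirroring the new bool parameter.
type store struct {
	podEntries map[string]string
	writes     int
}

func (s *store) SetPodEntries(e map[string]string, persist bool) {
	s.podEntries = e
	if persist {
		_ = s.StoreState()
	}
}

// StoreState stands in for the checkpoint write that used to run on every
// setter call and now runs once per RemovePod.
func (s *store) StoreState() error {
	s.writes++
	return nil
}

// removePod mirrors the new shape: mutate with persist=false and let the
// caller decide when to checkpoint.
func removePod(s *store, podUID string, entries map[string]string, persistCheckpoint bool) error {
	delete(entries, podUID)
	s.SetPodEntries(entries, false)
	if persistCheckpoint {
		return s.StoreState()
	}
	return nil
}

func main() {
	s := &store{}
	entries := map[string]string{"pod-1": "0-3", "pod-2": "4-7"}
	_ = removePod(s, "pod-1", entries, false) // batched: no write yet
	// ...adjustAllocationEntries(false) would queue more mutations here...
	if err := s.StoreState(); err != nil { // single write at the end
		fmt.Println("store state failed:", err)
	}
	fmt.Println("checkpoint writes:", s.writes) // 1
}

Note that RemovePod only logs a StoreState failure and still returns a success response: the in-memory state is already consistent, and the next successful write catches the checkpoint up.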

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go

Lines changed: 7 additions & 5 deletions
@@ -458,7 +458,7 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(
 	if curAllowSharedCoresOverlapReclaimedCores != resp.AllowSharedCoresOverlapReclaimedCores {
 		general.Infof("set allowSharedCoresOverlapReclaimedCores from %v to %v",
 			curAllowSharedCoresOverlapReclaimedCores, resp.AllowSharedCoresOverlapReclaimedCores)
-		p.state.SetAllowSharedCoresOverlapReclaimedCores(resp.AllowSharedCoresOverlapReclaimedCores)
+		p.state.SetAllowSharedCoresOverlapReclaimedCores(resp.AllowSharedCoresOverlapReclaimedCores, true)
 	}

 	return nil
@@ -814,9 +814,11 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
 	if err != nil {
 		return fmt.Errorf("calculate machineState by newPodEntries failed with error: %v", err)
 	}
-	p.state.SetPodEntries(newEntries)
-	p.state.SetMachineState(newMachineState)
-
+	p.state.SetPodEntries(newEntries, false)
+	p.state.SetMachineState(newMachineState, false)
+	if err := p.state.StoreState(); err != nil {
+		general.ErrorS(err, "store state failed")
+	}
 	return nil
 }
@@ -861,7 +863,7 @@ func (p *DynamicPolicy) applyNUMAHeadroom(resp *advisorapi.ListAndWatchResponse)
 			advisorapi.ControlKnobKeyCPUNUMAHeadroom, cpuNUMAHeadroomValue, err)
 	}

-	p.state.SetNUMAHeadroom(*cpuNUMAHeadroom)
+	p.state.SetNUMAHeadroom(*cpuNUMAHeadroom, true)
 	general.Infof("cpuNUMAHeadroom: %v", cpuNUMAHeadroom)
 }
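
In this file the persist flag cuts both ways: cheap single-value knobs (SetAllowSharedCoresOverlapReclaimedCores, SetNUMAHeadroom) write through immediately with true, while the bulk applyBlocks path mutates with false and checkpoints once. A small sketch (illustrative types again, not the real state package) of what that buys in write counts:

package main

import "fmt"

// state counts checkpoint writes; set shares the shape used across the
// diff: mutate in memory, optionally write through.
type state struct{ writes int }

func (s *state) StoreState() error { s.writes++; return nil }

func (s *state) set(persist bool) {
	// ...mutate in-memory fields here...
	if persist {
		_ = s.StoreState()
	}
}

func main() {
	s := &state{}
	s.set(true)  // SetNUMAHeadroom(..., true): persist immediately
	s.set(false) // SetPodEntries(..., false)   \ applyBlocks batches
	s.set(false) // SetMachineState(..., false) / the bulk updates...
	_ = s.StoreState() // ...and persists once
	fmt.Println("checkpoint writes:", s.writes) // 2 instead of 3
}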
