Skip to content

Commit da29397

Browse files
authored
Merge pull request #821 from luomingmeng/dev/check-cpu-set-support-compare-with-request
feat(cpu/dynamicpolicy): add metrics for CPU request exceeding CPU set size
2 parents a0b161a + 516da2d commit da29397

File tree

4 files changed

+401
-4
lines changed

4 files changed

+401
-4
lines changed

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -850,10 +850,13 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
850850
_ = p.removeContainer(req.PodUid, req.ContainerName, false)
851851
}
852852
} else if respErr != nil {
853-
_ = p.removeContainer(req.PodUid, req.ContainerName, false)
853+
inplaceUpdateResizing := util.PodInplaceUpdateResizing(req)
854+
if !inplaceUpdateResizing {
855+
_ = p.removeContainer(req.PodUid, req.ContainerName, false)
856+
}
854857
_ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw,
855858
metrics.MetricTag{Key: "error_message", Val: metric.MetricTagValueFormat(respErr)},
856-
metrics.MetricTag{Key: util.MetricTagNameInplaceUpdateResizing, Val: strconv.FormatBool(util.PodInplaceUpdateResizing(req))})
859+
metrics.MetricTag{Key: util.MetricTagNameInplaceUpdateResizing, Val: strconv.FormatBool(inplaceUpdateResizing)})
857860
}
858861
if err := p.state.StoreState(); err != nil {
859862
general.ErrorS(err, "store state failed", "podName", req.PodName, "containerName", req.ContainerName)

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/kubewharf/katalyst-api/pkg/consts"
2828
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
29+
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
2930
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
3031
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
3132
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
@@ -37,6 +38,11 @@ import (
3738
cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
3839
"github.com/kubewharf/katalyst-core/pkg/util/general"
3940
"github.com/kubewharf/katalyst-core/pkg/util/machine"
41+
"github.com/kubewharf/katalyst-core/pkg/util/native"
42+
)
43+
44+
const (
45+
metricsNamePodTotalRequestLargerThanBindingCPUSet = "pod_total_request_larger_than_cpu_set"
4046
)
4147

4248
// checkCPUSet emit errors if the memory allocation falls into unexpected results
@@ -166,9 +172,139 @@ func (p *DynamicPolicy) checkCPUSet(_ *coreconfig.Configuration,
166172
_ = p.emitter.StoreInt64(util.MetricNameCPUSetOverlap, 1, metrics.MetricTypeNameRaw)
167173
}
168174

175+
p.checkCPUSetWithPodTotalRequest(podEntries, actualCPUSets)
169176
general.Infof("finish checkCPUSet")
170177
}
171178

179+
type cpusetPodState struct {
180+
cpuset machine.CPUSet
181+
totalMilliCPURequest int64
182+
podUIDs sets.String
183+
podMap map[string]*v1.Pod
184+
}
185+
186+
func (p *DynamicPolicy) checkCPUSetWithPodTotalRequest(
187+
podEntries state.PodEntries,
188+
actualCPUSets map[string]map[string]machine.CPUSet,
189+
) {
190+
ctx := context.Background()
191+
cpusetPodStateMap := p.buildCPUSetPodStateMap(ctx, podEntries, actualCPUSets)
192+
p.checkAndEmitMetrics(podEntries, cpusetPodStateMap)
193+
general.Infof("finish checkCPUSetWithPodTotalRequest")
194+
}
195+
196+
func (p *DynamicPolicy) buildCPUSetPodStateMap(ctx context.Context, podEntries state.PodEntries, actualCPUSets map[string]map[string]machine.CPUSet) map[string]*cpusetPodState {
197+
cpusetPodStateMap := make(map[string]*cpusetPodState)
198+
199+
// Build initial cpuset state map
200+
for podUID, containerCPUSets := range actualCPUSets {
201+
for _, cset := range containerCPUSets {
202+
mainContainerEntry := podEntries[podUID].GetMainContainerEntry()
203+
if mainContainerEntry == nil || mainContainerEntry.CheckReclaimed() {
204+
continue
205+
}
206+
207+
csetStr := cset.String()
208+
if _, ok := cpusetPodStateMap[csetStr]; !ok {
209+
cpusetPodStateMap[csetStr] = &cpusetPodState{
210+
cpuset: cset,
211+
podUIDs: sets.NewString(),
212+
}
213+
}
214+
cpusetPodStateMap[csetStr].podUIDs.Insert(podUID)
215+
}
216+
}
217+
218+
// Populate pod info and calculate total CPU requests
219+
for csetStr, cs := range cpusetPodStateMap {
220+
podMap := make(map[string]*v1.Pod, cs.podUIDs.Len())
221+
totalMilliCPURequest := int64(0)
222+
223+
for _, podUID := range cs.podUIDs.List() {
224+
pod, err := p.metaServer.GetPod(ctx, podUID)
225+
if err != nil {
226+
general.Errorf("get pod: %s failed with error: %v", podUID, err)
227+
continue
228+
}
229+
230+
if !native.PodIsActive(pod) {
231+
continue
232+
}
233+
234+
resources := native.SumUpPodRequestResources(pod)
235+
totalMilliCPURequest += resources.Cpu().MilliValue()
236+
podMap[podUID] = pod
237+
}
238+
239+
cs.podMap = podMap
240+
cs.totalMilliCPURequest = totalMilliCPURequest
241+
general.Infof("cpuset: %s, size: %d, totalMilliCPURequest: %d, podUIDs: %v", csetStr, cs.cpuset.Size(),
242+
totalMilliCPURequest, cs.podUIDs.List())
243+
}
244+
245+
return cpusetPodStateMap
246+
}
247+
248+
func (p *DynamicPolicy) checkAndEmitMetrics(podEntries state.PodEntries, cpusetPodStateMap map[string]*cpusetPodState) {
249+
allowSharedCoresOverlapReclaimedCores := p.state.GetAllowSharedCoresOverlapReclaimedCores()
250+
251+
for cpuset, cs := range cpusetPodStateMap {
252+
totalMilliCPURequest := p.calculateTotalCPURequest(cpuset, cs, cpusetPodStateMap)
253+
exceededRatio := float64(totalMilliCPURequest-int64(cs.cpuset.Size()*1000)) / float64(cs.totalMilliCPURequest)
254+
255+
if exceededRatio > 0 {
256+
p.emitExceededMetrics(podEntries, cpuset, cs, exceededRatio, allowSharedCoresOverlapReclaimedCores)
257+
}
258+
}
259+
}
260+
261+
func (p *DynamicPolicy) calculateTotalCPURequest(cpuset string, cs *cpusetPodState, cpusetPodStateMap map[string]*cpusetPodState) int64 {
262+
totalMilliCPURequest := cs.totalMilliCPURequest
263+
264+
// Add requests from subsets
265+
for otherCPUSet, otherState := range cpusetPodStateMap {
266+
if cpuset == otherCPUSet {
267+
continue
268+
}
269+
270+
if otherState.cpuset.IsSubsetOf(cs.cpuset) {
271+
totalMilliCPURequest += otherState.totalMilliCPURequest
272+
}
273+
}
274+
275+
return totalMilliCPURequest
276+
}
277+
278+
func (p *DynamicPolicy) emitExceededMetrics(
279+
podEntries state.PodEntries,
280+
cpuset string,
281+
cs *cpusetPodState,
282+
exceededRatio float64,
283+
allowSharedCoresOverlapReclaimedCores bool,
284+
) {
285+
enableReclaim := p.dynamicConfig.GetDynamicConfiguration().EnableReclaim
286+
for podUID, pod := range cs.podMap {
287+
mainContainerEntry := podEntries[podUID].GetMainContainerEntry()
288+
if mainContainerEntry == nil ||
289+
(mainContainerEntry.CheckShared() && enableReclaim && !allowSharedCoresOverlapReclaimedCores) {
290+
continue
291+
}
292+
293+
general.Errorf("pod: %s/%s, ownerPoolName: %s, qosLevel: %s, cpuset: %s, size %d, exceeds total cpu request: %.3f, exceeded ratio: %.3f",
294+
pod.Namespace, pod.Name, mainContainerEntry.OwnerPoolName, mainContainerEntry.QoSLevel, cpuset, cs.cpuset.Size(),
295+
float64(cs.totalMilliCPURequest)/1000, exceededRatio)
296+
297+
_ = p.emitter.StoreFloat64(metricsNamePodTotalRequestLargerThanBindingCPUSet, exceededRatio, metrics.MetricTypeNameRaw, []metrics.MetricTag{
298+
{Key: "podNamespace", Val: pod.Namespace},
299+
{Key: "podName", Val: pod.Name},
300+
{Key: "qosLevel", Val: mainContainerEntry.QoSLevel},
301+
{Key: "ownerPoolName", Val: mainContainerEntry.OwnerPoolName},
302+
{Key: "poolType", Val: commonstate.GetPoolType(mainContainerEntry.OwnerPoolName)},
303+
{Key: "cpuset", Val: cpuset},
304+
}...)
305+
}
306+
}
307+
172308
// clearResidualState is used to clean residual pods in local state
173309
func (p *DynamicPolicy) clearResidualState(_ *coreconfig.Configuration,
174310
_ interface{},

0 commit comments

Comments
 (0)