Skip to content

Commit 0b75b6d

Browse files
committed
fix(advisor): optimize lw health check
1 parent a449bc9 commit 0b75b6d

File tree

7 files changed

+468
-14
lines changed

7 files changed

+468
-14
lines changed

pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,8 @@ func (cra *cpuResourceAdvisor) updateWithIsolationGuardian(tryIsolation bool) er
308308
klog.Infof("[qosaware-cpu] notify cpu server: %+v", calculationResult)
309309
return nil
310310
default:
311-
klog.Errorf("[qosaware-cpu] channel is full")
312-
return fmt.Errorf("calculation result channel is full")
311+
klog.Warningf("[qosaware-cpu] channel is full, drop the new advice")
312+
return nil
313313
}
314314
}
315315

pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ func (ra *memoryResourceAdvisor) sendAdvices() error {
180180
general.Infof("notify memory server: %+v", result)
181181
return nil
182182
default:
183-
general.Errorf("channel is full")
184-
return fmt.Errorf("memory advice channel is full")
183+
klog.Warningf("[qosaware-memory] channel is full, drop the new advice")
184+
return nil
185185
}
186186
}
187187

pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ func (cs *cpuServer) RegisterAdvisorServer() {
7575

7676
func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvisor_ListAndWatchServer) error {
7777
_ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWCalled), int64(cs.period.Seconds()), metrics.MetricTypeNameCount)
78-
general.RegisterHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
78+
general.RegisterTemporaryHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
79+
defer general.UnregisterTemporaryHeartbeatCheck(cpuServerHealthCheckName)
7980

8081
if !cs.getCheckpointCalled {
8182
if err := cs.startToGetCheckpointFromCPUPlugin(); err != nil {
@@ -91,6 +92,18 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi
9192
return fmt.Errorf("recvCh convert failed")
9293
}
9394

95+
maxDropLength := len(recvCh)
96+
klog.Infof("[qosaware-server-cpu] drop all old cpu advices in channel (max: %d)", maxDropLength)
97+
for i := 0; i < maxDropLength; i++ {
98+
select {
99+
case <-recvCh:
100+
default:
101+
klog.Infof("[qosaware-server-cpu] all old cpu advices in channel is dropped (count: %d)", i)
102+
break
103+
}
104+
}
105+
106+
klog.Infof("[qosaware-server-cpu] start to push cpu advices")
94107
for {
95108
select {
96109
case <-server.Context().Done():

pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go

Lines changed: 197 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,33 @@ func newTestCPUServer(t *testing.T, podList []*v1.Pod) *cpuServer {
9898
return cpuServer
9999
}
100100

101+
func newTestCPUServerWithChanBuffer(t *testing.T, podList []*v1.Pod) *cpuServer {
102+
recvCh := make(chan types.InternalCPUCalculationResult, 1)
103+
sendCh := make(chan types.TriggerInfo, 1)
104+
conf := generateTestConfiguration(t)
105+
106+
metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
107+
metaCache, err := metacache.NewMetaCacheImp(conf, metricspool.DummyMetricsEmitterPool{}, metricsFetcher)
108+
require.NoError(t, err)
109+
require.NotNil(t, metaCache)
110+
111+
metaServer := &metaserver.MetaServer{
112+
MetaAgent: &agent.MetaAgent{
113+
PodFetcher: &pod.PodFetcherStub{
114+
PodList: podList,
115+
},
116+
},
117+
}
118+
119+
cpuServer, err := NewCPUServer(recvCh, sendCh, conf, metaCache, metaServer, metrics.DummyMetrics{})
120+
require.NoError(t, err)
121+
require.NotNil(t, cpuServer)
122+
123+
cpuServer.getCheckpointCalled = true
124+
125+
return cpuServer
126+
}
127+
101128
func TestCPUServerStartAndStop(t *testing.T) {
102129
t.Parallel()
103130

@@ -259,17 +286,17 @@ func DeepCopyResponse(response *cpuadvisor.ListAndWatchResponse) (*cpuadvisor.Li
259286
return copyResponse, nil
260287
}
261288

289+
type ContainerInfo struct {
290+
request *advisorsvc.ContainerMetadata
291+
podInfo *v1.Pod
292+
allocationInfo *cpuadvisor.AllocationInfo
293+
isolated bool
294+
regions sets.String
295+
}
296+
262297
func TestCPUServerListAndWatch(t *testing.T) {
263298
t.Parallel()
264299

265-
type ContainerInfo struct {
266-
request *advisorsvc.ContainerMetadata
267-
podInfo *v1.Pod
268-
allocationInfo *cpuadvisor.AllocationInfo
269-
isolated bool
270-
regions sets.String
271-
}
272-
273300
tests := []struct {
274301
name string
275302
provision types.InternalCPUCalculationResult
@@ -1708,3 +1735,165 @@ func TestConcurrencyGetCheckpointAndAddContainer(t *testing.T) {
17081735
time.Sleep(10 * time.Second)
17091736
cancel()
17101737
}
1738+
1739+
func TestCPUServerDropOldAdvice(t *testing.T) {
1740+
t.Parallel()
1741+
1742+
cpuServer := newTestCPUServerWithChanBuffer(t, []*v1.Pod{})
1743+
s := &mockCPUServerService_ListAndWatchServer{ResultsChan: make(chan *cpuadvisor.ListAndWatchResponse)}
1744+
stop := make(chan struct{})
1745+
recvCh := cpuServer.recvCh.(chan types.InternalCPUCalculationResult)
1746+
recvCh <- types.InternalCPUCalculationResult{}
1747+
go func() {
1748+
err := cpuServer.ListAndWatch(&advisorsvc.Empty{}, s)
1749+
assert.NoError(t, err, "failed to LW cpu server")
1750+
stop <- struct{}{}
1751+
}()
1752+
provision := types.InternalCPUCalculationResult{
1753+
TimeStamp: time.Now(),
1754+
PoolEntries: map[string]map[int]int{
1755+
state.PoolNameReclaim: {
1756+
0: 4,
1757+
1: 8,
1758+
},
1759+
},
1760+
}
1761+
infos := []*ContainerInfo{
1762+
{
1763+
request: &advisorsvc.ContainerMetadata{
1764+
PodUid: "pod1",
1765+
ContainerName: "c1",
1766+
Annotations: map[string]string{
1767+
consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable,
1768+
},
1769+
QosLevel: consts.PodAnnotationQoSLevelDedicatedCores,
1770+
},
1771+
podInfo: &v1.Pod{
1772+
ObjectMeta: metav1.ObjectMeta{
1773+
Namespace: "default",
1774+
Name: "pod1",
1775+
UID: "pod1",
1776+
Annotations: map[string]string{
1777+
consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores,
1778+
consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}",
1779+
},
1780+
},
1781+
Spec: v1.PodSpec{
1782+
Containers: []v1.Container{
1783+
{
1784+
Name: "c1",
1785+
},
1786+
},
1787+
},
1788+
},
1789+
allocationInfo: &cpuadvisor.AllocationInfo{
1790+
OwnerPoolName: state.PoolNameDedicated,
1791+
TopologyAwareAssignments: map[uint64]string{
1792+
0: "0-3",
1793+
1: "24-47",
1794+
},
1795+
},
1796+
},
1797+
}
1798+
for _, info := range infos {
1799+
assert.NoError(t, cpuServer.addContainer(info.request))
1800+
assert.NoError(t, cpuServer.updateContainerInfo(info.request.PodUid, info.request.ContainerName, info.podInfo, info.allocationInfo))
1801+
1802+
nodeInfo, _ := cpuServer.metaCache.GetContainerInfo(info.request.PodUid, info.request.ContainerName)
1803+
nodeInfo.Isolated = info.isolated
1804+
if info.regions.Len() > 0 {
1805+
nodeInfo.RegionNames = info.regions
1806+
}
1807+
assert.NoError(t, cpuServer.metaCache.SetContainerInfo(info.request.PodUid, info.request.ContainerName, nodeInfo))
1808+
}
1809+
1810+
recvCh <- provision
1811+
res := <-s.ResultsChan
1812+
close(cpuServer.stopCh)
1813+
<-stop
1814+
copyres, err := DeepCopyResponse(res)
1815+
assert.NoError(t, err)
1816+
wantRes := &cpuadvisor.ListAndWatchResponse{
1817+
Entries: map[string]*cpuadvisor.CalculationEntries{
1818+
state.PoolNameReclaim: {
1819+
Entries: map[string]*cpuadvisor.CalculationInfo{
1820+
"": {
1821+
OwnerPoolName: state.PoolNameReclaim,
1822+
CalculationResultsByNumas: map[int64]*cpuadvisor.NumaCalculationResult{
1823+
0: {
1824+
Blocks: []*cpuadvisor.Block{
1825+
{
1826+
Result: 4,
1827+
OverlapTargets: []*cpuadvisor.OverlapTarget{
1828+
{
1829+
OverlapTargetPodUid: "pod1",
1830+
OverlapTargetContainerName: "c1",
1831+
OverlapType: cpuadvisor.OverlapType_OverlapWithPod,
1832+
},
1833+
},
1834+
},
1835+
},
1836+
},
1837+
1: {
1838+
Blocks: []*cpuadvisor.Block{
1839+
{
1840+
Result: 8,
1841+
OverlapTargets: []*cpuadvisor.OverlapTarget{
1842+
{
1843+
OverlapTargetPodUid: "pod1",
1844+
OverlapTargetContainerName: "c1",
1845+
OverlapType: cpuadvisor.OverlapType_OverlapWithPod,
1846+
},
1847+
},
1848+
},
1849+
},
1850+
},
1851+
},
1852+
},
1853+
},
1854+
},
1855+
"pod1": {
1856+
Entries: map[string]*cpuadvisor.CalculationInfo{
1857+
"c1": {
1858+
OwnerPoolName: state.PoolNameDedicated,
1859+
CalculationResultsByNumas: map[int64]*cpuadvisor.NumaCalculationResult{
1860+
0: {
1861+
Blocks: []*cpuadvisor.Block{
1862+
{
1863+
Result: 4,
1864+
OverlapTargets: []*cpuadvisor.OverlapTarget{
1865+
{
1866+
OverlapTargetPoolName: state.PoolNameReclaim,
1867+
OverlapType: cpuadvisor.OverlapType_OverlapWithPool,
1868+
},
1869+
},
1870+
},
1871+
},
1872+
},
1873+
1: {
1874+
Blocks: []*cpuadvisor.Block{
1875+
{
1876+
Result: 16,
1877+
OverlapTargets: nil,
1878+
},
1879+
{
1880+
Result: 8,
1881+
OverlapTargets: []*cpuadvisor.OverlapTarget{
1882+
{
1883+
OverlapTargetPoolName: state.PoolNameReclaim,
1884+
OverlapType: cpuadvisor.OverlapType_OverlapWithPool,
1885+
},
1886+
},
1887+
},
1888+
},
1889+
},
1890+
},
1891+
},
1892+
},
1893+
},
1894+
},
1895+
}
1896+
if !reflect.DeepEqual(copyres, wantRes) {
1897+
t.Errorf("ListAndWatch()\ngot = %+v, \nwant= %+v", general.ToString(copyres), general.ToString(wantRes))
1898+
}
1899+
}

pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,27 @@ func (ms *memoryServer) listContainers() error {
140140

141141
func (ms *memoryServer) ListAndWatch(_ *advisorsvc.Empty, server advisorsvc.AdvisorService_ListAndWatchServer) error {
142142
_ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWCalled), int64(ms.period.Seconds()), metrics.MetricTypeNameCount)
143-
general.RegisterHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
143+
general.RegisterTemporaryHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
144+
defer general.UnregisterTemporaryHeartbeatCheck(memoryServerHealthCheckName)
144145

145146
recvCh, ok := ms.recvCh.(chan types.InternalMemoryCalculationResult)
146147
if !ok {
147148
return fmt.Errorf("recvCh convert failed")
148149
}
149150
ms.listAndWatchCalled = true
150151

152+
maxDropLength := len(recvCh)
153+
klog.Infof("[qosaware-server-memory] drop all old memory advices in channel (max: %d)", maxDropLength)
154+
for i := 0; i < maxDropLength; i++ {
155+
select {
156+
case <-recvCh:
157+
default:
158+
klog.Infof("[qosaware-server-memory] all old memory advice in channel is dropped (max: %d)", i)
159+
break
160+
}
161+
}
162+
163+
klog.Infof("[qosaware-server-memory] start to push memory advice")
151164
for {
152165
select {
153166
case <-server.Context().Done():

0 commit comments

Comments
 (0)