Skip to content

Commit 5268e8e

Browse files
authored
Merge pull request #453 from WangZzzhe/dev/orm-podresources
feat(ORM): add orm podResources server
2 parents 0c698ae + b4f1665 commit 5268e8e

File tree

25 files changed

+2333
-46
lines changed

25 files changed

+2333
-46
lines changed

cmd/katalyst-agent/app/agent/orm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
const (
30-
ORMAgent = "katalyst-agent-orm"
30+
ORMAgent = orm.ORMAgentName
3131
)
3232

3333
func InitORM(agentCtx *GenericContext, conf *config.Configuration, _ interface{}, _ string) (bool, Component, error) {

cmd/katalyst-agent/app/options/orm/orm_base.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,26 @@ import (
2525
)
2626

2727
type GenericORMPluginOptions struct {
28-
ORMRconcilePeriod time.Duration
29-
ORMResourceNamesMap map[string]string
30-
ORMPodNotifyChanLen int
31-
TopologyPolicyName string
32-
NumericAlignResources []string
28+
ORMRconcilePeriod time.Duration
29+
ORMResourceNamesMap map[string]string
30+
ORMPodNotifyChanLen int
31+
TopologyPolicyName string
32+
NumericAlignResources []string
33+
ORMPodResourcesSocket string
34+
ORMDevicesProvider string
35+
ORMKubeletPodResourcesEndpoints []string
3336
}
3437

3538
func NewGenericORMPluginOptions() *GenericORMPluginOptions {
3639
return &GenericORMPluginOptions{
37-
ORMRconcilePeriod: time.Second * 5,
38-
ORMResourceNamesMap: map[string]string{},
39-
ORMPodNotifyChanLen: 10,
40-
TopologyPolicyName: "none",
41-
NumericAlignResources: []string{"cpu", "memory"},
40+
ORMRconcilePeriod: time.Second * 5,
41+
ORMResourceNamesMap: map[string]string{},
42+
ORMPodNotifyChanLen: 10,
43+
TopologyPolicyName: "",
44+
NumericAlignResources: []string{"cpu", "memory"},
45+
ORMPodResourcesSocket: "unix:/var/lib/katalyst/pod-resources/kubelet.sock",
46+
ORMDevicesProvider: "",
47+
ORMKubeletPodResourcesEndpoints: []string{"/var/lib/kubelet/pod-resources/kubelet.sock"},
4248
}
4349
}
4450

@@ -58,6 +64,12 @@ func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
5864
o.TopologyPolicyName, "topology merge policy name used by ORM")
5965
fs.StringSliceVar(&o.NumericAlignResources, "numeric-align-resources", o.NumericAlignResources,
6066
"resources which should be aligned in numeric topology policy")
67+
fs.StringVar(&o.ORMPodResourcesSocket, "orm-pod-resources-socket", o.ORMPodResourcesSocket,
68+
"socket of ORM pod resource api, default 'unix:/var/lib/katalyst/pod-resources/kubelet.sock'")
69+
fs.StringVar(&o.ORMDevicesProvider, "orm-devices-provider", o.ORMDevicesProvider,
70+
"devices provider provides devices resources and allocatable for ORM podResources api")
71+
fs.StringSliceVar(&o.ORMKubeletPodResourcesEndpoints, "orm-kubelet-pod-resources-endpoints", o.ORMKubeletPodResourcesEndpoints,
72+
"kubelet podResources endpoints for ORM kubelet devices provider")
6173
}
6274

6375
func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error {
@@ -66,6 +78,9 @@ func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguratio
6678
conf.ORMPodNotifyChanLen = o.ORMPodNotifyChanLen
6779
conf.TopologyPolicyName = o.TopologyPolicyName
6880
conf.NumericAlignResources = o.NumericAlignResources
81+
conf.ORMPodResourcesSocket = o.ORMPodResourcesSocket
82+
conf.ORMDevicesProvider = o.ORMDevicesProvider
83+
conf.ORMKubeletPodResourcesEndpoints = o.ORMKubeletPodResourcesEndpoints
6984

7085
return nil
7186
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/evanphx/json-patch v5.6.0+incompatible
1212
github.com/fsnotify/fsnotify v1.5.4
1313
github.com/gogo/protobuf v1.3.2
14+
github.com/golang/mock v1.6.0
1415
github.com/golang/protobuf v1.5.2
1516
github.com/google/cadvisor v0.44.2
1617
github.com/google/uuid v1.3.0
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubelet
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"time"
23+
24+
"google.golang.org/grpc"
25+
"k8s.io/klog/v2"
26+
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
27+
28+
podresourcesserver "github.com/kubewharf/katalyst-core/pkg/agent/orm/server/podresources"
29+
"github.com/kubewharf/katalyst-core/pkg/util/general"
30+
"github.com/kubewharf/katalyst-core/pkg/util/kubelet/podresources"
31+
)
32+
33+
const (
34+
podResourcesClientTimeout = 10 * time.Second
35+
podResourcesClientMaxMsgSize = 1024 * 1024 * 16
36+
)
37+
38+
// Provider provide devices resource by kubelet v1 podResources api
39+
type Provider struct {
40+
client podresourcesapi.PodResourcesListerClient
41+
endpoints []string
42+
conn *grpc.ClientConn
43+
}
44+
45+
func NewProvider(endpoints []string, getClientFunc podresources.GetClientFunc) (podresourcesserver.DevicesProvider, error) {
46+
klog.V(5).Infof("new kubelet devices provider, endpoints: %v", endpoints)
47+
48+
p := &Provider{
49+
endpoints: endpoints,
50+
}
51+
52+
var err error
53+
54+
p.client, p.conn, err = getClientFunc(
55+
general.GetOneExistPath(endpoints),
56+
podResourcesClientTimeout,
57+
podResourcesClientMaxMsgSize)
58+
if err != nil {
59+
klog.Error(err)
60+
return nil, err
61+
}
62+
63+
return p, nil
64+
}
65+
66+
func (p *Provider) GetDevices() []*podresourcesapi.PodResources {
67+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
68+
defer cancel()
69+
70+
response, err := p.client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
71+
if err != nil {
72+
klog.Errorf("list resources from kubelet fail: %v", err)
73+
return nil
74+
}
75+
if response == nil {
76+
klog.Error("list resources from kubelet, get nil response without err")
77+
return nil
78+
}
79+
if response.GetPodResources() == nil {
80+
klog.Error("list resources from kubelet, get nil podResources without err")
81+
return nil
82+
}
83+
84+
if klog.V(6).Enabled() {
85+
str, _ := json.Marshal(response)
86+
klog.Infof("GetDevices: %v", string(str))
87+
}
88+
89+
return response.PodResources
90+
}
91+
92+
func (p *Provider) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
93+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
94+
defer cancel()
95+
96+
response, err := p.client.GetAllocatableResources(ctx, &podresourcesapi.AllocatableResourcesRequest{})
97+
if err != nil {
98+
klog.Errorf("GetAllocatableResources from kubelet fail: %v", err)
99+
return nil
100+
}
101+
if response == nil {
102+
klog.Error("GetAllocatableResources from kubelet, get nil response without err")
103+
return nil
104+
}
105+
if response.GetDevices() == nil {
106+
klog.Error("GetAllocatableResources from kubelet, get nil response without err")
107+
return nil
108+
}
109+
110+
if klog.V(6).Enabled() {
111+
str, _ := json.Marshal(response)
112+
klog.Infof("GetAllocatableDevices: %v", str)
113+
}
114+
return response.GetDevices()
115+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubelet
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
"github.com/stretchr/testify/assert"
26+
"google.golang.org/grpc"
27+
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
28+
)
29+
30+
func TestGetDevice(t *testing.T) {
31+
t.Parallel()
32+
33+
mockProvider := &Provider{
34+
client: &mockPodResourcesListerClient{
35+
ListPodResourcesResponse: &v1.ListPodResourcesResponse{
36+
PodResources: []*v1.PodResources{},
37+
},
38+
},
39+
}
40+
41+
resp := mockProvider.GetDevices()
42+
assert.NotNil(t, resp)
43+
44+
nilPodResourcesProvider := &Provider{
45+
client: &mockPodResourcesListerClient{
46+
ListPodResourcesResponse: &v1.ListPodResourcesResponse{},
47+
},
48+
}
49+
resp = nilPodResourcesProvider.GetDevices()
50+
assert.Nil(t, resp)
51+
52+
nilRespProvider := &Provider{
53+
client: &mockPodResourcesListerClient{
54+
ListPodResourcesResponse: nil,
55+
},
56+
}
57+
resp = nilRespProvider.GetDevices()
58+
assert.Nil(t, resp)
59+
60+
errPodResourceProvider := &Provider{
61+
client: &errPodResourceListerClient{},
62+
}
63+
resp = errPodResourceProvider.GetDevices()
64+
assert.Nil(t, resp)
65+
}
66+
67+
func TestGetAllocatableDevices(t *testing.T) {
68+
t.Parallel()
69+
70+
mockProvider := &Provider{
71+
client: &mockPodResourcesListerClient{
72+
AllocatableResourcesResponse: &v1.AllocatableResourcesResponse{
73+
Devices: []*v1.ContainerDevices{},
74+
},
75+
},
76+
}
77+
resp := mockProvider.GetAllocatableDevices()
78+
assert.NotNil(t, resp)
79+
80+
nilRespProvider := &Provider{
81+
client: &mockPodResourcesListerClient{
82+
AllocatableResourcesResponse: nil,
83+
},
84+
}
85+
resp = nilRespProvider.GetAllocatableDevices()
86+
assert.Nil(t, resp)
87+
88+
nilDeviceProvider := &Provider{
89+
client: &mockPodResourcesListerClient{
90+
AllocatableResourcesResponse: &v1.AllocatableResourcesResponse{},
91+
},
92+
}
93+
resp = nilDeviceProvider.GetAllocatableDevices()
94+
assert.Nil(t, resp)
95+
96+
errProvider := &Provider{
97+
client: &errPodResourceListerClient{},
98+
}
99+
resp = errProvider.GetAllocatableDevices()
100+
assert.Nil(t, resp)
101+
}
102+
103+
func TestNewProvider(t *testing.T) {
104+
t.Parallel()
105+
106+
p, err := NewProvider([]string{}, getMockClientFunc)
107+
assert.NoError(t, err)
108+
assert.NotNil(t, p)
109+
}
110+
111+
type mockPodResourcesListerClient struct {
112+
*v1.ListPodResourcesResponse
113+
*v1.AllocatableResourcesResponse
114+
}
115+
116+
func (f *mockPodResourcesListerClient) List(ctx context.Context, in *v1.ListPodResourcesRequest, opts ...grpc.CallOption) (*v1.ListPodResourcesResponse, error) {
117+
return f.ListPodResourcesResponse, nil
118+
}
119+
120+
func (f *mockPodResourcesListerClient) GetAllocatableResources(ctx context.Context, in *v1.AllocatableResourcesRequest, opts ...grpc.CallOption) (*v1.AllocatableResourcesResponse, error) {
121+
return f.AllocatableResourcesResponse, nil
122+
}
123+
124+
type errPodResourceListerClient struct{}
125+
126+
func (e *errPodResourceListerClient) List(ctx context.Context, in *v1.ListPodResourcesRequest, opts ...grpc.CallOption) (*v1.ListPodResourcesResponse, error) {
127+
return nil, fmt.Errorf("err")
128+
}
129+
130+
func (e *errPodResourceListerClient) GetAllocatableResources(ctx context.Context, in *v1.AllocatableResourcesRequest, opts ...grpc.CallOption) (*v1.AllocatableResourcesResponse, error) {
131+
return nil, fmt.Errorf("err")
132+
}
133+
134+
func getMockClientFunc(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
135+
return &mockPodResourcesListerClient{}, nil, nil
136+
}

pkg/agent/orm/endpoint/endpoint.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type Endpoint interface {
4242
IsStopped() bool
4343
StopGracePeriodExpired() bool
4444
GetResourcePluginOptions(ctx context.Context, in *pluginapi.Empty, opts ...grpc.CallOption) (*pluginapi.ResourcePluginOptions, error)
45+
GetTopologyAwareAllocatableResources(c context.Context, request *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error)
46+
GetTopologyAwareResources(c context.Context, request *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error)
4547
}
4648

4749
type EndpointInfo struct {
@@ -161,6 +163,24 @@ func (e *EndpointImpl) GetTopologyHints(c context.Context, resourceRequest *plug
161163
return e.client.GetTopologyHints(ctx, resourceRequest)
162164
}
163165

166+
func (e *EndpointImpl) GetTopologyAwareAllocatableResources(c context.Context, request *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) {
167+
if e.IsStopped() {
168+
return nil, fmt.Errorf(errEndpointStopped, e)
169+
}
170+
ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginGetTopologyAwareAllocatableResourcesRPCTimeoutInSecs*time.Second)
171+
defer cancel()
172+
return e.client.GetTopologyAwareAllocatableResources(ctx, request)
173+
}
174+
175+
func (e *EndpointImpl) GetTopologyAwareResources(c context.Context, request *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) {
176+
if e.IsStopped() {
177+
return nil, fmt.Errorf(errEndpointStopped, e)
178+
}
179+
ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginGetTopologyAwareResourcesRPCTimeoutInSecs*time.Second)
180+
defer cancel()
181+
return e.client.GetTopologyAwareResources(ctx, request)
182+
}
183+
164184
// dial establishes the gRPC communication with the registered resource plugin. https://godoc.org/google.golang.org/grpc#Dial
165185
func dial(unixSocketPath string) (pluginapi.ResourcePluginClient, *grpc.ClientConn, error) {
166186
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

0 commit comments

Comments
 (0)