Skip to content

Commit 8f6cbb5

Browse files
authored
Merge pull request #633 from WangZzzhe/dev/npd-controller
feat: add npd controller
2 parents cb9c984 + 7cd341f commit 8f6cbb5

File tree

15 files changed

+1414
-0
lines changed

15 files changed

+1414
-0
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"k8s.io/klog/v2"
24+
25+
katalyst "github.com/kubewharf/katalyst-core/cmd/base"
26+
"github.com/kubewharf/katalyst-core/pkg/config"
27+
"github.com/kubewharf/katalyst-core/pkg/controller/npd"
28+
)
29+
30+
const (
31+
NPDControllerName = "npd"
32+
)
33+
34+
func StartNPDController(ctx context.Context, controlCtx *katalyst.GenericContext,
35+
conf *config.Configuration, extraConf interface{}, _ string,
36+
) (bool, error) {
37+
if controlCtx == nil || conf == nil {
38+
err := fmt.Errorf("controlCtx and controllerConf can't be nil")
39+
klog.Error(err)
40+
return false, err
41+
}
42+
43+
npdController, err := npd.NewNPDController(
44+
ctx,
45+
controlCtx, conf.GenericConfiguration,
46+
conf.GenericControllerConfiguration,
47+
conf.NPDConfig,
48+
extraConf,
49+
)
50+
if err != nil {
51+
klog.Errorf("failed to new npd controller")
52+
return false, err
53+
}
54+
55+
go npdController.Run()
56+
return true, nil
57+
}

cmd/katalyst-controller/app/enablecontrollers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func init() {
5151
controllerInitializers.Store(controller.VPAControllerName, ControllerStarter{Starter: controller.StartVPAController})
5252
controllerInitializers.Store(controller.KCCControllerName, ControllerStarter{Starter: controller.StartKCCController})
5353
controllerInitializers.Store(controller.SPDControllerName, ControllerStarter{Starter: controller.StartSPDController})
54+
controllerInitializers.Store(controller.NPDControllerName, ControllerStarter{Starter: controller.StartNPDController})
5455
controllerInitializers.Store(controller.LifeCycleControllerName, ControllerStarter{Starter: controller.StartLifeCycleController})
5556
controllerInitializers.Store(controller.MonitorControllerName, ControllerStarter{Starter: controller.StartMonitorController})
5657
controllerInitializers.Store(controller.OvercommitControllerName, ControllerStarter{Starter: controller.StartOvercommitController})

cmd/katalyst-controller/app/options/controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type ControllersOptions struct {
2828
*VPAOptions
2929
*KCCOptions
3030
*SPDOptions
31+
*NPDOptions
3132
*LifeCycleOptions
3233
*MonitorOptions
3334
*OvercommitOptions
@@ -40,6 +41,7 @@ func NewControllersOptions() *ControllersOptions {
4041
VPAOptions: NewVPAOptions(),
4142
KCCOptions: NewKCCOptions(),
4243
SPDOptions: NewSPDOptions(),
44+
NPDOptions: NewNPDOptions(),
4345
LifeCycleOptions: NewLifeCycleOptions(),
4446
MonitorOptions: NewMonitorOptions(),
4547
OvercommitOptions: NewOvercommitOptions(),
@@ -52,6 +54,7 @@ func (o *ControllersOptions) AddFlags(fss *cliflag.NamedFlagSets) {
5254
o.VPAOptions.AddFlags(fss)
5355
o.KCCOptions.AddFlags(fss)
5456
o.SPDOptions.AddFlags(fss)
57+
o.NPDOptions.AddFlags(fss)
5558
o.LifeCycleOptions.AddFlags(fss)
5659
o.MonitorOptions.AddFlags(fss)
5760
o.OvercommitOptions.AddFlags(fss)
@@ -66,6 +69,7 @@ func (o *ControllersOptions) ApplyTo(c *controllerconfig.ControllersConfiguratio
6669
errList = append(errList, o.VPAOptions.ApplyTo(c.VPAConfig))
6770
errList = append(errList, o.KCCOptions.ApplyTo(c.KCCConfig))
6871
errList = append(errList, o.SPDOptions.ApplyTo(c.SPDConfig))
72+
errList = append(errList, o.NPDOptions.ApplyTo(c.NPDConfig))
6973
errList = append(errList, o.LifeCycleOptions.ApplyTo(c.LifeCycleConfig))
7074
errList = append(errList, o.MonitorOptions.ApplyTo(c.MonitorConfig))
7175
errList = append(errList, o.OvercommitOptions.ApplyTo(c.OvercommitConfig))
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package options
18+
19+
import (
20+
cliflag "k8s.io/component-base/cli/flag"
21+
22+
"github.com/kubewharf/katalyst-core/pkg/config/controller"
23+
)
24+
25+
type NPDOptions struct {
26+
NPDMetricsPlugins []string
27+
EnableScopeDuplicated bool
28+
SyncWorkers int
29+
}
30+
31+
func NewNPDOptions() *NPDOptions {
32+
return &NPDOptions{
33+
NPDMetricsPlugins: []string{},
34+
EnableScopeDuplicated: false,
35+
SyncWorkers: 1,
36+
}
37+
}
38+
39+
func (o *NPDOptions) AddFlags(fss *cliflag.NamedFlagSets) {
40+
fs := fss.FlagSet("npd")
41+
42+
fs.StringSliceVar(&o.NPDMetricsPlugins, "npd-metrics-plugins", o.NPDMetricsPlugins,
43+
"A list of metrics plugins to be used")
44+
fs.BoolVar(&o.EnableScopeDuplicated, "npd-enable-scope-duplicated", o.EnableScopeDuplicated,
45+
"Whether metrics with the same scope can be updated by multiple plugins")
46+
fs.IntVar(&o.SyncWorkers, "npd-sync-workers", o.SyncWorkers,
47+
"Number of workers to sync npd status")
48+
}
49+
50+
func (o *NPDOptions) ApplyTo(c *controller.NPDConfig) error {
51+
c.NPDMetricsPlugins = o.NPDMetricsPlugins
52+
c.EnableScopeDuplicated = o.EnableScopeDuplicated
53+
c.SyncWorkers = o.SyncWorkers
54+
return nil
55+
}
56+
57+
func (o *NPDOptions) Config() (*controller.NPDConfig, error) {
58+
c := &controller.NPDConfig{}
59+
if err := o.ApplyTo(c); err != nil {
60+
return nil, err
61+
}
62+
63+
return c, nil
64+
}

pkg/client/control/npd.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package control
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
25+
"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
26+
clientset "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned"
27+
)
28+
29+
type NodeProfileControl interface {
30+
CreateNPD(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.CreateOptions) (*v1alpha1.NodeProfileDescriptor, error)
31+
UpdateNPDStatus(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.UpdateOptions) (*v1alpha1.NodeProfileDescriptor, error)
32+
DeleteNPD(ctx context.Context, npdName string, opts metav1.DeleteOptions) error
33+
}
34+
35+
type DummyNPDControl struct{}
36+
37+
func (d *DummyNPDControl) CreateNPD(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.CreateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
38+
return nil, nil
39+
}
40+
41+
func (d *DummyNPDControl) UpdateNPDStatus(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.UpdateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
42+
return nil, nil
43+
}
44+
45+
func (d *DummyNPDControl) DeleteNPD(ctx context.Context, npdName string, opts metav1.DeleteOptions) error {
46+
return nil
47+
}
48+
49+
type NPDControlImp struct {
50+
client clientset.Interface
51+
}
52+
53+
func NewNPDControlImp(client clientset.Interface) *NPDControlImp {
54+
return &NPDControlImp{
55+
client: client,
56+
}
57+
}
58+
59+
func (n *NPDControlImp) CreateNPD(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.CreateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
60+
if npd == nil {
61+
return nil, fmt.Errorf("npd is nil")
62+
}
63+
64+
return n.client.NodeV1alpha1().NodeProfileDescriptors().Create(ctx, npd, opts)
65+
}
66+
67+
func (n *NPDControlImp) UpdateNPDStatus(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.UpdateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
68+
if npd == nil {
69+
return nil, fmt.Errorf("npd is nil")
70+
}
71+
72+
return n.client.NodeV1alpha1().NodeProfileDescriptors().UpdateStatus(ctx, npd, opts)
73+
}
74+
75+
func (n *NPDControlImp) DeleteNPD(ctx context.Context, npdName string, opts metav1.DeleteOptions) error {
76+
return n.client.NodeV1alpha1().NodeProfileDescriptors().Delete(ctx, npdName, opts)
77+
}

pkg/config/controller/controller_base.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type ControllersConfiguration struct {
4646
*VPAConfig
4747
*KCCConfig
4848
*SPDConfig
49+
*NPDConfig
4950
*LifeCycleConfig
5051
*MonitorConfig
5152
*OvercommitConfig
@@ -63,6 +64,7 @@ func NewControllersConfiguration() *ControllersConfiguration {
6364
VPAConfig: NewVPAConfig(),
6465
KCCConfig: NewKCCConfig(),
6566
SPDConfig: NewSPDConfig(),
67+
NPDConfig: NewNPDConfig(),
6668
LifeCycleConfig: NewLifeCycleConfig(),
6769
MonitorConfig: NewMonitorConfig(),
6870
OvercommitConfig: NewOvercommitConfig(),

pkg/config/controller/npd.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
type NPDConfig struct {
20+
NPDMetricsPlugins []string
21+
22+
EnableScopeDuplicated bool
23+
24+
SyncWorkers int
25+
}
26+
27+
func NewNPDConfig() *NPDConfig {
28+
return &NPDConfig{
29+
NPDMetricsPlugins: []string{},
30+
EnableScopeDuplicated: false,
31+
SyncWorkers: 1,
32+
}
33+
}

pkg/controller/npd/handler.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package npd
18+
19+
import (
20+
v1 "k8s.io/api/core/v1"
21+
"k8s.io/client-go/tools/cache"
22+
"k8s.io/klog/v2"
23+
)
24+
25+
func (nc *NPDController) onNodeAdd(obj interface{}) {
26+
node, ok := obj.(*v1.Node)
27+
if !ok {
28+
klog.Errorf("[npd] cannot convert obj to *v1.node")
29+
return
30+
}
31+
32+
nc.enqueueNode(node)
33+
}
34+
35+
func (nc *NPDController) onNodeUpdate(_, newObj interface{}) {
36+
node, ok := newObj.(*v1.Node)
37+
if !ok {
38+
klog.Errorf("[npd] cannot convert obj to *v1.node")
39+
return
40+
}
41+
42+
nc.enqueueNode(node)
43+
}
44+
45+
func (nc *NPDController) onNodeDelete(obj interface{}) {
46+
node, ok := obj.(*v1.Node)
47+
if !ok {
48+
klog.Errorf("[npd] cannot convert obj to *v1.node")
49+
return
50+
}
51+
52+
err := nc.deleteNPD(node.Name)
53+
if err != nil {
54+
klog.Errorf("delete node %v fail: %v", node.Name, err)
55+
return
56+
}
57+
nc.metricsManager.DeleteNodeProfileStatus(node.Name)
58+
}
59+
60+
func (nc *NPDController) enqueueNode(node *v1.Node) {
61+
if node == nil {
62+
klog.Warningf("[npd] enqueue a nil node")
63+
return
64+
}
65+
66+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(node)
67+
if err != nil {
68+
klog.Errorf("[npd] couldn't get key for node: %v, err: %v", node.Name, err)
69+
return
70+
}
71+
72+
nc.nodeQueue.Add(key)
73+
}

0 commit comments

Comments
 (0)