Skip to content

Commit 56023db

Browse files
authored
Merge pull request #896 from googs1025/automated-cherry-pick-of-#895-upstream-release-1.30
Automated cherry pick of #895: Compose a cached reader as a cacheOption when initializing a
2 parents ba1efa9 + 7bc5844 commit 56023db

14 files changed

+62
-40
lines changed

pkg/capacityscheduling/capacity_scheduling.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ import (
4040
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
4141
"k8s.io/kubernetes/pkg/scheduler/metrics"
4242
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
43-
ctrlruntimecache "sigs.k8s.io/controller-runtime/pkg/cache"
4443
"sigs.k8s.io/controller-runtime/pkg/client"
4544

4645
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
@@ -127,18 +126,14 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
127126
}
128127
logger := klog.FromContext(ctx)
129128

130-
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
129+
client, ccache, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
131130
if err != nil {
132131
return nil, err
133132
}
134133

135134
c.client = client
136-
dynamicCache, err := ctrlruntimecache.New(handle.KubeConfig(), ctrlruntimecache.Options{Scheme: scheme})
137-
if err != nil {
138-
return nil, err
139-
}
140135

141-
elasticQuotaInformer, err := dynamicCache.GetInformer(ctx, &v1alpha1.ElasticQuota{})
136+
elasticQuotaInformer, err := ccache.GetInformer(ctx, &v1alpha1.ElasticQuota{})
142137
if err != nil {
143138
return nil, err
144139
}

pkg/coscheduling/coscheduling.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
3030
"k8s.io/klog/v2"
3131
"k8s.io/kubernetes/pkg/scheduler/framework"
32-
"sigs.k8s.io/controller-runtime/pkg/client"
3332

3433
"sigs.k8s.io/scheduler-plugins/apis/config"
3534
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
@@ -74,7 +73,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
7473
_ = clientscheme.AddToScheme(scheme)
7574
_ = v1.AddToScheme(scheme)
7675
_ = v1alpha1.AddToScheme(scheme)
77-
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
76+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
7877
if err != nil {
7978
return nil, err
8079
}
@@ -84,7 +83,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
8483

8584
scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second
8685
pgMgr := core.NewPodGroupManager(
87-
client,
86+
c,
8887
handle.SnapshotSharedLister(),
8988
&scheduleTimeDuration,
9089
// Keep the podInformer (from frameworkHandle) as the single source of Pods.

pkg/networkaware/networkoverhead/networkoverhead.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ import (
3030
corelisters "k8s.io/client-go/listers/core/v1"
3131
"k8s.io/klog/v2"
3232
"k8s.io/kubernetes/pkg/scheduler/framework"
33-
3433
"sigs.k8s.io/controller-runtime/pkg/client"
3534

3635
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
3736
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
37+
"sigs.k8s.io/scheduler-plugins/pkg/util"
3838

3939
agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
4040
ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
@@ -147,16 +147,13 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
147147
if err != nil {
148148
return nil, err
149149
}
150-
client, err := client.New(handle.KubeConfig(), client.Options{
151-
Scheme: scheme,
152-
})
150+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
153151
if err != nil {
154152
return nil, err
155153
}
156154

157155
no := &NetworkOverhead{
158-
Client: client,
159-
156+
Client: c,
160157
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
161158
handle: handle,
162159
namespaces: args.Namespaces,

pkg/networkaware/topologicalsort/topologicalsort.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"k8s.io/klog/v2"
2727
"k8s.io/kubernetes/pkg/scheduler/framework"
2828
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
29-
3029
"sigs.k8s.io/controller-runtime/pkg/client"
3130

3231
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
3332
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
33+
"sigs.k8s.io/scheduler-plugins/pkg/util"
3434

3535
agv1alpha "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
3636
)
@@ -81,15 +81,13 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
8181
return nil, err
8282
}
8383

84-
client, err := client.New(handle.KubeConfig(), client.Options{
85-
Scheme: scheme,
86-
})
84+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
8785
if err != nil {
8886
return nil, err
8987
}
9088

9189
pl := &TopologicalSort{
92-
Client: client,
90+
Client: c,
9391
handle: handle,
9492
namespaces: args.Namespaces,
9593
}

pkg/sysched/sysched.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
2424
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
25+
"sigs.k8s.io/scheduler-plugins/pkg/util"
2526
)
2627

2728
type SySched struct {
@@ -402,7 +403,7 @@ func getArgs(obj runtime.Object) (*pluginconfig.SySchedArgs, error) {
402403
}
403404

404405
// New initializes a new plugin and returns it.
405-
func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
406+
func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
406407
sc := SySched{handle: handle}
407408
sc.HostToPods = make(map[string][]*v1.Pod)
408409
sc.HostSyscalls = make(map[string]sets.Set[string])
@@ -425,12 +426,12 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew
425426

426427
v1beta1.AddToScheme(scheme)
427428

428-
client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
429+
c, _, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme)
429430
if err != nil {
430431
return nil, err
431432
}
432433

433-
sc.client = client
434+
sc.client = c
434435

435436
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
436437

pkg/util/client_util.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package util
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"k8s.io/client-go/rest"
9+
"sigs.k8s.io/controller-runtime/pkg/cache"
10+
"sigs.k8s.io/controller-runtime/pkg/client"
11+
)
12+
13+
// NewClientWithCachedReader returns a controller runtime Client with cache-baked client.
14+
func NewClientWithCachedReader(ctx context.Context, config *rest.Config, scheme *runtime.Scheme) (client.Client, cache.Cache, error) {
15+
ccache, err := cache.New(config, cache.Options{
16+
Scheme: scheme,
17+
})
18+
if err != nil {
19+
return nil, nil, err
20+
}
21+
go ccache.Start(ctx)
22+
if !ccache.WaitForCacheSync(ctx) {
23+
return nil, nil, fmt.Errorf("failed to sync cache")
24+
}
25+
c, err := client.New(config, client.Options{
26+
Scheme: scheme,
27+
Cache: &client.CacheOptions{
28+
Reader: ccache,
29+
},
30+
})
31+
return c, ccache, err
32+
}

test/integration/capacity_scheduling_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestCapacityScheduling(t *testing.T) {
4444
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
4545

4646
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
47-
extClient := util.NewClientOrDie(globalKubeConfig)
47+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
4848
testCtx.ClientSet = cs
4949
testCtx.KubeConfig = globalKubeConfig
5050

test/integration/coscheduling_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestCoschedulingPlugin(t *testing.T) {
4949
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
5050

5151
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
52-
extClient := util.NewClientOrDie(globalKubeConfig)
52+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
5353
testCtx.ClientSet = cs
5454
testCtx.KubeConfig = globalKubeConfig
5555

@@ -382,7 +382,7 @@ func TestPodgroupBackoff(t *testing.T) {
382382
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
383383

384384
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
385-
extClient := util.NewClientOrDie(globalKubeConfig)
385+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
386386
testCtx.ClientSet = cs
387387
testCtx.KubeConfig = globalKubeConfig
388388

test/integration/elasticquota_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestElasticController(t *testing.T) {
5151
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
5252

5353
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
54-
extClient := util.NewClientOrDie(globalKubeConfig)
54+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
5555
testCtx.ClientSet = cs
5656
testCtx.KubeConfig = globalKubeConfig
5757

test/integration/networkoverhead_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,10 @@ import (
3737
st "k8s.io/kubernetes/pkg/scheduler/testing"
3838
imageutils "k8s.io/kubernetes/test/utils/image"
3939

40-
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
41-
4240
scheconfig "sigs.k8s.io/scheduler-plugins/apis/config"
4341
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
4442
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
43+
clientutil "sigs.k8s.io/scheduler-plugins/pkg/util"
4544
"sigs.k8s.io/scheduler-plugins/test/util"
4645

4746
appgroupapi "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup"
@@ -59,7 +58,7 @@ func TestNetworkOverheadPlugin(t *testing.T) {
5958
utilruntime.Must(agv1alpha1.AddToScheme(scheme))
6059
utilruntime.Must(ntv1alpha1.AddToScheme(scheme))
6160

62-
client, err := ctrlclient.New(globalKubeConfig, ctrlclient.Options{Scheme: scheme})
61+
client, _, err := clientutil.NewClientWithCachedReader(testCtx.Ctx, globalKubeConfig, scheme)
6362

6463
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
6564
testCtx.ClientSet = cs

test/integration/podgroup_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func TestPodGroupController(t *testing.T) {
5454
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
5555

5656
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
57-
extClient := util.NewClientOrDie(globalKubeConfig)
57+
extClient := util.NewClientOrDie(testCtx.Ctx, globalKubeConfig)
5858
testCtx.ClientSet = cs
5959
testCtx.KubeConfig = globalKubeConfig
6060

test/integration/sysched_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ import (
3434
fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
3535
st "k8s.io/kubernetes/pkg/scheduler/testing"
3636
imageutils "k8s.io/kubernetes/test/utils/image"
37-
"sigs.k8s.io/controller-runtime/pkg/client"
3837
schedconfig "sigs.k8s.io/scheduler-plugins/apis/config"
3938
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
4039
"sigs.k8s.io/scheduler-plugins/pkg/sysched"
40+
clientutil "sigs.k8s.io/scheduler-plugins/pkg/util"
4141
"sigs.k8s.io/scheduler-plugins/test/util"
4242
spo "sigs.k8s.io/security-profiles-operator/api/seccompprofile/v1beta1"
4343
)
@@ -136,7 +136,7 @@ func TestSyschedPlugin(t *testing.T) {
136136
_ = v1alpha1.AddToScheme(scheme)
137137
_ = spo.AddToScheme(scheme)
138138

139-
extClient, err := client.New(globalKubeConfig, client.Options{Scheme: scheme})
139+
extClient, _, err := clientutil.NewClientWithCachedReader(testCtx.Ctx, globalKubeConfig, scheme)
140140
if err != nil {
141141
t.Fatal(err)
142142
}
@@ -204,7 +204,7 @@ func TestSyschedPlugin(t *testing.T) {
204204
for i := 0; i < 2; i++ {
205205
nodeName := fmt.Sprintf("fake-node-%d", i)
206206
node := st.MakeNode().Name(nodeName).Label("node", nodeName).Obj()
207-
//node.Spec.PodCIDR = "192.168.0.1/24"
207+
// node.Spec.PodCIDR = "192.168.0.1/24"
208208
node.Status.Addresses = make([]v1.NodeAddress, 1)
209209
ip := fmt.Sprintf("192.168.1.%v", 1+i)
210210
node.Status.Addresses[0] = v1.NodeAddress{

test/integration/topologicalsort_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,10 @@ import (
3939
st "k8s.io/kubernetes/pkg/scheduler/testing"
4040
imageutils "k8s.io/kubernetes/test/utils/image"
4141

42-
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
43-
4442
scheconfig "sigs.k8s.io/scheduler-plugins/apis/config"
4543
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
4644
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
45+
clientutil "sigs.k8s.io/scheduler-plugins/pkg/util"
4746
"sigs.k8s.io/scheduler-plugins/test/util"
4847

4948
appgroupapi "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup"
@@ -58,7 +57,7 @@ func TestTopologicalSortPlugin(t *testing.T) {
5857
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
5958
utilruntime.Must(agv1alpha1.AddToScheme(scheme))
6059

61-
client, err := ctrlclient.New(globalKubeConfig, ctrlclient.Options{Scheme: scheme})
60+
client, _, err := clientutil.NewClientWithCachedReader(testCtx.Ctx, globalKubeConfig, scheme)
6261

6362
cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
6463
testCtx.ClientSet = cs

test/util/fake.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package util
1818

1919
import (
20+
"context"
2021
"sync"
2122

2223
v1 "k8s.io/api/core/v1"
@@ -30,6 +31,7 @@ import (
3031
"sigs.k8s.io/controller-runtime/pkg/client"
3132
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3233
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
34+
"sigs.k8s.io/scheduler-plugins/pkg/util"
3335

3436
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
3537
)
@@ -290,13 +292,13 @@ func NewFakeClient(objs ...runtime.Object) (client.WithWatch, error) {
290292

291293
// NewClientOrDie returns a generic controller-runtime client or panic upon any error.
292294
// This function is used by integration tests.
293-
func NewClientOrDie(cfg *rest.Config) client.Client {
295+
func NewClientOrDie(ctx context.Context, cfg *rest.Config) client.Client {
294296
scheme := runtime.NewScheme()
295297
_ = clientscheme.AddToScheme(scheme)
296298
_ = v1.AddToScheme(scheme)
297299
_ = v1alpha1.AddToScheme(scheme)
298300

299-
c, err := client.New(cfg, client.Options{Scheme: scheme})
301+
c, _, err := util.NewClientWithCachedReader(ctx, cfg, scheme)
300302
if err != nil {
301303
panic(err)
302304
}

0 commit comments

Comments
 (0)