Skip to content

Commit 0a17231

Browse files
committed
[feat] csi resizer for blockstorage volume in k8s
1 parent 3436215 commit 0a17231

File tree

9 files changed

+902
-14
lines changed

9 files changed

+902
-14
lines changed

cmd/main.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,46 @@ package main
22

33
import (
44
"context"
5+
"flag"
6+
"fmt"
7+
"net/http"
8+
"os"
59
"time"
610

11+
"github.com/kubernetes-csi/csi-lib-utils/metrics"
12+
lcoorV1 "k8s.io/api/coordination/v1"
13+
"k8s.io/apimachinery/pkg/util/wait"
14+
"k8s.io/client-go/informers"
15+
"k8s.io/client-go/kubernetes"
16+
"k8s.io/client-go/rest"
17+
"k8s.io/client-go/tools/cache"
18+
"k8s.io/client-go/tools/clientcmd"
19+
"k8s.io/client-go/util/workqueue"
20+
"k8s.io/klog/v2"
21+
722
lcsi "github.com/vngcloud/vngcloud-csi-volume-modifier/pkg/client"
23+
lcontroller "github.com/vngcloud/vngcloud-csi-volume-modifier/pkg/controller"
24+
lmodifier "github.com/vngcloud/vngcloud-csi-volume-modifier/pkg/modifier"
25+
)
26+
27+
// _____________________________________________________________________________________________________________________CONFIGURATION VARIABLES
28+
29+
var (
30+
resyncPeriod = flag.Duration("resync-period", time.Minute*10, "Resync period for cache")
31+
workers = flag.Int("workers", 10, "Concurrency to process multiple modification requests")
32+
showVersion = flag.Bool("version", false, "Show version")
33+
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
34+
clientConfigUrl = flag.String("client-config-url", "", "URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
35+
kubeConfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig")
36+
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
37+
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
38+
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
39+
timeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
40+
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
41+
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume modification. It exponentially increases with each failure, up to retry-interval-max.")
42+
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume modification.")
43+
44+
version = "<unknown>"
845
)
946

1047
// _____________________________________________________________________________________________________________________PRIVATE METHODS
@@ -14,3 +51,160 @@ func getDriverName(pclient lcsi.IClient, ptimeout time.Duration) (string, error)
1451
defer cancel()
1552
return pclient.GetDriverName(ctx)
1653
}
54+
55+
func leaseHandler(ppodName string, mc lcontroller.IModifyController, leaseChannel chan *lcoorV1.Lease) {
56+
var cancel context.CancelFunc = nil
57+
58+
klog.InfoS("leaseHandler: Looking for external-resizer lease holder")
59+
60+
timer := time.NewTimer(*resyncPeriod)
61+
defer timer.Stop()
62+
63+
for {
64+
select {
65+
case lease, ok := <-leaseChannel:
66+
if !ok {
67+
if cancel != nil {
68+
cancel()
69+
}
70+
return
71+
}
72+
currentLeader := *lease.Spec.HolderIdentity
73+
klog.V(6).InfoS("leaseHandler: Lease updated", "currentLeader", currentLeader, "podName", ppodName)
74+
75+
if currentLeader == ppodName && cancel == nil {
76+
var ctx context.Context
77+
ctx, cancel = context.WithCancel(context.Background())
78+
klog.InfoS("leaseHandler: Starting ModifyController", "podName", ppodName, "currentLeader", currentLeader)
79+
go mc.Run(*workers, ctx)
80+
} else if currentLeader != ppodName && cancel != nil {
81+
klog.InfoS("leaseHandler: Stopping ModifyController", "podName", ppodName, "currentLeader", currentLeader)
82+
cancel()
83+
cancel = nil
84+
}
85+
86+
if !timer.Stop() {
87+
<-timer.C
88+
}
89+
timer.Reset(*resyncPeriod)
90+
91+
case <-timer.C:
92+
if cancel != nil {
93+
cancel()
94+
}
95+
klog.Fatalf("leaseHandler: No external-resizer lease update received within timeout period. Timeout: %v", *resyncPeriod)
96+
}
97+
}
98+
}
99+
100+
func main() {
101+
klog.InitFlags(nil)
102+
flag.Set("logtostderr", "true")
103+
flag.Parse()
104+
105+
if *showVersion {
106+
fmt.Println(os.Args[0], version)
107+
os.Exit(0)
108+
}
109+
klog.Infof("Version : %s", version)
110+
klog.InfoS("Leader election must be enabled in the external-resizer CSI sidecar")
111+
112+
podName := os.Getenv("POD_NAME")
113+
if podName == "" {
114+
klog.Fatal("POD_NAME environment variable is not set")
115+
}
116+
podNamespace := os.Getenv("POD_NAMESPACE")
117+
if podNamespace == "" {
118+
klog.Fatal("POD_NAMESPACE environment variable is not set")
119+
}
120+
121+
addr := *httpEndpoint
122+
var config *rest.Config
123+
var err error
124+
if *clientConfigUrl != "" || *kubeConfig != "" {
125+
config, err = clientcmd.BuildConfigFromFlags(*clientConfigUrl, *kubeConfig)
126+
} else {
127+
config, err = rest.InClusterConfig()
128+
}
129+
if err != nil {
130+
klog.Fatal(err.Error())
131+
}
132+
133+
config.QPS = float32(*kubeAPIQPS)
134+
config.Burst = *kubeAPIBurst
135+
136+
kubeClient, err := kubernetes.NewForConfig(config)
137+
if err != nil {
138+
klog.Fatal(err.Error())
139+
}
140+
141+
informerFactory := informers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
142+
mux := http.NewServeMux()
143+
metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
144+
csiClient, err := lcsi.New(*csiAddress, *timeout, metricsManager)
145+
if err != nil {
146+
klog.Fatal(err.Error())
147+
}
148+
if err := csiClient.SupportsVolumeModification(context.TODO()); err != nil {
149+
klog.Fatalf("CSI driver does not support volume modification: %v", err)
150+
}
151+
152+
driverName, err := getDriverName(csiClient, *timeout)
153+
if err != nil {
154+
klog.Fatal(fmt.Errorf("get driver name failed: %v", err))
155+
}
156+
klog.V(2).Infof("CSI driver name: %q", driverName)
157+
158+
csiModifier, err := lmodifier.NewFromClient(
159+
driverName,
160+
csiClient,
161+
kubeClient,
162+
*timeout,
163+
)
164+
if err != nil {
165+
klog.Fatal(err.Error())
166+
}
167+
168+
if addr != "" {
169+
metricsManager.RegisterToServer(mux, *metricsPath)
170+
metricsManager.SetDriverName(driverName)
171+
go func() {
172+
klog.Infof("ServeMux listening at %q", addr)
173+
err := http.ListenAndServe(addr, mux)
174+
if err != nil {
175+
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
176+
}
177+
}()
178+
}
179+
180+
modifierName := csiModifier.Name()
181+
mc := lcontroller.NewModifyController(
182+
modifierName,
183+
csiModifier,
184+
kubeClient,
185+
*resyncPeriod,
186+
informerFactory,
187+
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
188+
true, /* retryFailure */
189+
)
190+
leaseChannel := make(chan *lcoorV1.Lease)
191+
go leaseHandler(podName, mc, leaseChannel)
192+
193+
informerFactoryLeases := informers.NewSharedInformerFactoryWithOptions(kubeClient, *resyncPeriod, informers.WithNamespace(podNamespace))
194+
leaseInformer := informerFactoryLeases.Coordination().V1().Leases().Informer()
195+
leaseInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
196+
UpdateFunc: func(oldObj, newObj interface{}) {
197+
lease, ok := newObj.(*lcoorV1.Lease)
198+
if !ok {
199+
klog.ErrorS(nil, "Failed to process object, expected it to be a Lease", "obj", newObj)
200+
return
201+
}
202+
if lease.Name == "external-resizer-bs-csi-vngcloud-vn" {
203+
leaseChannel <- lease
204+
}
205+
},
206+
})
207+
informerFactory.Start(wait.NeverStop)
208+
informerFactoryLeases.Start(wait.NeverStop)
209+
leaseInformer.Run(wait.NeverStop)
210+
}

go.mod

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,67 @@ module github.com/vngcloud/vngcloud-csi-volume-modifier
33
go 1.21.6
44

55
require (
6+
github.com/golang/protobuf v1.5.4
67
github.com/kubernetes-csi/csi-lib-utils v0.17.0
78
google.golang.org/grpc v1.62.1
9+
google.golang.org/protobuf v1.33.0
10+
k8s.io/api v0.29.3
11+
k8s.io/apimachinery v0.29.3
12+
k8s.io/client-go v0.29.0
13+
k8s.io/csi-translation-lib v0.29.3
14+
k8s.io/klog/v2 v2.110.1
815
)
916

1017
require (
1118
github.com/beorn7/perks v1.0.1 // indirect
1219
github.com/blang/semver/v4 v4.0.0 // indirect
1320
github.com/cespare/xxhash/v2 v2.2.0 // indirect
21+
github.com/container-storage-interface/spec v1.9.0 // indirect
22+
github.com/davecgh/go-spew v1.1.1 // indirect
23+
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
1424
github.com/go-logr/logr v1.3.0 // indirect
15-
github.com/golang/protobuf v1.5.4 // indirect
16-
github.com/kr/text v0.2.0 // indirect
25+
github.com/go-logr/stdr v1.2.2 // indirect
26+
github.com/go-openapi/jsonpointer v0.19.6 // indirect
27+
github.com/go-openapi/jsonreference v0.20.2 // indirect
28+
github.com/go-openapi/swag v0.22.3 // indirect
29+
github.com/gogo/protobuf v1.3.2 // indirect
30+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
31+
github.com/google/gnostic-models v0.6.8 // indirect
32+
github.com/google/go-cmp v0.6.0 // indirect
33+
github.com/google/gofuzz v1.2.0 // indirect
34+
github.com/google/uuid v1.6.0 // indirect
35+
github.com/imdario/mergo v0.3.6 // indirect
36+
github.com/josharian/intern v1.0.0 // indirect
37+
github.com/json-iterator/go v1.1.12 // indirect
38+
github.com/mailru/easyjson v0.7.7 // indirect
1739
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
40+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
41+
github.com/modern-go/reflect2 v1.0.2 // indirect
42+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
1843
github.com/prometheus/client_golang v1.16.0 // indirect
1944
github.com/prometheus/client_model v0.4.0 // indirect
2045
github.com/prometheus/common v0.44.0 // indirect
2146
github.com/prometheus/procfs v0.10.1 // indirect
2247
github.com/spf13/pflag v1.0.5 // indirect
48+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.44.0 // indirect
49+
go.opentelemetry.io/otel v1.19.0 // indirect
50+
go.opentelemetry.io/otel/metric v1.19.0 // indirect
51+
go.opentelemetry.io/otel/trace v1.19.0 // indirect
2352
golang.org/x/net v0.20.0 // indirect
53+
golang.org/x/oauth2 v0.16.0 // indirect
2454
golang.org/x/sys v0.16.0 // indirect
55+
golang.org/x/term v0.16.0 // indirect
2556
golang.org/x/text v0.14.0 // indirect
57+
golang.org/x/time v0.3.0 // indirect
58+
google.golang.org/appengine v1.6.8 // indirect
2659
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
27-
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 // indirect
28-
google.golang.org/protobuf v1.33.0 // indirect
60+
gopkg.in/inf.v0 v0.9.1 // indirect
2961
gopkg.in/yaml.v2 v2.4.0 // indirect
30-
k8s.io/apimachinery v0.29.0 // indirect
62+
gopkg.in/yaml.v3 v3.0.1 // indirect
3163
k8s.io/component-base v0.29.0 // indirect
32-
k8s.io/klog/v2 v2.110.1 // indirect
64+
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
65+
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
66+
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
67+
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
68+
sigs.k8s.io/yaml v1.3.0 // indirect
3369
)

0 commit comments

Comments
 (0)