Skip to content

Commit d33adaa

Browse files
authored
Merge pull request #394 from tam7t/reusable-connection
feat: Reuse provider grpc clients
2 parents 1166c19 + c34b0a4 commit d33adaa

File tree

12 files changed

+127
-135
lines changed

12 files changed

+127
-135
lines changed

cmd/secrets-store-csi-driver/main.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package main
1919
import (
2020
"context"
2121
"flag"
22+
"strings"
2223
"time"
2324

2425
"sigs.k8s.io/secrets-store-csi-driver/pkg/metrics"
@@ -85,10 +86,10 @@ func main() {
8586
klog.Fatalf("failed to initialize metrics exporter, error: %+v", err)
8687
}
8788

88-
config := ctrl.GetConfigOrDie()
89-
config.UserAgent = "csi-secrets-store/controller"
89+
cfg := ctrl.GetConfigOrDie()
90+
cfg.UserAgent = "csi-secrets-store/controller"
9091

91-
mgr, err := ctrl.NewManager(config, ctrl.Options{
92+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
9293
Scheme: scheme,
9394
MetricsBindAddress: *metricsAddr,
9495
LeaderElection: false,
@@ -108,6 +109,26 @@ func main() {
108109

109110
ctx := withShutdownSignal(context.Background())
110111

112+
// create provider clients
113+
providerClients := make(map[string]*secretsstore.CSIProviderClient)
114+
for _, provider := range strings.Split(*grpcSupportedProviders, ";") {
115+
p := strings.TrimSpace(provider)
116+
if len(p) != 0 {
117+
// dialing clients is non-blocking and will be retried on errors
118+
providerClients[provider], err = secretsstore.NewProviderClient(secretsstore.CSIProviderName(p), *providerVolumePath)
119+
if err != nil {
120+
klog.Fatalf("failed to create provider client, err: %+v", err)
121+
}
122+
}
123+
}
124+
defer func() {
125+
for k, v := range providerClients {
126+
if err := v.Close(); err != nil {
127+
klog.ErrorS(err, "closing grpc client failed", "provider", k)
128+
}
129+
}
130+
}()
131+
111132
go func() {
112133
klog.Infof("starting manager")
113134
if err := mgr.Start(ctx.Done()); err != nil {
@@ -120,27 +141,23 @@ func main() {
120141
}()
121142

122143
if *enableSecretRotation {
123-
rec, err := rotation.NewReconciler(scheme, *providerVolumePath, *nodeID, *rotationPollInterval)
144+
rec, err := rotation.NewReconciler(scheme, *providerVolumePath, *nodeID, *rotationPollInterval, providerClients)
124145
if err != nil {
125146
klog.Fatalf("failed to initialize rotation reconciler, error: %+v", err)
126147
}
127148
go rec.Run(ctx.Done())
128149
}
129150

130-
handle(ctx)
131-
}
132-
133-
func handle(ctx context.Context) {
134-
driver := secretsstore.GetDriver()
135-
cfg, err := config.GetConfig()
151+
ccfg, err := config.GetConfig()
136152
if err != nil {
137153
klog.Fatalf("failed to initialize driver, error getting config: %+v", err)
138154
}
139-
c, err := client.New(cfg, client.Options{Scheme: scheme, Mapper: nil})
155+
c, err := client.New(ccfg, client.Options{Scheme: scheme, Mapper: nil})
140156
if err != nil {
141157
klog.Fatalf("failed to initialize driver, error creating client: %+v", err)
142158
}
143-
driver.Run(ctx, *driverName, *nodeID, *endpoint, *providerVolumePath, *minProviderVersion, *grpcSupportedProviders, c)
159+
driver := secretsstore.GetDriver()
160+
driver.Run(ctx, *driverName, *nodeID, *endpoint, *providerVolumePath, *minProviderVersion, providerClients, c)
144161
}
145162

146163
// withShutdownSignal returns a copy of the parent context that will close if

pkg/csi-common/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ func (s *nonBlockingGRPCServer) ForceStop() {
7070
}
7171

7272
func (s *nonBlockingGRPCServer) serve(ctx context.Context, endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
73+
defer s.wg.Done()
74+
7375
proto, addr, err := ParseEndpoint(endpoint)
7476
if err != nil {
7577
klog.Fatal(err.Error())
@@ -117,5 +119,4 @@ func (s *nonBlockingGRPCServer) serve(ctx context.Context, endpoint string, ids
117119

118120
<-ctx.Done()
119121
server.GracefulStop()
120-
s.wg.Done()
121122
}

pkg/errors/errors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ const (
2929
FailedToMount = "FailedToMount"
3030
// SecretProviderClassNotFound error
3131
SecretProviderClassNotFound = "SecretProviderClassNotFound"
32-
// FailedToCreateProviderGRPCClient error
33-
FailedToCreateProviderGRPCClient = "FailedToCreateProviderGRPCClient"
32+
// FailedToLookupProviderGRPCClient error
33+
FailedToLookupProviderGRPCClient = "FailedToLookupProviderGRPCClient"
3434
// GRPCProviderError error
3535
GRPCProviderError = "GRPCProviderError"
3636
// FailedToRotate error

pkg/rotation/reconciler.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type Reconciler struct {
8383
}
8484

8585
// NewReconciler returns a new reconciler for rotation
86-
func NewReconciler(s *runtime.Scheme, providerVolumePath, nodeName string, rotationPollInterval time.Duration) (*Reconciler, error) {
86+
func NewReconciler(s *runtime.Scheme, providerVolumePath, nodeName string, rotationPollInterval time.Duration, providerClients map[string]*secretsstore.CSIProviderClient) (*Reconciler, error) {
8787
config, err := buildConfig()
8888
if err != nil {
8989
return nil, err
@@ -109,7 +109,7 @@ func NewReconciler(s *runtime.Scheme, providerVolumePath, nodeName string, rotat
109109
scheme: s,
110110
providerVolumePath: providerVolumePath,
111111
rotationPollInterval: rotationPollInterval,
112-
providerClients: make(map[string]*secretsstore.CSIProviderClient),
112+
providerClients: providerClients,
113113
reporter: newStatsReporter(),
114114
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
115115
eventRecorder: recorder,
@@ -274,11 +274,11 @@ func (r *Reconciler) reconcile(ctx context.Context, spcps *v1alpha1.SecretProvid
274274
}
275275

276276
providerName = string(spc.Spec.Provider)
277-
providerClient, err := r.getProviderClient(providerName)
278-
if err != nil {
279-
errorReason = internalerrors.FailedToCreateProviderGRPCClient
280-
r.generateEvent(pod, v1.EventTypeWarning, mountRotationFailedReason, fmt.Sprintf("failed to create provider client, err: %+v", err))
281-
return fmt.Errorf("failed to create provider client, err: %+v", err)
277+
providerClient, exists := r.providerClients[providerName]
278+
if !exists {
279+
errorReason = internalerrors.FailedToLookupProviderGRPCClient
280+
r.generateEvent(pod, v1.EventTypeWarning, mountRotationFailedReason, fmt.Sprintf("failed to lookup provider client: %q", providerName))
281+
return fmt.Errorf("failed to lookup provider client: %q", providerName)
282282
}
283283
newObjectVersions, errorReason, err := providerClient.MountContent(ctx, string(paramsJSON), string(secretsJSON), spcps.Status.TargetPath, string(permissionJSON), oldObjectVersions)
284284
if err != nil {
@@ -446,21 +446,6 @@ func (r *Reconciler) patchSecret(ctx context.Context, name, namespace string, da
446446
return r.ctrlWriterClient.Patch(ctx, secret, patch)
447447
}
448448

449-
// getProviderClient returns the GRPC provider client to use for mount request
450-
func (r *Reconciler) getProviderClient(providerName string) (*secretsstore.CSIProviderClient, error) {
451-
// check if the provider client already exists
452-
if providerClient, exists := r.providerClients[providerName]; exists {
453-
return providerClient, nil
454-
}
455-
// create a new client as it doesn't exist in the reconciler cache
456-
providerClient, err := secretsstore.NewProviderClient(secretsstore.CSIProviderName(providerName), r.providerVolumePath)
457-
if err != nil {
458-
return nil, fmt.Errorf("failed to create %s provider client, err: %+v", providerName, err)
459-
}
460-
r.providerClients[providerName] = providerClient
461-
return providerClient, nil
462-
}
463-
464449
// runWorker runs a thread that process the queue
465450
func (r *Reconciler) runWorker() {
466451
for r.processNextItem() {

pkg/rotation/reconciler_test.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,24 @@ func newTestReconciler(s *runtime.Scheme, kubeClient kubernetes.Interface, crdCl
7575
return nil, err
7676
}
7777

78+
client, err := secretsstore.NewProviderClient("provider1", socketPath)
79+
if err != nil {
80+
return nil, err
81+
}
82+
7883
return &Reconciler{
7984
store: store,
8085
ctrlReaderClient: ctrlClient,
8186
ctrlWriterClient: ctrlClient,
8287
scheme: s,
8388
providerVolumePath: socketPath,
8489
rotationPollInterval: rotationPollInterval,
85-
providerClients: map[string]*secretsstore.CSIProviderClient{},
86-
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
87-
reporter: newStatsReporter(),
88-
eventRecorder: fakeRecorder,
90+
providerClients: map[string]*secretsstore.CSIProviderClient{
91+
"provider1": client,
92+
},
93+
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
94+
reporter: newStatsReporter(),
95+
eventRecorder: fakeRecorder,
8996
}, nil
9097
}
9198

@@ -370,7 +377,7 @@ func TestReconcileError(t *testing.T) {
370377
expectedErrorEvents: false,
371378
},
372379
{
373-
name: "failed to create provider client",
380+
name: "failed to lookup provider client",
374381
rotationPollInterval: 60 * time.Second,
375382
secretProviderClassPodStatusToProcess: &v1alpha1.SecretProviderClassPodStatus{
376383
ObjectMeta: metav1.ObjectMeta{

pkg/secrets-store/nodeserver.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ import (
4141

4242
type nodeServer struct {
4343
*csicommon.DefaultNodeServer
44-
providerVolumePath string
45-
minProviderVersions map[string]string
46-
mounter mount.Interface
47-
reporter StatsReporter
48-
nodeID string
49-
client client.Client
50-
grpcSupportedProviders map[string]bool
44+
providerVolumePath string
45+
minProviderVersions map[string]string
46+
mounter mount.Interface
47+
reporter StatsReporter
48+
nodeID string
49+
client client.Client
50+
providerClients map[string]*CSIProviderClient
5151
}
5252

5353
const (
@@ -296,14 +296,9 @@ func (ns *nodeServer) mountSecretsStoreObjectContent(ctx context.Context, provid
296296
// if the provider supports and is running grpc server, then communicate with
297297
// provider using the grpc client, otherwise fallback to invoking the provider
298298
// binary which is how it was initially implemented
299-
_, exists := ns.grpcSupportedProviders[providerName]
300-
if exists {
299+
if client, exists := ns.providerClients[providerName]; exists {
301300
klog.InfoS("Using grpc client", "provider", providerName, "pod", podName)
302-
providerClient, err := NewProviderClient(CSIProviderName(providerName), ns.providerVolumePath)
303-
if err != nil {
304-
return nil, internalerrors.FailedToCreateProviderGRPCClient, fmt.Errorf("failed to create provider client, err: %+v", err)
305-
}
306-
return providerClient.MountContent(ctx, attributes, secrets, targetPath, permission, nil)
301+
return client.MountContent(ctx, attributes, secrets, targetPath, permission, nil)
307302
}
308303

309304
providerBinary := ns.getProviderPath(runtime.GOOS, providerName)

pkg/secrets-store/nodeserver_test.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,18 @@ import (
4040
"sigs.k8s.io/controller-runtime/pkg/client/fake"
4141
)
4242

43-
func testNodeServer(mountPoints []mount.MountPoint, client client.Client, grpcSupportProviders string, reporter StatsReporter, providerBinaryName string) (*nodeServer, error) {
43+
func testNodeServer(mountPoints []mount.MountPoint, client client.Client, grpcSupportProvider string, reporter StatsReporter, providerBinaryName string) (*nodeServer, error) {
4444
tmpDir, err := ioutil.TempDir("", "ut")
4545
if err != nil {
4646
return nil, err
4747
}
48-
if grpcSupportProviders != "" {
49-
err = ioutil.WriteFile(fmt.Sprintf("%s/%s.sock", tmpDir, grpcSupportProviders), nil, permission)
48+
providerClients := map[string]*CSIProviderClient{}
49+
if grpcSupportProvider != "" {
50+
client, err := NewProviderClient(CSIProviderName(grpcSupportProvider), tmpDir)
5051
if err != nil {
5152
return nil, err
5253
}
54+
providerClients[grpcSupportProvider] = client
5355
}
5456
if providerBinaryName != "" {
5557
dirPath := fmt.Sprintf("%s/%s", tmpDir, providerBinaryName)
@@ -59,12 +61,15 @@ func testNodeServer(mountPoints []mount.MountPoint, client client.Client, grpcSu
5961
return nil, err
6062
}
6163
f, err := os.Create(filePath)
64+
if err != nil {
65+
return nil, err
66+
}
6267
defer f.Close()
6368
if err != nil {
6469
return nil, err
6570
}
6671
}
67-
return newNodeServer(NewFakeDriver(), tmpDir, "", grpcSupportProviders, "testnode", mount.NewFakeMounter(mountPoints), client, reporter)
72+
return newNodeServer(NewFakeDriver(), tmpDir, "", "testnode", mount.NewFakeMounter(mountPoints), providerClients, client, reporter)
6873
}
6974

7075
func getTestTargetPath(pattern string, t *testing.T) string {
@@ -402,15 +407,6 @@ func TestMountSecretsStoreObjectContent(t *testing.T) {
402407
expectedErrorReason: internalerrors.ProviderBinaryNotFound,
403408
expectedErr: true,
404409
},
405-
{
406-
name: "failed to create provider grpc client",
407-
attributes: "{}",
408-
targetPath: getTestTargetPath("", t),
409-
permission: fmt.Sprint(permission),
410-
grpcSupportProviders: "provider1",
411-
expectedErrorReason: "GRPCProviderError",
412-
expectedErr: true,
413-
},
414410
}
415411

416412
for _, test := range tests {

0 commit comments

Comments
 (0)