Skip to content

Commit 693a233

Browse files
committed
chore: best-effort cleanup socket
Build a cancellable context and propagate it or the context's Done() channel to the various concurrent processes.
1 parent 9e70aa6 commit 693a233

File tree

6 files changed

+51
-27
lines changed

6 files changed

+51
-27
lines changed

cmd/secrets-store-csi-driver/main.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"flag"
2122
"time"
2223

@@ -105,31 +106,31 @@ func main() {
105106
}
106107
// +kubebuilder:scaffold:builder
107108

108-
stopCh := ctrl.SetupSignalHandler()
109+
ctx := withShutdownSignal(context.Background())
110+
109111
go func() {
110112
klog.Infof("starting manager")
111-
if err := mgr.Start(stopCh); err != nil {
113+
if err := mgr.Start(ctx.Done()); err != nil {
112114
klog.Fatalf("failed to run manager, error: %+v", err)
113115
}
114116
}()
115117

116118
go func() {
117-
reconciler.RunPatcher(stopCh)
119+
reconciler.RunPatcher(ctx)
118120
}()
119121

120122
if *enableSecretRotation {
121123
rec, err := rotation.NewReconciler(scheme, *providerVolumePath, *nodeID, *rotationPollInterval)
122124
if err != nil {
123125
klog.Fatalf("failed to initialize rotation reconciler, error: %+v", err)
124126
}
125-
stopCh := make(<-chan struct{})
126-
go rec.Run(stopCh)
127+
go rec.Run(ctx.Done())
127128
}
128129

129-
handle()
130+
handle(ctx)
130131
}
131132

132-
func handle() {
133+
func handle(ctx context.Context) {
133134
driver := secretsstore.GetDriver()
134135
cfg, err := config.GetConfig()
135136
if err != nil {
@@ -139,5 +140,19 @@ func handle() {
139140
if err != nil {
140141
klog.Fatalf("failed to initialize driver, error creating client: %+v", err)
141142
}
142-
driver.Run(*driverName, *nodeID, *endpoint, *providerVolumePath, *minProviderVersion, *grpcSupportedProviders, c)
143+
driver.Run(ctx, *driverName, *nodeID, *endpoint, *providerVolumePath, *minProviderVersion, *grpcSupportedProviders, c)
144+
}
145+
146+
// withShutdownSignal returns a copy of the parent context that will close if
147+
// the process receives termination signals.
148+
func withShutdownSignal(ctx context.Context) context.Context {
149+
nctx, cancel := context.WithCancel(ctx)
150+
stopCh := ctrl.SetupSignalHandler()
151+
152+
go func() {
153+
<-stopCh
154+
klog.Info("received shutdown signal")
155+
cancel()
156+
}()
157+
return nctx
143158
}

controllers/secretproviderclasspodstatus_controller.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,28 +85,27 @@ func New(mgr manager.Manager, nodeID string) (*SecretProviderClassPodStatusRecon
8585
}, nil
8686
}
8787

88-
func (r *SecretProviderClassPodStatusReconciler) RunPatcher(stopCh <-chan struct{}) {
88+
func (r *SecretProviderClassPodStatusReconciler) RunPatcher(ctx context.Context) {
8989
ticker := time.NewTicker(5 * time.Second)
9090
defer ticker.Stop()
9191

9292
for {
9393
select {
94-
case <-stopCh:
94+
case <-ctx.Done():
9595
return
9696
case <-ticker.C:
97-
if err := r.Patcher(); err != nil {
97+
if err := r.Patcher(ctx); err != nil {
9898
klog.ErrorS(err, "failed to patch secret owner ref")
9999
}
100100
}
101101
}
102102
}
103103

104-
func (r *SecretProviderClassPodStatusReconciler) Patcher() error {
104+
func (r *SecretProviderClassPodStatusReconciler) Patcher(ctx context.Context) error {
105105
klog.V(5).Infof("patcher started")
106106
r.mutex.Lock()
107107
defer r.mutex.Unlock()
108108

109-
ctx := context.Background()
110109
spcPodStatusList := &v1alpha1.SecretProviderClassPodStatusList{}
111110
spcMap := make(map[string]v1alpha1.SecretProviderClass)
112111
secretOwnerMap := make(map[types.NamespacedName][]*v1alpha1.SecretProviderClassPodStatus)

pkg/csi-common/server.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package csicommon
1818

1919
import (
20+
"context"
2021
"net"
2122
"os"
2223
"runtime"
@@ -32,7 +33,7 @@ import (
3233
// Defines Non blocking GRPC server interfaces
3334
type NonBlockingGRPCServer interface {
3435
// Start services at the endpoint
35-
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
36+
Start(ctx context.Context, endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
3637
// Waits for the service to stop
3738
Wait()
3839
// Stops the service gracefully
@@ -51,9 +52,9 @@ type nonBlockingGRPCServer struct {
5152
server *grpc.Server
5253
}
5354

54-
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
55+
func (s *nonBlockingGRPCServer) Start(ctx context.Context, endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
5556
s.wg.Add(1)
56-
go s.serve(endpoint, ids, cs, ns)
57+
go s.serve(ctx, endpoint, ids, cs, ns)
5758
}
5859

5960
func (s *nonBlockingGRPCServer) Wait() {
@@ -68,7 +69,7 @@ func (s *nonBlockingGRPCServer) ForceStop() {
6869
s.server.Stop()
6970
}
7071

71-
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
72+
func (s *nonBlockingGRPCServer) serve(ctx context.Context, endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
7273
proto, addr, err := ParseEndpoint(endpoint)
7374
if err != nil {
7475
klog.Fatal(err.Error())
@@ -87,6 +88,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
8788
if err != nil {
8889
klog.Fatalf("Failed to listen: %v", err)
8990
}
91+
defer listener.Close()
9092

9193
opts := []grpc.ServerOption{
9294
grpc.UnaryInterceptor(logGRPC),
@@ -106,8 +108,14 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
106108

107109
klog.Infof("Listening for connections on address: %v", listener.Addr())
108110

109-
err = server.Serve(listener)
110-
if err != nil {
111-
klog.Fatalf("Failed to serve: %v", err)
112-
}
111+
go func() {
112+
err = server.Serve(listener)
113+
if err != nil {
114+
klog.Errorf("Failed to serve: %v", err)
115+
}
116+
}()
117+
118+
<-ctx.Done()
119+
server.GracefulStop()
120+
s.wg.Done()
113121
}

pkg/csi-common/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,23 @@ func RunNodePublishServer(endpoint string, d *CSIDriver, ns csi.NodeServer) {
7575
ids := NewDefaultIdentityServer(d)
7676

7777
s := NewNonBlockingGRPCServer()
78-
s.Start(endpoint, ids, nil, ns)
78+
s.Start(context.Background(), endpoint, ids, nil, ns)
7979
s.Wait()
8080
}
8181

8282
func RunControllerPublishServer(endpoint string, d *CSIDriver, cs csi.ControllerServer) {
8383
ids := NewDefaultIdentityServer(d)
8484

8585
s := NewNonBlockingGRPCServer()
86-
s.Start(endpoint, ids, cs, nil)
86+
s.Start(context.Background(), endpoint, ids, cs, nil)
8787
s.Wait()
8888
}
8989

9090
func RunControllerandNodePublishServer(endpoint string, d *CSIDriver, cs csi.ControllerServer, ns csi.NodeServer) {
9191
ids := NewDefaultIdentityServer(d)
9292

9393
s := NewNonBlockingGRPCServer()
94-
s.Start(endpoint, ids, cs, ns)
94+
s.Start(context.Background(), endpoint, ids, cs, ns)
9595
s.Wait()
9696
}
9797

pkg/secrets-store/secrets-store.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package secretsstore
1818

1919
import (
20+
"context"
2021
"strings"
2122

2223
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -93,7 +94,7 @@ func newIdentityServer(d *csicommon.CSIDriver) *identityServer {
9394
}
9495

9596
// Run starts the CSI plugin
96-
func (s *SecretsStore) Run(driverName, nodeID, endpoint, providerVolumePath, minProviderVersions, grpcSupportedProviders string, client client.Client) {
97+
func (s *SecretsStore) Run(ctx context.Context, driverName, nodeID, endpoint, providerVolumePath, minProviderVersions, grpcSupportedProviders string, client client.Client) {
9798
klog.Infof("Driver: %v ", driverName)
9899
klog.Infof("Version: %s", vendorVersion)
99100
klog.Infof("Provider Volume Path: %s", providerVolumePath)
@@ -123,6 +124,6 @@ func (s *SecretsStore) Run(driverName, nodeID, endpoint, providerVolumePath, min
123124
s.ids = newIdentityServer(s.driver)
124125

125126
server := csicommon.NewNonBlockingGRPCServer()
126-
server.Start(endpoint, s.ids, s.cs, s.ns)
127+
server.Start(ctx, endpoint, s.ids, s.cs, s.ns)
127128
server.Wait()
128129
}

test/sanity/sanity_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
package sanity
1515

1616
import (
17+
"context"
1718
"testing"
1819

1920
"github.com/kubernetes-csi/csi-test/pkg/sanity"
@@ -32,7 +33,7 @@ const (
3233
func TestSanity(t *testing.T) {
3334
driver := secretsstore.GetDriver()
3435
go func() {
35-
driver.Run("secrets-store.csi.k8s.io", "somenodeid", endpoint, providerVolumePath, "provider1=0.0.2,provider2=0.0.4", "", nil)
36+
driver.Run(context.Background(), "secrets-store.csi.k8s.io", "somenodeid", endpoint, providerVolumePath, "provider1=0.0.2,provider2=0.0.4", "", nil)
3637
}()
3738

3839
config := &sanity.Config{

0 commit comments

Comments
 (0)