diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index acb2856d2..b213522e5 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -27,6 +27,7 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/server" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" "sigs.k8s.io/apiserver-network-proxy/pkg/util" ) @@ -314,7 +315,7 @@ func (o *ProxyRunOptions) Validate() error { if len(o.ProxyStrategies) == 0 { return fmt.Errorf("ProxyStrategies cannot be empty") } - if _, err := server.ParseProxyStrategies(o.ProxyStrategies); err != nil { + if _, err := proxystrategies.ParseProxyStrategies(o.ProxyStrategies); err != nil { return fmt.Errorf("invalid proxy strategies: %v", err) } if o.XfrChannelSize <= 0 { diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 38d518bbd..db2aec185 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -46,6 +46,7 @@ import ( "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/pkg/server" "sigs.k8s.io/apiserver-network-proxy/pkg/server/leases" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" "sigs.k8s.io/apiserver-network-proxy/pkg/util" "sigs.k8s.io/apiserver-network-proxy/proto/agent" ) @@ -134,7 +135,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { AuthenticationAudience: o.AuthenticationAudience, } klog.V(1).Infoln("Starting frontend server for client connections.") - ps, err := server.ParseProxyStrategies(o.ProxyStrategies) + ps, err := proxystrategies.ParseProxyStrategies(o.ProxyStrategies) if err != nil { return err } diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go index f42631d28..293253695 100644 --- a/pkg/server/backend_manager.go +++ b/pkg/server/backend_manager.go @@ -22,7 +22,6 @@ import ( "io" "math/rand" "slices" - "strings" "sync" "time" @@ -32,72 +31,11 @@ import ( commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" "sigs.k8s.io/apiserver-network-proxy/proto/agent" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) -type ProxyStrategy int - -const ( - // With this strategy the Proxy Server will randomly pick a backend from - // the current healthy backends to establish the tunnel over which to - // forward requests. - ProxyStrategyDefault ProxyStrategy = iota + 1 - // With this strategy the Proxy Server will pick a backend that has the same - // associated host as the request.Host to establish the tunnel. - ProxyStrategyDestHost - // ProxyStrategyDefaultRoute will only forward traffic to agents that have explicity advertised - // they serve the default route through an agent identifier. Typically used in combination with destHost - ProxyStrategyDefaultRoute -) - -func (ps ProxyStrategy) String() string { - switch ps { - case ProxyStrategyDefault: - return "default" - case ProxyStrategyDestHost: - return "destHost" - case ProxyStrategyDefaultRoute: - return "defaultRoute" - } - panic(fmt.Sprintf("unhandled ProxyStrategy: %d", ps)) -} - -func ParseProxyStrategy(s string) (ProxyStrategy, error) { - switch s { - case ProxyStrategyDefault.String(): - return ProxyStrategyDefault, nil - case ProxyStrategyDestHost.String(): - return ProxyStrategyDestHost, nil - case ProxyStrategyDefaultRoute.String(): - return ProxyStrategyDefaultRoute, nil - default: - return 0, fmt.Errorf("unknown proxy strategy: %s", s) - } -} - -// GenProxyStrategiesFromStr generates the list of proxy strategies from the -// comma-seperated string, i.e., destHost. -func ParseProxyStrategies(proxyStrategies string) ([]ProxyStrategy, error) { - var result []ProxyStrategy - - strs := strings.Split(proxyStrategies, ",") - for _, s := range strs { - if len(s) == 0 { - continue - } - ps, err := ParseProxyStrategy(s) - if err != nil { - return nil, err - } - result = append(result, ps) - } - if len(result) == 0 { - return nil, fmt.Errorf("proxy strategies cannot be empty") - } - return result, nil -} - // Backend abstracts a connected Konnectivity agent. // // In the only currently supported case (gRPC), it wraps an @@ -271,24 +209,31 @@ type DefaultBackendStorage struct { // e.g., when associating to the DestHostBackendManager, it can only use the // identifiers of types, IPv4, IPv6 and Host. idTypes []header.IdentifierType + // proxyStrategy is the proxy strategy of the backend manager this storage + // belongs to. + // It is used to record metrics. + proxyStrategy proxystrategies.ProxyStrategy } // NewDefaultBackendManager returns a DefaultBackendManager. func NewDefaultBackendManager() *DefaultBackendManager { return &DefaultBackendManager{ DefaultBackendStorage: NewDefaultBackendStorage( - []header.IdentifierType{header.UID})} + []header.IdentifierType{header.UID}, proxystrategies.ProxyStrategyDefault)} } // NewDefaultBackendStorage returns a DefaultBackendStorage -func NewDefaultBackendStorage(idTypes []header.IdentifierType) *DefaultBackendStorage { +func NewDefaultBackendStorage(idTypes []header.IdentifierType, proxyStrategy proxystrategies.ProxyStrategy) *DefaultBackendStorage { // Set an explicit value, so that the metric is emitted even when // no agent ever successfully connects. metrics.Metrics.SetBackendCount(0) + metrics.Metrics.SetTotalBackendCount(proxyStrategy, 0) + return &DefaultBackendStorage{ - backends: make(map[string][]*Backend), - random: rand.New(rand.NewSource(time.Now().UnixNano())), /* #nosec G404 */ - idTypes: idTypes, + backends: make(map[string][]*Backend), + random: rand.New(rand.NewSource(time.Now().UnixNano())), /* #nosec G404 */ + idTypes: idTypes, + proxyStrategy: proxyStrategy, } } @@ -318,6 +263,7 @@ func (s *DefaultBackendStorage) addBackend(identifier string, idType header.Iden } s.backends[identifier] = []*Backend{backend} metrics.Metrics.SetBackendCount(len(s.backends)) + metrics.Metrics.SetTotalBackendCount(s.proxyStrategy, len(s.backends)) s.agentIDs = append(s.agentIDs, identifier) } @@ -359,6 +305,7 @@ func (s *DefaultBackendStorage) removeBackend(identifier string, idType header.I klog.V(1).InfoS("Could not find connection matching identifier to remove", "agentID", identifier, "idType", idType) } metrics.Metrics.SetBackendCount(len(s.backends)) + metrics.Metrics.SetTotalBackendCount(s.proxyStrategy, len(s.backends)) } // NumBackends resturns the number of available backends diff --git a/pkg/server/backend_manager_test.go b/pkg/server/backend_manager_test.go index 107103cba..4a8853194 100644 --- a/pkg/server/backend_manager_test.go +++ b/pkg/server/backend_manager_test.go @@ -18,11 +18,9 @@ package server import ( "context" - "fmt" "reflect" "testing" - "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" "google.golang.org/grpc/metadata" @@ -385,123 +383,3 @@ func TestDestHostBackendManager_WithDuplicateIdents(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } } - -func TestProxyStrategy(t *testing.T) { - for desc, tc := range map[string]struct { - input ProxyStrategy - want string - wantPanic string - }{ - "default": { - input: ProxyStrategyDefault, - want: "default", - }, - "destHost": { - input: ProxyStrategyDestHost, - want: "destHost", - }, - "defaultRoute": { - input: ProxyStrategyDefaultRoute, - want: "defaultRoute", - }, - "unrecognized": { - input: ProxyStrategy(0), - wantPanic: "unhandled ProxyStrategy: 0", - }, - } { - t.Run(desc, func(t *testing.T) { - if tc.wantPanic != "" { - assert.PanicsWithValue(t, tc.wantPanic, func() { - _ = tc.input.String() - }) - } else { - got := tc.input.String() - if got != tc.want { - t.Errorf("ProxyStrategy.String(): got %v, want %v", got, tc.want) - } - } - }) - } -} - -func TestParseProxyStrategy(t *testing.T) { - for desc, tc := range map[string]struct { - input string - want ProxyStrategy - wantErr error - }{ - "empty": { - input: "", - wantErr: fmt.Errorf("unknown proxy strategy: "), - }, - "unrecognized": { - input: "unrecognized", - wantErr: fmt.Errorf("unknown proxy strategy: unrecognized"), - }, - "default": { - input: "default", - want: ProxyStrategyDefault, - }, - "destHost": { - input: "destHost", - want: ProxyStrategyDestHost, - }, - "defaultRoute": { - input: "defaultRoute", - want: ProxyStrategyDefaultRoute, - }, - } { - t.Run(desc, func(t *testing.T) { - got, err := ParseProxyStrategy(tc.input) - assert.Equal(t, tc.wantErr, err, "ParseProxyStrategy(%s): got error %q, want %v", tc.input, err, tc.wantErr) - if got != tc.want { - t.Errorf("ParseProxyStrategy(%s): got %v, want %v", tc.input, got, tc.want) - } - }) - } -} - -func TestParseProxyStrategies(t *testing.T) { - for desc, tc := range map[string]struct { - input string - want []ProxyStrategy - wantErr error - }{ - "empty": { - input: "", - wantErr: fmt.Errorf("proxy strategies cannot be empty"), - }, - "unrecognized": { - input: "unrecognized", - wantErr: fmt.Errorf("unknown proxy strategy: unrecognized"), - }, - "default": { - input: "default", - want: []ProxyStrategy{ProxyStrategyDefault}, - }, - "destHost": { - input: "destHost", - want: []ProxyStrategy{ProxyStrategyDestHost}, - }, - "defaultRoute": { - input: "defaultRoute", - want: []ProxyStrategy{ProxyStrategyDefaultRoute}, - }, - "duplicate": { - input: "destHost,defaultRoute,defaultRoute,default", - want: []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute, ProxyStrategyDefaultRoute, ProxyStrategyDefault}, - }, - "multiple": { - input: "destHost,defaultRoute,default", - want: []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute, ProxyStrategyDefault}, - }, - } { - t.Run(desc, func(t *testing.T) { - got, err := ParseProxyStrategies(tc.input) - assert.Equal(t, tc.wantErr, err, "ParseProxyStrategies(%s): got error %q, want %v", tc.input, err, tc.wantErr) - if !reflect.DeepEqual(got, tc.want) { - t.Errorf("ParseProxyStrategies(%s): got %v, want %v", tc.input, got, tc.want) - } - }) - } -} diff --git a/pkg/server/default_route_backend_manager.go b/pkg/server/default_route_backend_manager.go index f4480eab6..e09136c69 100644 --- a/pkg/server/default_route_backend_manager.go +++ b/pkg/server/default_route_backend_manager.go @@ -20,6 +20,7 @@ import ( "context" "k8s.io/klog/v2" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) @@ -32,7 +33,7 @@ var _ BackendManager = &DefaultRouteBackendManager{} func NewDefaultRouteBackendManager() *DefaultRouteBackendManager { return &DefaultRouteBackendManager{ DefaultBackendStorage: NewDefaultBackendStorage( - []header.IdentifierType{header.DefaultRoute})} + []header.IdentifierType{header.DefaultRoute}, proxystrategies.ProxyStrategyDefaultRoute)} } // Backend tries to get a backend that advertises default route, with random selection. diff --git a/pkg/server/desthost_backend_manager.go b/pkg/server/desthost_backend_manager.go index 8c914e4ba..280065775 100644 --- a/pkg/server/desthost_backend_manager.go +++ b/pkg/server/desthost_backend_manager.go @@ -20,6 +20,7 @@ import ( "context" "k8s.io/klog/v2" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) @@ -32,7 +33,7 @@ var _ BackendManager = &DestHostBackendManager{} func NewDestHostBackendManager() *DestHostBackendManager { return &DestHostBackendManager{ DefaultBackendStorage: NewDefaultBackendStorage( - []header.IdentifierType{header.IPv4, header.IPv6, header.Host})} + []header.IdentifierType{header.IPv4, header.IPv6, header.Host}, proxystrategies.ProxyStrategyDestHost)} } func (dibm *DestHostBackendManager) AddBackend(backend *Backend) { diff --git a/pkg/server/metrics/metrics.go b/pkg/server/metrics/metrics.go index 9a89d6ee6..9266b2189 100644 --- a/pkg/server/metrics/metrics.go +++ b/pkg/server/metrics/metrics.go @@ -24,6 +24,7 @@ import ( commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" ) const ( @@ -51,6 +52,7 @@ type ServerMetrics struct { grpcConnections *prometheus.GaugeVec httpConnections prometheus.Gauge backend *prometheus.GaugeVec + totalBackendCount *prometheus.GaugeVec pendingDials *prometheus.GaugeVec establishedConns *prometheus.GaugeVec fullRecvChannels *prometheus.GaugeVec @@ -114,6 +116,17 @@ func newServerMetrics() *ServerMetrics { }, []string{}, ) + totalBackendCount := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "ready_backend_connections_total", + Help: "Total number of konnectivity agent connected to the proxy server", + }, + []string{ + "proxy_strategy", + }, + ) pendingDials := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, @@ -203,6 +216,7 @@ func newServerMetrics() *ServerMetrics { prometheus.MustRegister(grpcConnections) prometheus.MustRegister(httpConnections) prometheus.MustRegister(backend) + prometheus.MustRegister(totalBackendCount) prometheus.MustRegister(pendingDials) prometheus.MustRegister(establishedConns) prometheus.MustRegister(fullRecvChannels) @@ -220,6 +234,7 @@ func newServerMetrics() *ServerMetrics { grpcConnections: grpcConnections, httpConnections: httpConnections, backend: backend, + totalBackendCount: totalBackendCount, pendingDials: pendingDials, establishedConns: establishedConns, fullRecvChannels: fullRecvChannels, @@ -240,6 +255,7 @@ func (s *ServerMetrics) Reset() { s.frontendLatencies.Reset() s.grpcConnections.Reset() s.backend.Reset() + s.totalBackendCount.Reset() s.pendingDials.Reset() s.establishedConns.Reset() s.fullRecvChannels.Reset() @@ -284,6 +300,11 @@ func (s *ServerMetrics) SetBackendCount(count int) { s.backend.WithLabelValues().Set(float64(count)) } +// SetTotalBackendCount sets the total number of backend connection. +func (s *ServerMetrics) SetTotalBackendCount(proxyStrategy proxystrategies.ProxyStrategy, count int) { + s.totalBackendCount.WithLabelValues(proxyStrategy.String()).Set(float64(count)) +} + // SetPendingDialCount sets the number of pending dials. func (s *ServerMetrics) SetPendingDialCount(count int) { s.pendingDials.WithLabelValues().Set(float64(count)) diff --git a/pkg/server/proxystrategies/proxystrategies.go b/pkg/server/proxystrategies/proxystrategies.go new file mode 100644 index 000000000..4c2c98e65 --- /dev/null +++ b/pkg/server/proxystrategies/proxystrategies.go @@ -0,0 +1,68 @@ +package proxystrategies + +import ( + "fmt" + "strings" +) + +type ProxyStrategy int + +const ( + // With this strategy the Proxy Server will randomly pick a backend from + // the current healthy backends to establish the tunnel over which to + // forward requests. + ProxyStrategyDefault ProxyStrategy = iota + 1 + // With this strategy the Proxy Server will pick a backend that has the same + // associated host as the request.Host to establish the tunnel. + ProxyStrategyDestHost + // ProxyStrategyDefaultRoute will only forward traffic to agents that have explicity advertised + // they serve the default route through an agent identifier. Typically used in combination with destHost + ProxyStrategyDefaultRoute +) + +func (ps ProxyStrategy) String() string { + switch ps { + case ProxyStrategyDefault: + return "default" + case ProxyStrategyDestHost: + return "destHost" + case ProxyStrategyDefaultRoute: + return "defaultRoute" + } + panic(fmt.Sprintf("unhandled ProxyStrategy: %d", ps)) +} + +func ParseProxyStrategy(s string) (ProxyStrategy, error) { + switch s { + case ProxyStrategyDefault.String(): + return ProxyStrategyDefault, nil + case ProxyStrategyDestHost.String(): + return ProxyStrategyDestHost, nil + case ProxyStrategyDefaultRoute.String(): + return ProxyStrategyDefaultRoute, nil + default: + return 0, fmt.Errorf("unknown proxy strategy: %s", s) + } +} + +// GenProxyStrategiesFromStr generates the list of proxy strategies from the +// comma-seperated string, i.e., destHost. +func ParseProxyStrategies(proxyStrategies string) ([]ProxyStrategy, error) { + var result []ProxyStrategy + + strs := strings.Split(proxyStrategies, ",") + for _, s := range strs { + if len(s) == 0 { + continue + } + ps, err := ParseProxyStrategy(s) + if err != nil { + return nil, err + } + result = append(result, ps) + } + if len(result) == 0 { + return nil, fmt.Errorf("proxy strategies cannot be empty") + } + return result, nil +} diff --git a/pkg/server/proxystrategies/proxystrategies_test.go b/pkg/server/proxystrategies/proxystrategies_test.go new file mode 100644 index 000000000..bb050424b --- /dev/null +++ b/pkg/server/proxystrategies/proxystrategies_test.go @@ -0,0 +1,129 @@ +package proxystrategies + +import ( + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProxyStrategy(t *testing.T) { + for desc, tc := range map[string]struct { + input ProxyStrategy + want string + wantPanic string + }{ + "default": { + input: ProxyStrategyDefault, + want: "default", + }, + "destHost": { + input: ProxyStrategyDestHost, + want: "destHost", + }, + "defaultRoute": { + input: ProxyStrategyDefaultRoute, + want: "defaultRoute", + }, + "unrecognized": { + input: ProxyStrategy(0), + wantPanic: "unhandled ProxyStrategy: 0", + }, + } { + t.Run(desc, func(t *testing.T) { + if tc.wantPanic != "" { + assert.PanicsWithValue(t, tc.wantPanic, func() { + _ = tc.input.String() + }) + } else { + got := tc.input.String() + if got != tc.want { + t.Errorf("ProxyStrategy.String(): got %v, want %v", got, tc.want) + } + } + }) + } +} + +func TestParseProxyStrategy(t *testing.T) { + for desc, tc := range map[string]struct { + input string + want ProxyStrategy + wantErr error + }{ + "empty": { + input: "", + wantErr: fmt.Errorf("unknown proxy strategy: "), + }, + "unrecognized": { + input: "unrecognized", + wantErr: fmt.Errorf("unknown proxy strategy: unrecognized"), + }, + "default": { + input: "default", + want: ProxyStrategyDefault, + }, + "destHost": { + input: "destHost", + want: ProxyStrategyDestHost, + }, + "defaultRoute": { + input: "defaultRoute", + want: ProxyStrategyDefaultRoute, + }, + } { + t.Run(desc, func(t *testing.T) { + got, err := ParseProxyStrategy(tc.input) + assert.Equal(t, tc.wantErr, err, "ParseProxyStrategy(%s): got error %q, want %v", tc.input, err, tc.wantErr) + if got != tc.want { + t.Errorf("ParseProxyStrategy(%s): got %v, want %v", tc.input, got, tc.want) + } + }) + } +} + +func TestParseProxyStrategies(t *testing.T) { + for desc, tc := range map[string]struct { + input string + want []ProxyStrategy + wantErr error + }{ + "empty": { + input: "", + wantErr: fmt.Errorf("proxy strategies cannot be empty"), + }, + "unrecognized": { + input: "unrecognized", + wantErr: fmt.Errorf("unknown proxy strategy: unrecognized"), + }, + "default": { + input: "default", + want: []ProxyStrategy{ProxyStrategyDefault}, + }, + "destHost": { + input: "destHost", + want: []ProxyStrategy{ProxyStrategyDestHost}, + }, + "defaultRoute": { + input: "defaultRoute", + want: []ProxyStrategy{ProxyStrategyDefaultRoute}, + }, + "duplicate": { + input: "destHost,defaultRoute,defaultRoute,default", + want: []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute, ProxyStrategyDefaultRoute, ProxyStrategyDefault}, + }, + "multiple": { + input: "destHost,defaultRoute,default", + want: []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute, ProxyStrategyDefault}, + }, + } { + t.Run(desc, func(t *testing.T) { + got, err := ParseProxyStrategies(tc.input) + assert.Equal(t, tc.wantErr, err, "ParseProxyStrategies(%s): got error %q, want %v", tc.input, err, tc.wantErr) + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("ParseProxyStrategies(%s): got %v, want %v", tc.input, got, tc.want) + } + }) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 9122c48f8..e609584f1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -41,6 +41,7 @@ import ( commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" "sigs.k8s.io/apiserver-network-proxy/pkg/util" "sigs.k8s.io/apiserver-network-proxy/proto/agent" "sigs.k8s.io/apiserver-network-proxy/proto/header" @@ -213,7 +214,7 @@ type ProxyServer struct { AgentAuthenticationOptions *AgentTokenAuthenticationOptions // TODO: move strategies into BackendStorage - proxyStrategies []ProxyStrategy + proxyStrategies []proxystrategies.ProxyStrategy xfrChannelSize int } @@ -230,11 +231,11 @@ var _ agent.AgentServiceServer = &ProxyServer{} var _ client.ProxyServiceServer = &ProxyServer{} -func genContext(proxyStrategies []ProxyStrategy, reqHost string) context.Context { +func genContext(proxyStrategies []proxystrategies.ProxyStrategy, reqHost string) context.Context { ctx := context.Background() for _, ps := range proxyStrategies { switch ps { - case ProxyStrategyDestHost: + case proxystrategies.ProxyStrategyDestHost: addr := util.RemovePortFromHost(reqHost) ctx = context.WithValue(ctx, destHostKey, addr) } @@ -373,15 +374,15 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien } // NewProxyServer creates a new ProxyServer instance -func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer { +func NewProxyServer(serverID string, proxyStrategies []proxystrategies.ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer { var bms []BackendManager for _, ps := range proxyStrategies { switch ps { - case ProxyStrategyDestHost: + case proxystrategies.ProxyStrategyDestHost: bms = append(bms, NewDestHostBackendManager()) - case ProxyStrategyDefault: + case proxystrategies.ProxyStrategyDefault: bms = append(bms, NewDefaultBackendManager()) - case ProxyStrategyDefaultRoute: + case proxystrategies.ProxyStrategyDefaultRoute: bms = append(bms, NewDefaultRouteBackendManager()) default: klog.ErrorS(nil, "Unknown proxy strategy", "strategy", ps) diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 558166520..9e2b2397a 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -39,6 +39,7 @@ import ( client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics" + "sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies" metricstest "sigs.k8s.io/apiserver-network-proxy/pkg/testing/metrics" agentmock "sigs.k8s.io/apiserver-network-proxy/proto/agent/mocks" "sigs.k8s.io/apiserver-network-proxy/proto/header" @@ -169,7 +170,7 @@ func TestAgentTokenAuthenticationErrorsToken(t *testing.T) { conn.EXPECT().Recv().Return(nil, io.EOF) } - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{ + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{ Enabled: true, KubernetesClient: kcs, AgentNamespace: tc.wantNamespace, @@ -197,7 +198,7 @@ func TestRemovePendingDialForStream(t *testing.T) { pending3 := &ProxyClientConnection{frontend: &GrpcFrontend{streamUID: streamUID}} pending4 := &ProxyClientConnection{frontend: &GrpcFrontend{streamUID: "different-uid"}} pending5 := &ProxyClientConnection{frontend: &GrpcFrontend{streamUID: ""}} - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.PendingDial.Add(1, pending1) p.PendingDial.Add(2, pending2) p.PendingDial.Add(3, pending3) @@ -225,7 +226,7 @@ func TestAddRemoveFrontends(t *testing.T) { agent2ConnID2 := new(ProxyClientConnection) agent3ConnID1 := new(ProxyClientConnection) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.removeEstablished("agent1", int64(1)) expectedFrontends := make(map[string]map[int64]*ProxyClientConnection) @@ -233,7 +234,7 @@ func TestAddRemoveFrontends(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p = NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.addEstablished("agent1", int64(2), agent1ConnID2) p.addEstablished("agent2", int64(1), agent2ConnID1) @@ -263,7 +264,7 @@ func TestAddRemoveBackends_DefaultStrategy(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addBackend(backend1) @@ -295,7 +296,7 @@ func TestAddRemoveBackends_DefaultRouteStrategy(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"default-route=false"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"default-route=true"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefaultRoute}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefaultRoute}, 1, nil, xfrChannelSize) p.addBackend(backend1) @@ -337,7 +338,7 @@ func TestAddRemoveBackends_DestHostStrategy(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"default-route=true"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDestHost}, 1, nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -384,7 +385,7 @@ func TestAddRemoveBackends_DestHostSanitizeRequest(t *testing.T) { backend1, _ := NewBackend(mockAgentConn(ctrl, "agent1", []string{"host=localhost&host=node1.mydomain.com&ipv4=1.2.3.4&ipv6=9878::7675:1292:9183:7562"})) backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDestHost}, 1, nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -408,7 +409,7 @@ func TestAddRemoveBackends_DestHostWithDefault(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"default-route=false"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDestHost, proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -461,7 +462,7 @@ func TestAddRemoveBackends_DestHostWithDuplicateIdents(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"host=localhost&host=node1.mydomain.com&ipv4=1.2.3.4&ipv6=9878::7675:1292:9183:7562"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"host=localhost&host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDestHost, proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -518,7 +519,7 @@ func TestEstablishedConnsMetric(t *testing.T) { agent2ConnID2 := new(ProxyClientConnection) agent3ConnID1 := new(ProxyClientConnection) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) assertEstablishedConnsMetric(t, 1) p.addEstablished("agent1", int64(2), agent1ConnID2) @@ -550,7 +551,7 @@ func TestRemoveEstablishedForBackendConn(t *testing.T) { agent2ConnID1 := &ProxyClientConnection{backend: backend2} agent2ConnID2 := &ProxyClientConnection{backend: backend2} agent3ConnID1 := &ProxyClientConnection{backend: backend3} - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.addEstablished("agent1", int64(2), agent1ConnID2) p.addEstablished("agent2", int64(1), agent2ConnID1) @@ -581,7 +582,7 @@ func TestRemoveEstablishedForStream(t *testing.T) { agent2ConnID1 := &ProxyClientConnection{backend: backend2, frontend: &GrpcFrontend{streamUID: streamUID}} agent2ConnID2 := &ProxyClientConnection{backend: backend2} agent3ConnID1 := &ProxyClientConnection{backend: backend3, frontend: &GrpcFrontend{streamUID: streamUID}} - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.addEstablished("agent1", int64(2), agent1ConnID2) p.addEstablished("agent2", int64(1), agent2ConnID1) @@ -615,14 +616,17 @@ func prepareFrontendConn(ctrl *gomock.Controller) *agentmock.MockAgentService_Co return frontendConn } -func prepareAgentConnMD(t testing.TB, ctrl *gomock.Controller, proxyServer *ProxyServer) (*agentmock.MockAgentService_ConnectServer, *Backend) { +func prepareAgentConnMD(t testing.TB, ctrl *gomock.Controller, proxyServer *ProxyServer, agentidentifiers []string) (*agentmock.MockAgentService_ConnectServer, *Backend) { t.Helper() + if agentidentifiers == nil { + agentidentifiers = []string{} + } // prepare the the connection to agent of proxy-server agentConn := agentmock.NewMockAgentService_ConnectServer(ctrl) agentConnMD := metadata.MD{ ":authority": []string{"127.0.0.1:8091"}, "agentid": []string{uuid.New().String()}, - "agentidentifiers": []string{}, + "agentidentifiers": agentidentifiers, "content-type": []string{"application/grpc"}, "user-agent": []string{"grpc-go/1.42.0"}, } @@ -641,7 +645,7 @@ func baseServerProxyTestWithoutBackend(t *testing.T, validate func(*agentmock.Mo defer ctrl.Finish() frontendConn := prepareFrontendConn(ctrl) - proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + proxyServer := NewProxyServer(uuid.New().String(), []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) validate(frontendConn) @@ -655,9 +659,9 @@ func baseServerProxyTestWithBackend(t *testing.T, validate func(*agentmock.MockA frontendConn := prepareFrontendConn(ctrl) // prepare proxy server - proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + proxyServer := NewProxyServer(uuid.New().String(), []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) - agentConn, _ := prepareAgentConnMD(t, ctrl, proxyServer) + agentConn, _ := prepareAgentConnMD(t, ctrl, proxyServer, nil) validate(frontendConn, agentConn) @@ -861,16 +865,37 @@ func TestReadyBackendsMetric(t *testing.T) { metrics.Metrics.Reset() - p := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + p := NewProxyServer(uuid.New().String(), []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) assertReadyBackendsMetric(t, 0) - _, backend := prepareAgentConnMD(t, ctrl, p) + _, backend := prepareAgentConnMD(t, ctrl, p, nil) assertReadyBackendsMetric(t, 1) p.removeBackend(backend) assertReadyBackendsMetric(t, 0) } +func TestTotalReadyBackendsMetric(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + metrics.Metrics.Reset() + + p := NewProxyServer(uuid.New().String(), []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault, proxystrategies.ProxyStrategyDestHost}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + assertTotalReadyBackendsMetric(t, map[string]int{proxystrategies.ProxyStrategyDefault.String(): 0, proxystrategies.ProxyStrategyDestHost.String(): 0}) + + _, backend1 := prepareAgentConnMD(t, ctrl, p, nil) + assertTotalReadyBackendsMetric(t, map[string]int{proxystrategies.ProxyStrategyDefault.String(): 1, proxystrategies.ProxyStrategyDestHost.String(): 0}) + + // Add a backend with IPv4 agent identifier. + _, backend2 := prepareAgentConnMD(t, ctrl, p, []string{"host=localhost"}) + assertTotalReadyBackendsMetric(t, map[string]int{proxystrategies.ProxyStrategyDefault.String(): 2, proxystrategies.ProxyStrategyDestHost.String(): 1}) + + p.removeBackend(backend1) + p.removeBackend(backend2) + assertTotalReadyBackendsMetric(t, map[string]int{proxystrategies.ProxyStrategyDefault.String(): 0, proxystrategies.ProxyStrategyDestHost.String(): 0}) +} + func dialReqPkt(dialID int64) *client.Packet { return &client.Packet{ Type: client.PacketType_DIAL_REQ, @@ -932,6 +957,13 @@ func assertReadyBackendsMetric(t testing.TB, expect int) { } } +func assertTotalReadyBackendsMetric(t testing.TB, expect map[string]int) { + t.Helper() + if err := metricstest.DefaultTester.ExpectServerTotalReadyBackends(expect); err != nil { + t.Errorf("Expected %s metric for each proxy strategy %+v, but got error: %v", "ready_backend_connections_total", expect, err) + } +} + func dialClosePkt(dialID int64) *client.Packet { return &client.Packet{ Type: client.PacketType_DIAL_CLS, diff --git a/pkg/testing/metrics/metrics.go b/pkg/testing/metrics/metrics.go index 664b127fd..9f97cb407 100644 --- a/pkg/testing/metrics/metrics.go +++ b/pkg/testing/metrics/metrics.go @@ -42,6 +42,11 @@ const ( # TYPE konnectivity_network_proxy_server_ready_backend_connections gauge` serverReadyBackendsSample = `konnectivity_network_proxy_server_ready_backend_connections{} %d` + serverTotalReadyBackendsHeader = ` +# HELP konnectivity_network_proxy_server_ready_backend_connections_total Total number of konnectivity agent connected to the proxy server +# TYPE konnectivity_network_proxy_server_ready_backend_connections_total gauge` + serverTotalReadyBackendsSample = `konnectivity_network_proxy_server_ready_backend_connections_total{proxy_strategy="%s"} %d` + serverEstablishedConnsHeader = ` # HELP konnectivity_network_proxy_server_established_connections Current number of established end-to-end connections (post-dial). # TYPE konnectivity_network_proxy_server_established_connections gauge` @@ -106,6 +111,14 @@ func (t *Tester) ExpectServerReadyBackends(v int) error { return t.ExpectMetric(server.Namespace, server.Subsystem, "ready_backend_connections", expect) } +func (t *Tester) ExpectServerTotalReadyBackends(expected map[string]int) error { + expect := serverTotalReadyBackendsHeader + "\n" + for proxyStrategy, numOfBackends := range expected { + expect += fmt.Sprintf(serverTotalReadyBackendsSample+"\n", proxyStrategy, numOfBackends) + } + return t.ExpectMetric(server.Namespace, server.Subsystem, "ready_backend_connections_total", expect) +} + func (t *Tester) ExpectServerEstablishedConns(v int) error { expect := serverEstablishedConnsHeader + "\n" expect += fmt.Sprintf(serverEstablishedConnsSample+"\n", v)