Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 22 additions & 35 deletions pkg/clusters/clusterinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/cert"
"k8s.io/component-base/featuregate"
Expand Down Expand Up @@ -118,9 +117,6 @@ type ClusterInfo struct {
// server Cluster
Cluster string

// serverNames are used to route requests with different hostnames
serverNames sync.Map

// global rate limiter type
globalRateLimiter string

Expand All @@ -144,9 +140,9 @@ type ClusterInfo struct {
currentLoggingConfig atomic.Value
featuregate featuregate.MutableFeatureGate

healthCheckIntervalSeconds time.Duration
endpointHeathCheck EndpointHealthCheck
skipSyncEndpoints bool
healthCheckInterval time.Duration
endpointHeathCheck EndpointHealthCheck
skipSyncEndpoints bool
}

type secureServingConfig struct {
Expand All @@ -170,18 +166,18 @@ func NewEmptyClusterInfo(clusterName string, config *rest.Config, healthCheck En
limiter := gatewayflowcontrol.NewUpstreamLimiter(ctx, clusterName, "", clientSets)

info := &ClusterInfo{
ctx: ctx,
cancel: cancel,
Cluster: clusterName,
restConfig: config,
Endpoints: &EndpointInfoMap{data: sync.Map{}},
healthCheckIntervalSeconds: 5 * time.Second,
globalRateLimiter: rateLimiter,
flowcontrol: limiter,
loadbalancer: sync.Map{},
endpointHeathCheck: healthCheck,
skipSyncEndpoints: skipEndpoints,
featuregate: features.DefaultMutableFeatureGate.DeepCopy(),
ctx: ctx,
cancel: cancel,
Cluster: clusterName,
restConfig: config,
Endpoints: &EndpointInfoMap{data: sync.Map{}},
healthCheckInterval: 5 * time.Second,
globalRateLimiter: rateLimiter,
flowcontrol: limiter,
loadbalancer: sync.Map{},
endpointHeathCheck: healthCheck,
skipSyncEndpoints: skipEndpoints,
featuregate: features.DefaultMutableFeatureGate.DeepCopy(),
}
return info
}
Expand Down Expand Up @@ -506,18 +502,13 @@ func (c *ClusterInfo) addOrUpdateEndpoint(endpoint string, disabled bool) error
info, ok := c.Endpoints.Load(endpoint)
if ok {
info.SetDisabled(disabled)
EnsureGatewayHealthCheck(info, c.healthCheckIntervalSeconds, info.ctx)
EnsureGatewayHealthCheck(info, c.healthCheckInterval, info.ctx)
return nil
}

http2configCopy := *c.restConfig
http2configCopy.WrapTransport = transport.NewDynamicImpersonatingRoundTripper
http2configCopy.Host = endpoint
ts, err := rest.TransportFor(&http2configCopy)
if err != nil {
klog.Errorf("failed to create http2 transport for <cluster:%s,endpoint:%s>, err: %v", c.Cluster, endpoint, err)
return err
}

// since http2 doesn't support websocket, we need to disable http2 when using websocket
upgradeConfigCopy := http2configCopy
Expand All @@ -532,12 +523,6 @@ func (c *ClusterInfo) addOrUpdateEndpoint(endpoint string, disabled bool) error
klog.Errorf("failed to convert transport to proxy.UpgradeRequestRoundTripper for <cluster:%s,endpoint:%s>", c.Cluster, endpoint)
}

client, err := kubernetes.NewForConfig(&http2configCopy)
if err != nil {
klog.Errorf("failed to create clientset for <cluster:%s,endpoint:%s>, err: %v", c.Cluster, endpoint, err)
return err
}

// initial endpoint status
initStatus := endpointStatus{
Disabled: disabled,
Expand All @@ -550,19 +535,21 @@ func (c *ClusterInfo) addOrUpdateEndpoint(endpoint string, disabled bool) error
cancel: cancel,
Cluster: c.Cluster,
Endpoint: endpoint,
status: initStatus,
status: &initStatus,
proxyConfig: &http2configCopy,
ProxyTransport: ts,
proxyUpgradeConfig: &upgradeConfigCopy,
PorxyUpgradeTransport: urrt,
clientset: client,
healthCheckFun: c.endpointHeathCheck,
}
if err := info.ResetTransport(); err != nil {
klog.Errorf("failed to init transport for <cluster:%s,endpoint:%s>, err: %v", c.Cluster, endpoint, err)
return err
}

klog.Infof("[cluster info] new endpoint added, cluster=%q, endpoint=%q", c.Cluster, info.Endpoint)
c.Endpoints.Store(endpoint, info)

EnsureGatewayHealthCheck(info, c.healthCheckIntervalSeconds, info.ctx)
EnsureGatewayHealthCheck(info, c.healthCheckInterval, info.ctx)

return nil
}
Expand Down
112 changes: 104 additions & 8 deletions pkg/clusters/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@ package clusters
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"time"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kts "k8s.io/client-go/transport"
"k8s.io/klog"

"github.com/kubewharf/kubegateway/pkg/gateway/metrics"
"github.com/kubewharf/kubegateway/pkg/transport"
)

// endpointStatus tracks the mutable health/availability state of a single
// upstream endpoint. All fields are guarded by mux; use the accessor methods
// rather than touching fields directly from other goroutines.
//
// NOTE(review): the diff view fused the old and new field lists into duplicate
// declarations, which does not compile; this is the post-change layout.
type endpointStatus struct {
	Healthy  bool   // last health-check verdict
	Reason   string // machine-readable reason for the current verdict
	Message  string // human-readable detail for the current verdict
	Disabled bool   // administratively disabled, independent of health
	// UnhealthyCount counts consecutive failed health checks; it is reset to
	// zero whenever a healthy status is recorded (see SetStatus).
	UnhealthyCount int
	mux            sync.RWMutex
}

func (s *endpointStatus) IsReady() bool {
Expand All @@ -43,6 +48,12 @@ func (s *endpointStatus) IsReady() bool {
return !s.Disabled && s.Healthy
}

// GetUnhealthyCount returns the number of consecutive failed health checks
// recorded since the endpoint last reported healthy.
func (s *endpointStatus) GetUnhealthyCount() int {
	s.mux.RLock()
	count := s.UnhealthyCount
	s.mux.RUnlock()
	return count
}

func (s *endpointStatus) SetDisabled(disabled bool) {
s.mux.Lock()
defer s.mux.Unlock()
Expand All @@ -55,6 +66,11 @@ func (s *endpointStatus) SetStatus(healthy bool, reason, message string) {
s.Healthy = healthy
s.Reason = reason
s.Message = message
if healthy {
s.UnhealthyCount = 0
} else {
s.UnhealthyCount++
}
}

type EndpointInfo struct {
Expand All @@ -71,9 +87,10 @@ type EndpointInfo struct {
// http1 proxy round tripper for websocket
PorxyUpgradeTransport proxy.UpgradeRequestRoundTripper

clientset kubernetes.Interface
clientset kubernetes.Interface
cancelableTs *transport.CancelableTransport

status endpointStatus
status *endpointStatus

healthCheckFun EndpointHealthCheck
healthCheckCh chan struct{}
Expand All @@ -89,6 +106,80 @@ func (e *EndpointInfo) Clientset() kubernetes.Interface {
return e.clientset
}

// createTransport builds the full transport stack for this endpoint:
// a cancelable base transport (so hung connections can be torn down later by
// ResetTransport), the wrapped round tripper the proxy uses, and a clientset
// that shares the same underlying transport.
func (e *EndpointInfo) createTransport() (*transport.CancelableTransport, http.RoundTripper, *kubernetes.Clientset, error) {
	base, err := newTransport(e.proxyConfig)
	if err != nil {
		klog.Errorf("failed to create http2 transport for <cluster:%s,endpoint:%s>, err: %v", e.Cluster, e.Endpoint, err)
		return nil, nil, nil, err
	}
	cancelable := transport.NewCancelableTransport(base)

	// Apply the rest.Config's auth/user-agent wrappers on top of the
	// cancelable transport; this is what the proxy will actually use.
	wrapped, err := rest.HTTPWrappersForConfig(e.proxyConfig, cancelable)
	if err != nil {
		klog.Errorf("failed to wrap http2 transport for <cluster:%s,endpoint:%s>, err: %v", e.Cluster, e.Endpoint, err)
		return nil, nil, nil, err
	}

	// The clientset reuses the same transport as the proxy; TLS settings are
	// cleared because they are already baked into that transport.
	clientsetConfig := *e.proxyConfig
	clientsetConfig.Transport = cancelable
	clientsetConfig.TLSClientConfig = rest.TLSClientConfig{}
	client, err := kubernetes.NewForConfig(&clientsetConfig)
	if err != nil {
		klog.Errorf("failed to create clientset for <cluster:%s,endpoint:%s>, err: %v", e.Cluster, e.Endpoint, err)
		return nil, nil, nil, err
	}

	return cancelable, wrapped, client, nil
}

// ResetTransport swaps in a freshly built transport stack and closes the
// previous cancelable transport so any hung connections on it are terminated.
// Called on first setup and when repeated health-check timeouts suggest the
// existing transport is stuck.
//
// NOTE(review): the field swaps below are not guarded by a lock here, same as
// the original — presumably callers serialize resets; confirm before relying
// on concurrent use.
func (e *EndpointInfo) ResetTransport() error {
	fresh, rt, client, err := e.createTransport()
	if err != nil {
		return err
	}
	klog.Infof("set new transport %p for cluster %s endpoint: %s", fresh, e.Cluster, e.Endpoint)
	e.ProxyTransport = rt
	e.clientset = client
	old := e.cancelableTs
	e.cancelableTs = fresh
	if old != nil {
		klog.Infof("close transport %p for cluster %s endpoint: %s", old, e.Cluster, e.Endpoint)
		old.Close()
	}
	return nil
}

// newTransport constructs an http.RoundTripper from the given rest.Config,
// mirroring client-go's transport construction but without its cache, so each
// call yields a brand-new transport (and therefore a fresh connection pool).
func newTransport(cfg *rest.Config) (http.RoundTripper, error) {
	tcfg, err := cfg.TransportConfig()
	if err != nil {
		return nil, err
	}
	tlsConfig, err := kts.TLSConfigFor(tcfg)
	if err != nil {
		return nil, err
	}
	// Nothing custom requested (no TLS, no dialer): the shared default
	// transport is sufficient.
	if tlsConfig == nil && tcfg.Dial == nil {
		return http.DefaultTransport, nil
	}
	dialer := tcfg.Dial
	if dialer == nil {
		dialer = (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext
	}
	t := &http.Transport{
		Proxy:               http.ProxyFromEnvironment,
		TLSHandshakeTimeout: 10 * time.Second,
		TLSClientConfig:     tlsConfig,
		MaxIdleConnsPerHost: 25,
		DialContext:         dialer,
		DisableCompression:  tcfg.DisableCompression,
	}
	// SetTransportDefaults fills in the remaining stdlib defaults (HTTP/2,
	// idle-connection settings) the same way client-go does.
	return utilnet.SetTransportDefaults(t), nil
}

func (e *EndpointInfo) SetDisabled(disabled bool) {
if e.status.Disabled != disabled {
e.status.Disabled = disabled
Expand All @@ -100,13 +191,18 @@ func (e *EndpointInfo) IstDisabled() bool {
return e.status.Disabled
}

// GetUnhealthyCount reports how many consecutive health checks have failed for
// this endpoint, delegating to the endpoint's status record.
func (e *EndpointInfo) GetUnhealthyCount() int {
	return e.status.GetUnhealthyCount()
}

// UpdateStatus records the latest health-check verdict for this endpoint.
// Unhealthy verdicts are always counted in metrics and in the consecutive
// UnhealthyCount (via SetStatus); a status-change event is emitted only when
// the healthy/unhealthy verdict actually flips.
//
// NOTE(review): the diff view fused the old and new bodies here, showing
// SetStatus twice with the transition check performed *after* the state was
// already mutated (so it could never fire, or would double-count). This body
// captures the change before a single SetStatus call, which also matches the
// counting behavior exercised by TestEndpointInfo_UnhealthyCount.
func (e *EndpointInfo) UpdateStatus(healthy bool, reason, message string) {
	if !healthy {
		metrics.RecordUnhealthyUpstream(e.Cluster, e.Endpoint, reason)
	}
	// Snapshot whether the verdict flips before mutating the status.
	// NOTE(review): this read is not under the status lock, as in the
	// original — assumes health checks for one endpoint run serially; confirm.
	statusChanged := e.status.Healthy != healthy
	e.status.SetStatus(healthy, reason, message)
	if statusChanged {
		// healthy changed
		e.recordStatusChange()
	}
}
Expand Down
39 changes: 35 additions & 4 deletions pkg/clusters/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
func TestEndpointInfo_ReadyAndReason(t *testing.T) {
tests := []struct {
name string
status endpointStatus
status *endpointStatus
wantReady bool
want string
}{
{
"ready",
endpointStatus{
&endpointStatus{
Disabled: false,
Healthy: true,
},
Expand All @@ -36,7 +36,7 @@ func TestEndpointInfo_ReadyAndReason(t *testing.T) {
},
{
"disabled",
endpointStatus{
&endpointStatus{
Disabled: true,
Healthy: true,
},
Expand All @@ -45,7 +45,7 @@ func TestEndpointInfo_ReadyAndReason(t *testing.T) {
},
{
"unhealthy",
endpointStatus{
&endpointStatus{
Disabled: false,
Healthy: false,
Reason: "Timeout",
Expand All @@ -71,3 +71,34 @@ func TestEndpointInfo_ReadyAndReason(t *testing.T) {
})
}
}

// TestEndpointInfo_UnhealthyCount verifies that consecutive failed status
// updates increment the unhealthy counter and that a healthy update resets it.
func TestEndpointInfo_UnhealthyCount(t *testing.T) {
	info := &EndpointInfo{
		Endpoint: "",
		status: &endpointStatus{
			Disabled: true,
			Healthy:  true,
		},
	}
	if info.GetUnhealthyCount() != 0 {
		t.Errorf("unhealthy count should be 0, actual: %d", info.GetUnhealthyCount())
	}

	// Two consecutive failures: counter climbs 1, 2.
	for want := 1; want <= 2; want++ {
		info.UpdateStatus(false, "mock error", "mock error message")
		if info.GetUnhealthyCount() != want {
			t.Errorf("unhealthy count should be %d, actual: %d", want, info.GetUnhealthyCount())
		}
	}

	// A healthy report resets the counter to zero.
	info.UpdateStatus(true, "", "")
	if info.GetUnhealthyCount() != 0 {
		t.Errorf("unhealthy count should be 0, actual: %d", info.GetUnhealthyCount())
	}

	// Five further failures climb again from 1, proving the reset was clean.
	for want := 1; want <= 5; want++ {
		info.UpdateStatus(false, "mock error", "mock error message")
		if info.GetUnhealthyCount() != want {
			t.Errorf("unhealthy count should be %d, actual: %d", want, info.GetUnhealthyCount())
		}
	}
}
11 changes: 11 additions & 0 deletions pkg/gateway/controllers/upstream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,5 +360,16 @@ func GatewayHealthCheck(e *clusters.EndpointInfo) (done bool) {
}
klog.Errorf("upstream health check failed, cluster=%q endpoint=%q reason=%q message=%q", e.Cluster, e.Endpoint, reason, message)
e.UpdateStatus(false, reason, message)

const ResetTransportThreshold = 3
if err != nil &&
strings.Contains(err.Error(), "Client.Timeout or context cancellation while reading body") &&
e.GetUnhealthyCount() >= ResetTransportThreshold {
klog.Warningf("transport to endpoint %s hang", e.Endpoint)
if err := e.ResetTransport(); err != nil {
klog.Warningf("reset transport to endpoint %s error: %v", e.Endpoint, err)
}
}

return done
}
Loading