diff --git a/docs/examples/affinity/cookie/README.md b/docs/examples/affinity/cookie/README.md
index e66503dd75..3133a0518d 100644
--- a/docs/examples/affinity/cookie/README.md
+++ b/docs/examples/affinity/cookie/README.md
@@ -9,7 +9,7 @@ Session affinity can be configured using the following annotations:
 |Name|Description|Value|
 | --- | --- | --- |
 |nginx.ingress.kubernetes.io/affinity|Type of the affinity, set this to `cookie` to enable session affinity|string (NGINX only supports `cookie`)|
-|nginx.ingress.kubernetes.io/affinity-mode|The affinity mode defines how sticky a session is. Use `balanced` to redistribute some sessions when scaling pods or `persistent` for maximum stickiness.|`balanced` (default) or `persistent`|
+|nginx.ingress.kubernetes.io/affinity-mode|The affinity mode defines how sticky a session is. Use `balanced` to redistribute some sessions when scaling pods. Use `persistent` to persist sessions until pods receive a deletion timestamp. Use `persistent-drainable` to persist sessions until after a pod gracefully handles its `preStop` lifecycle hook.|`balanced` (default), `persistent`, or `persistent-drainable`|
 |nginx.ingress.kubernetes.io/affinity-canary-behavior|Defines session affinity behavior of canaries. By default the behavior is `sticky`, and canaries respect session affinity configuration. Set this to `legacy` to restore original canary behavior, when session affinity parameters were not respected.|`sticky` (default) or `legacy`|
 |nginx.ingress.kubernetes.io/session-cookie-name|Name of the cookie that will be created|string (defaults to `INGRESSCOOKIE`)|
 |nginx.ingress.kubernetes.io/session-cookie-secure|Set the cookie as secure regardless the protocol of the incoming request|`"true"` or `"false"`|
diff --git a/docs/user-guide/nginx-configuration/annotations.md b/docs/user-guide/nginx-configuration/annotations.md
index 82ad076626..9ab6d4bf10 100755
--- a/docs/user-guide/nginx-configuration/annotations.md
+++ b/docs/user-guide/nginx-configuration/annotations.md
@@ -17,7 +17,7 @@ You can add these Kubernetes annotations to specific Ingress objects to customiz
 |---------------------------|------|
 |[nginx.ingress.kubernetes.io/app-root](#rewrite)|string|
 |[nginx.ingress.kubernetes.io/affinity](#session-affinity)|cookie|
-|[nginx.ingress.kubernetes.io/affinity-mode](#session-affinity)|"balanced" or "persistent"|
+|[nginx.ingress.kubernetes.io/affinity-mode](#session-affinity)|"balanced" or "persistent" or "persistent-drainable"|
 |[nginx.ingress.kubernetes.io/affinity-canary-behavior](#session-affinity)|"sticky" or "legacy"|
 |[nginx.ingress.kubernetes.io/auth-realm](#authentication)|string|
 |[nginx.ingress.kubernetes.io/auth-secret](#authentication)|string|
@@ -173,7 +173,19 @@ If the Application Root is exposed in a different path and needs to be redirecte
 The annotation `nginx.ingress.kubernetes.io/affinity` enables and sets the affinity type in all Upstreams of an Ingress. This way, a request will always be directed to the same upstream server. The only affinity type available for NGINX is `cookie`.

-The annotation `nginx.ingress.kubernetes.io/affinity-mode` defines the stickiness of a session. Setting this to `balanced` (default) will redistribute some sessions if a deployment gets scaled up, therefore rebalancing the load on the servers. Setting this to `persistent` will not rebalance sessions to new servers, therefore providing maximum stickiness.
+The annotation `nginx.ingress.kubernetes.io/affinity-mode` defines the stickiness of a session.
+
+- `balanced` (default)
+
+  Setting this to `balanced` will redistribute some sessions if a deployment gets scaled up, therefore rebalancing the load on the servers.
+
+- `persistent`
+
+  Setting this to `persistent` will not rebalance sessions to new servers, therefore providing greater stickiness. Sticky sessions will continue to be routed to the same server as long as its [Endpoint's condition](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#conditions) remains `Ready`. If the Endpoint stops being `Ready`, such as when a server pod receives a deletion timestamp, sessions will be rebalanced to another server.
+
+- `persistent-drainable`
+
+  Setting this to `persistent-drainable` behaves like `persistent`, but sticky sessions will continue to be routed to the same server as long as its [Endpoint's condition](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#conditions) remains `Serving`, even after the server pod receives a deletion timestamp. This allows graceful session draining during the `preStop` lifecycle hook. New sessions will *not* be directed to these draining servers and will only be routed to a server whose Endpoint is `Ready`, except potentially when all servers are draining.

 The annotation `nginx.ingress.kubernetes.io/affinity-canary-behavior` defines the behavior of canaries when session affinity is enabled. Setting this to `sticky` (default) will ensure that users that were served by canaries, will continue to be served by canaries. Setting this to `legacy` will restore original canary behavior, when session affinity was ignored.
diff --git a/internal/ingress/annotations/annotations_test.go b/internal/ingress/annotations/annotations_test.go
index 5df3cdc0ee..b4c7c09acc 100644
--- a/internal/ingress/annotations/annotations_test.go
+++ b/internal/ingress/annotations/annotations_test.go
@@ -197,6 +197,7 @@ func TestAffinitySession(t *testing.T) {
 	}{
 		{map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "balanced", annotationAffinityCookieName: "route", annotationAffinityCanaryBehavior: ""}, "cookie", "balanced", "route", ""},
 		{map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "persistent", annotationAffinityCookieName: "route1", annotationAffinityCanaryBehavior: "sticky"}, "cookie", "persistent", "route1", "sticky"},
+		{map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "persistent-drainable", annotationAffinityCookieName: "route1", annotationAffinityCanaryBehavior: "sticky"}, "cookie", "persistent-drainable", "route1", "sticky"},
 		{map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "balanced", annotationAffinityCookieName: "", annotationAffinityCanaryBehavior: "legacy"}, "cookie", "balanced", "INGRESSCOOKIE", "legacy"},
 		{map[string]string{}, "", "", "", ""},
 		{nil, "", "", "", ""},
diff --git a/internal/ingress/annotations/sessionaffinity/main.go b/internal/ingress/annotations/sessionaffinity/main.go
index bee4a20943..324c746e5b 100644
--- a/internal/ingress/annotations/sessionaffinity/main.go
+++ b/internal/ingress/annotations/sessionaffinity/main.go
@@ -77,12 +77,13 @@ var sessionAffinityAnnotations = parser.Annotation{
 			Documentation: `This annotation enables and sets the affinity type in all Upstreams of an Ingress. This way, a request will always be directed to the same upstream server.
 			The only affinity type available for NGINX is cookie`,
 		},
 		annotationAffinityMode: {
-			Validator:     parser.ValidateOptions([]string{"balanced", "persistent"}, true, true),
+			Validator:     parser.ValidateOptions([]string{"balanced", "persistent", "persistent-drainable"}, true, true),
 			Scope:         parser.AnnotationScopeIngress,
 			Risk:          parser.AnnotationRiskMedium,
 			Documentation: `This annotation defines the stickiness of a session.
 			Setting this to balanced (default) will redistribute some sessions if a deployment gets scaled up, therefore rebalancing the load on the servers.
-			Setting this to persistent will not rebalance sessions to new servers, therefore providing maximum stickiness.`,
+			Setting this to persistent will not rebalance sessions to new servers, therefore providing greater stickiness. Sticky sessions will continue to be routed to the same server as long as its Endpoint's condition remains Ready. If the Endpoint stops being Ready, such as when a server pod receives a deletion timestamp, sessions will be rebalanced to another server.
+			Setting this to persistent-drainable behaves like persistent, but sticky sessions will continue to be routed to the same server as long as its Endpoint's condition remains Serving, even after the server pod receives a deletion timestamp. This allows graceful session draining during the preStop lifecycle hook. New sessions will *not* be directed to these draining servers and will only be routed to a server whose Endpoint is Ready, except potentially when all servers are draining.`,
 		},
 		annotationAffinityCanaryBehavior: {
 			Validator: parser.ValidateOptions([]string{"sticky", "legacy"}, true, true),
diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go
index 0894c0dfc1..06b670e4b4 100644
--- a/internal/ingress/controller/controller.go
+++ b/internal/ingress/controller/controller.go
@@ -536,7 +536,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
 			sp := svc.Spec.Ports[i]
 			if sp.Name == svcPort {
 				if sp.Protocol == proto {
-					endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
+					endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
 					break
 				}
 			}
@@ -548,7 +548,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
 			//nolint:gosec // Ignore G109 error
 			if sp.Port == int32(targetPort) {
 				if sp.Protocol == proto {
-					endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
+					endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
 					break
 				}
 			}
@@ -605,7 +605,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
 	} else {
 		zone = emptyZone
 	}
-	endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
+	endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
 	if len(endps) == 0 {
 		klog.Warningf("Service %q does not have any active Endpoint", svcKey)
 		endps = []ingress.Endpoint{n.DefaultEndpoint()}
@@ -940,7 +940,7 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in
 			} else {
 				zone = emptyZone
 			}
-			endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
+			endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
 			// custom backend is valid only if contains at least one endpoint
 			if len(endps) > 0 {
 				name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName())
@@ -1050,7 +1050,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
 		if len(upstreams[defBackend].Endpoints) == 0 {
 			_, port := upstreamServiceNameAndPort(ing.Spec.DefaultBackend.Service)
-			endps, err := n.serviceEndpoints(svcKey, port.String())
+			endps, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints)
 			upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
 			if err != nil {
 				klog.Warningf("Error creating upstream %q: %v", defBackend, err)
@@ -1114,8 +1114,13 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
 			}

 			if len(upstreams[name].Endpoints) == 0 {
+				epSelectionMode := ReadyEndpoints
+				if anns.SessionAffinity.Mode == "persistent-drainable" {
+					epSelectionMode = ServingEndpoints
+				}
+
 				_, port := upstreamServiceNameAndPort(path.Backend.Service)
-				endp, err := n.serviceEndpoints(svcKey, port.String())
+				endp, err := n.serviceEndpoints(svcKey, port.String(), epSelectionMode)
 				if err != nil {
 					klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
 					n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoService)
@@ -1127,6 +1132,10 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
 					n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoEndpoint)
 				} else {
 					n.metricCollector.DecOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoEndpoint)
+
+					if allEndpointsAreDraining(endp) {
+						klog.Warningf("All Endpoints for Service %q are draining.", svcKey)
+					}
 				}
 				upstreams[name].Endpoints = endp
 			}
@@ -1184,7 +1193,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *netw
 }

 // serviceEndpoints returns the upstream servers (Endpoints) associated with a Service.
-func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingress.Endpoint, error) {
+func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, epSelectionMode EndpointSelectionMode) ([]ingress.Endpoint, error) {
 	var upstreams []ingress.Endpoint

 	svc, err := n.store.GetService(svcKey)
@@ -1205,7 +1214,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
 		return upstreams, nil
 	}
 	servicePort := externalNamePorts(backendPort, svc)
-	endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
+	endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices)
 	if len(endps) == 0 {
 		klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
 		return upstreams, nil
@@ -1221,7 +1230,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
 		if strconv.Itoa(int(servicePort.Port)) == backendPort ||
 			servicePort.TargetPort.String() == backendPort ||
 			servicePort.Name == backendPort {
-			endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
+			endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices)
 			if len(endps) == 0 {
 				klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
 			}
@@ -1903,3 +1912,12 @@ func newTrafficShapingPolicy(cfg *canary.Config) ingress.TrafficShapingPolicy {
 		Cookie: cfg.Cookie,
 	}
 }
+
+func allEndpointsAreDraining(eps []ingress.Endpoint) bool {
+	for _, ep := range eps {
+		if !ep.IsDraining {
+			return false
+		}
+	}
+	return true
+}
diff --git a/internal/ingress/controller/endpointslices.go b/internal/ingress/controller/endpointslices.go
index 1b8a3b571a..9728c3a90c 100644
--- a/internal/ingress/controller/endpointslices.go
+++ b/internal/ingress/controller/endpointslices.go
@@ -34,9 +34,16 @@ import (
 	"k8s.io/ingress-nginx/pkg/apis/ingress"
 )

+type EndpointSelectionMode int
+
+const (
+	ReadyEndpoints EndpointSelectionMode = iota
+	ServingEndpoints
+)
+
 // getEndpointsFromSlices returns a list of Endpoint structs for a given service/target port combination.
 func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, zoneForHints string,
-	getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error),
+	epSelectionMode EndpointSelectionMode, getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error),
 ) []ingress.Endpoint {
 	upsServers := []ingress.Endpoint{}
@@ -153,9 +160,19 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
 	}

 	for _, ep := range eps.Endpoints {
-		if (ep.Conditions.Ready != nil) && !(*ep.Conditions.Ready) {
-			continue
+		epIsReady := (ep.Conditions.Ready == nil) || *ep.Conditions.Ready
+		if epSelectionMode == ReadyEndpoints {
+			if !epIsReady {
+				continue
+			}
+		} else {
+			// assume epSelectionMode == ServingEndpoints.
+ epIsServing := (ep.Conditions.Serving == nil) || *ep.Conditions.Serving + if !epIsServing { + continue + } } + epHasZone := false if useTopologyHints { for _, epzone := range ep.Hints.ForZones { @@ -176,10 +193,12 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c if _, exists := processedUpstreamServers[hostPort]; exists { continue } + ups := ingress.Endpoint{ - Address: epAddress, - Port: fmt.Sprintf("%v", epPort), - Target: ep.TargetRef, + Address: epAddress, + Port: fmt.Sprintf("%v", epPort), + Target: ep.TargetRef, + IsDraining: !epIsReady, } upsServers = append(upsServers, ups) processedUpstreamServers[hostPort] = struct{}{} diff --git a/internal/ingress/controller/endpointslices_test.go b/internal/ingress/controller/endpointslices_test.go index ca29a4dd3b..7021f1f7b4 100644 --- a/internal/ingress/controller/endpointslices_test.go +++ b/internal/ingress/controller/endpointslices_test.go @@ -32,13 +32,14 @@ import ( //nolint:dupl // Ignore dupl errors for similar test case func TestGetEndpointsFromSlices(t *testing.T) { tests := []struct { - name string - svc *corev1.Service - port *corev1.ServicePort - proto corev1.Protocol - zone string - fn func(string) ([]*discoveryv1.EndpointSlice, error) - result []ingress.Endpoint + name string + svc *corev1.Service + port *corev1.ServicePort + proto corev1.Protocol + zone string + epSelectionMode EndpointSelectionMode + fn func(string) ([]*discoveryv1.EndpointSlice, error) + result []ingress.Endpoint }{ { "no service should return 0 endpoint", @@ -46,6 +47,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { nil, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, @@ -57,6 +59,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { nil, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, @@ -68,6 +71,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -83,6 +87,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -108,6 +113,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -133,6 +139,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -158,6 +165,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -188,6 +196,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -218,6 +227,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -243,6 +253,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, 
corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, fmt.Errorf("unexpected error") }, @@ -268,6 +279,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -312,6 +324,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -321,7 +334,106 @@ func TestGetEndpointsFromSlices(t *testing.T) { { Addresses: []string{"1.1.1.1"}, Conditions: discoveryv1.EndpointConditions{ - Ready: &[]bool{false}[0], + Ready: &[]bool{false}[0], + Serving: &[]bool{true}[0], + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: &[]string{""}[0], + Port: &[]int32{80}[0], + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{}, + }, + { + "should return a draining endpoint when requesting serving endpoints and the endpoint is not ready", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromInt(80), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "default", + TargetPort: intstr.FromInt(80), + }, + corev1.ProtocolTCP, + "", + ServingEndpoints, + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{false}[0], + Serving: &[]bool{true}[0], + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: &[]string{""}[0], + Port: &[]int32{80}[0], + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + IsDraining: true, + }, + }, + }, + { + "should return no endpoint when requesting serving endpoints and none are serving", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromInt(80), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "default", + TargetPort: intstr.FromInt(80), + }, + corev1.ProtocolTCP, + "", + ServingEndpoints, + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{false}[0], + Serving: &[]bool{false}[0], }, }, }, @@ -356,6 +468,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -400,6 +513,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -449,6 +563,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, 
corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -498,6 +613,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{ { @@ -573,6 +689,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{ { @@ -644,6 +761,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -698,6 +816,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -774,6 +893,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -854,6 +974,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -939,6 +1060,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -1016,6 +1138,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -1084,7 +1207,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { for _, testCase := range tests { t.Run(testCase.name, func(t *testing.T) { - result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, testCase.zone, testCase.fn) + result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, testCase.zone, testCase.epSelectionMode, testCase.fn) if len(testCase.result) != len(result) { t.Errorf("Expected %d Endpoints but got %d", len(testCase.result), len(result)) } diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 20fad5afb8..db69c24916 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -970,6 +970,7 @@ func configureBackends(rawBackends []*ingress.Backend) error { endpoints = append(endpoints, ingress.Endpoint{ Address: endpoint.Address, Port: endpoint.Port, + IsDraining: endpoint.IsDraining, }) } diff --git a/pkg/apis/ingress/types.go b/pkg/apis/ingress/types.go index ccdd49fe92..41c356d5d6 100644 --- a/pkg/apis/ingress/types.go +++ b/pkg/apis/ingress/types.go @@ -181,6 +181,8 @@ type Endpoint struct { Port string `json:"port"` // Target returns a reference to the object providing the endpoint Target *apiv1.ObjectReference `json:"target,omitempty"` + // True if this endpoint is not considered Ready and should not accept new persistent sessions + IsDraining bool `json:"isDraining,omitempty"` } // Server describes a website diff --git a/rootfs/etc/nginx/lua/balancer.lua 
b/rootfs/etc/nginx/lua/balancer.lua index f06aadf795..57e08ab847 100644 --- a/rootfs/etc/nginx/lua/balancer.lua +++ b/rootfs/etc/nginx/lua/balancer.lua @@ -47,7 +47,8 @@ local function get_implementation(backend) if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then - if backend["sessionAffinityConfig"]["mode"] == "persistent" then + local mode = backend["sessionAffinityConfig"]["mode"] + if mode == "persistent" or mode == "persistent-drainable" then name = "sticky_persistent" else name = "sticky_balanced" diff --git a/rootfs/etc/nginx/lua/balancer/sticky.lua b/rootfs/etc/nginx/lua/balancer/sticky.lua index 9d0a541169..4fc5693a18 100644 --- a/rootfs/etc/nginx/lua/balancer/sticky.lua +++ b/rootfs/etc/nginx/lua/balancer/sticky.lua @@ -3,6 +3,7 @@ local ck = require("resty.cookie") local ngx_balancer = require("ngx.balancer") local split = require("util.split") local same_site = require("util.same_site") +local util = require("util") local ngx = ngx local pairs = pairs @@ -24,7 +25,8 @@ function _M.new(self) alternative_backends = nil, cookie_session_affinity = nil, traffic_shaping_policy = nil, - backend_key = nil + backend_key = nil, + draining_endpoints = nil } setmetatable(o, self) @@ -129,14 +131,20 @@ function _M.get_last_failure() return ngx_balancer.get_last_failure() end -local function get_failed_upstreams() +local function get_upstreams_to_exclude(self) local indexed_upstream_addrs = {} - local upstream_addrs = split.split_upstream_var(ngx.var.upstream_addr) or {} + local failed_upstream_addrs = split.split_upstream_var(ngx.var.upstream_addr) or {} - for _, addr in ipairs(upstream_addrs) do + for _, addr in ipairs(failed_upstream_addrs) do indexed_upstream_addrs[addr] = true end + if self.draining_endpoints then + for addr, _ in pairs(self.draining_endpoints) do + indexed_upstream_addrs[addr] = true + end + end + return indexed_upstream_addrs end @@ -188,7 +196,7 @@ function _M.balance(self) local new_upstream - new_upstream, key = self:pick_new_upstream(get_failed_upstreams()) + new_upstream, key = self:pick_new_upstream(get_upstreams_to_exclude(self)) if not new_upstream then ngx.log(ngx.WARN, string.format("failed to get new upstream; using upstream %s", new_upstream)) elseif should_set_cookie(self) then @@ -202,6 +210,18 @@ function _M.sync(self, backend) -- reload balancer nodes balancer_resty.sync(self, backend) + self.draining_endpoints = nil + for _, endpoint in pairs(backend.endpoints) do + if endpoint.isDraining then + if not self.draining_endpoints then + self.draining_endpoints = {} + end + + local endpoint_string = util.get_endpoint_string(endpoint) + self.draining_endpoints[endpoint_string] = true + end + end + self.traffic_shaping_policy = backend.trafficShapingPolicy self.alternative_backends = backend.alternativeBackends self.cookie_session_affinity = backend.sessionAffinityConfig.cookieSessionAffinity diff --git a/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua b/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua index 70723143b6..228bb6896b 100644 --- a/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua @@ -73,7 +73,7 @@ describe("Sticky", function() end) local test_backend = get_test_backend() - local test_backend_endpoint= test_backend.endpoints[1].address .. ":" .. 
test_backend.endpoints[1].port + local test_backend_endpoint= util.get_endpoint_string(test_backend.endpoints[1]) local legacy_cookie_value = test_backend_endpoint local function create_current_cookie_value(backend_key) @@ -304,6 +304,68 @@ describe("Sticky", function() it("returns the correct endpoint for the client", function() test_correct_endpoint(sticky_balanced) end) it("returns the correct endpoint for the client", function() test_correct_endpoint(sticky_persistent) end) end) + + -- Note below that endpoints are only marked this way when persistent + describe("when an endpoint is draining", function() + it("persists client with cookie to endpoint", function() + local s = {} + local cookie_jar = {} + cookie.new = function(self) + local cookie_instance = { + set = function(self, ck) + cookie_jar[ck.key] = ck.value + return true, nil + end, + get = function(self, k) return cookie_jar[k] end, + } + s = spy.on(cookie_instance, "set") + return cookie_instance, false + end + + local b = get_test_backend() + b.sessionAffinityConfig.cookieSessionAffinity.locations = {} + b.sessionAffinityConfig.cookieSessionAffinity.locations["test.com"] = {"/"} + + local expectedPeer = b.endpoints[1] + + local sticky_balancer_instance = sticky_persistent:new(b) + local peer = sticky_balancer_instance:balance() + assert.equal(peer, test_backend_endpoint) + + expectedPeer.isDraining = true + sticky_balancer_instance:sync(b) + sticky_balancer_instance.TESTING = true + peer = sticky_balancer_instance:balance() + assert.equal(peer, test_backend_endpoint) + end) + + it("does not route client without cookie to endpoint", function() + local s = {} + local cookie_jar = {} + cookie.new = function(self) + local cookie_instance = { + set = function(self, ck) + cookie_jar[ck.key] = ck.value + return true, nil + end, + get = function(self, k) return cookie_jar[k] end, + } + s = spy.on(cookie_instance, "set") + return cookie_instance, false + end + + local b = get_test_backend() + b.sessionAffinityConfig.cookieSessionAffinity.locations = {} + b.sessionAffinityConfig.cookieSessionAffinity.locations["test.com"] = {"/"} + + local expectedPeer = b.endpoints[1] + expectedPeer.isDraining = true + + local sticky_balancer_instance = sticky_persistent:new(b) + local peer = sticky_balancer_instance:balance() + assert.equal(peer, nil) + end) + end) end) local function get_several_test_backends(change_on_failure) diff --git a/rootfs/etc/nginx/lua/util.lua b/rootfs/etc/nginx/lua/util.lua index 1e4cd7c017..856737ffb9 100644 --- a/rootfs/etc/nginx/lua/util.lua +++ b/rootfs/etc/nginx/lua/util.lua @@ -13,12 +13,16 @@ local re_gmatch = ngx.re.gmatch local _M = {} +function _M.get_endpoint_string(endpoint) + return endpoint.address .. ":" .. endpoint.port +end + function _M.get_nodes(endpoints) local nodes = {} local weight = 1 for _, endpoint in pairs(endpoints) do - local endpoint_string = endpoint.address .. ":" .. endpoint.port + local endpoint_string = _M.get_endpoint_string(endpoint) nodes[endpoint_string] = weight end
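
For illustration, a minimal sketch of how the new mode would be consumed once this change lands. Only the annotations and the `persistent-drainable` value come from the diff above; the resource names (`demo`, `demo-svc`, `demo.example.com`), the placeholder image, and the 60-second drain window are assumptions, not part of this change.

```yaml
# Sketch only: an Ingress opting into the new affinity mode, plus a backend
# Deployment whose preStop hook keeps terminating pods serving long enough
# for existing cookie-bound sessions to drain. All names are illustrative.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: demo
  annotations:
    nginx.ingress.kubernetes.io/affinity: "cookie"
    nginx.ingress.kubernetes.io/affinity-mode: "persistent-drainable"
    nginx.ingress.kubernetes.io/session-cookie-name: "INGRESSCOOKIE"
spec:
  ingressClassName: nginx
  rules:
    - host: demo.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: demo-svc
                port:
                  number: 80
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: demo
spec:
  replicas: 3
  selector:
    matchLabels:
      app: demo
  template:
    metadata:
      labels:
        app: demo
    spec:
      # Leave room for the drain window before the pod is killed.
      terminationGracePeriodSeconds: 90
      containers:
        - name: app
          image: nginx:1.25  # placeholder backend image
          ports:
            - containerPort: 80
          lifecycle:
            preStop:
              exec:
                # While this sleep runs, the pod has a deletion timestamp, so
                # its Endpoint reports Ready=false but stays Serving=true; that
                # is the window persistent-drainable keeps routing existing
                # sessions into, while new sessions go to Ready endpoints.
                command: ["sh", "-c", "sleep 60"]
```

During that window the balancer excludes draining endpoints for new clients (see the `draining_endpoints` handling in `sticky.lua` above) while cookies that already point at the terminating pod continue to be honored.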