From e77e082f55367cfde1f3306e52f7fece0610a9d4 Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 14:53:43 -0500 Subject: [PATCH 1/7] Document target behavior for `persistent-drainable` --- docs/examples/affinity/cookie/README.md | 2 +- .../nginx-configuration/annotations.md | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/docs/examples/affinity/cookie/README.md b/docs/examples/affinity/cookie/README.md index e66503dd75..3133a0518d 100644 --- a/docs/examples/affinity/cookie/README.md +++ b/docs/examples/affinity/cookie/README.md @@ -9,7 +9,7 @@ Session affinity can be configured using the following annotations: |Name|Description|Value| | --- | --- | --- | |nginx.ingress.kubernetes.io/affinity|Type of the affinity, set this to `cookie` to enable session affinity|string (NGINX only supports `cookie`)| -|nginx.ingress.kubernetes.io/affinity-mode|The affinity mode defines how sticky a session is. Use `balanced` to redistribute some sessions when scaling pods or `persistent` for maximum stickiness.|`balanced` (default) or `persistent`| +|nginx.ingress.kubernetes.io/affinity-mode|The affinity mode defines how sticky a session is. Use `balanced` to redistribute some sessions when scaling pods. Use `persistent` to persist sessions until pods receive a deletion timestamp. Use `persistent-drainable` to persist sessions until after a pod gracefully handles its `preStop` lifecycle hook.|`balanced` (default), `persistent`, or `persistent-drainable`| |nginx.ingress.kubernetes.io/affinity-canary-behavior|Defines session affinity behavior of canaries. By default the behavior is `sticky`, and canaries respect session affinity configuration. Set this to `legacy` to restore original canary behavior, when session affinity parameters were not respected.|`sticky` (default) or `legacy`| |nginx.ingress.kubernetes.io/session-cookie-name|Name of the cookie that will be created|string (defaults to `INGRESSCOOKIE`)| |nginx.ingress.kubernetes.io/session-cookie-secure|Set the cookie as secure regardless the protocol of the incoming request|`"true"` or `"false"`| diff --git a/docs/user-guide/nginx-configuration/annotations.md b/docs/user-guide/nginx-configuration/annotations.md index 82ad076626..9ab6d4bf10 100755 --- a/docs/user-guide/nginx-configuration/annotations.md +++ b/docs/user-guide/nginx-configuration/annotations.md @@ -17,7 +17,7 @@ You can add these Kubernetes annotations to specific Ingress objects to customiz |---------------------------|------| |[nginx.ingress.kubernetes.io/app-root](#rewrite)|string| |[nginx.ingress.kubernetes.io/affinity](#session-affinity)|cookie| -|[nginx.ingress.kubernetes.io/affinity-mode](#session-affinity)|"balanced" or "persistent"| +|[nginx.ingress.kubernetes.io/affinity-mode](#session-affinity)|"balanced" or "persistent" or "persistent-drainable"| |[nginx.ingress.kubernetes.io/affinity-canary-behavior](#session-affinity)|"sticky" or "legacy"| |[nginx.ingress.kubernetes.io/auth-realm](#authentication)|string| |[nginx.ingress.kubernetes.io/auth-secret](#authentication)|string| @@ -173,7 +173,19 @@ If the Application Root is exposed in a different path and needs to be redirecte The annotation `nginx.ingress.kubernetes.io/affinity` enables and sets the affinity type in all Upstreams of an Ingress. This way, a request will always be directed to the same upstream server. The only affinity type available for NGINX is `cookie`. -The annotation `nginx.ingress.kubernetes.io/affinity-mode` defines the stickiness of a session. 
Setting this to `balanced` (default) will redistribute some sessions if a deployment gets scaled up, therefore rebalancing the load on the servers. Setting this to `persistent` will not rebalance sessions to new servers, therefore providing maximum stickiness. +The annotation `nginx.ingress.kubernetes.io/affinity-mode` defines the stickiness of a session. + +- `balanced` (default) + + Setting this to `balanced` will redistribute some sessions if a deployment gets scaled up, therefore rebalancing the load on the servers. + +- `persistent` + + Setting this to `persistent` will not rebalance sessions to new servers, therefore providing greater stickiness. Sticky sessions will continue to be routed to the same server as long as its [Endpoint's condition](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#conditions) remains `Ready`. If the Endpoint stops being `Ready`, such as when a server pod receives a deletion timestamp, sessions will be rebalanced to another server. + +- `persistent-drainable` + + Setting this to `persistent-drainable` behaves like `persistent`, but sticky sessions will continue to be routed to the same server as long as its [Endpoint's condition](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#conditions) remains `Serving`, even after the server pod receives a deletion timestamp. This allows graceful session draining during the `preStop` lifecycle hook. New sessions will *not* be directed to these draining servers and will only be routed to a server whose Endpoint is `Ready`, except potentially when all servers are draining. The annotation `nginx.ingress.kubernetes.io/affinity-canary-behavior` defines the behavior of canaries when session affinity is enabled. Setting this to `sticky` (default) will ensure that users that were served by canaries, will continue to be served by canaries. Setting this to `legacy` will restore original canary behavior, when session affinity was ignored. From 19d1043e0f4ddb472a83daa90ad15a8183b57b6e Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 15:09:50 -0500 Subject: [PATCH 2/7] Wire up `persistent-drainable` annotation. This currently behaves exactly like `persistent` after this commit.
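As a rough illustration of how the modes described in the documentation above map onto EndpointSlice conditions, here is a minimal, self-contained Go sketch. It is not part of this patch series, and the helper name `eligibleForExistingSession` is hypothetical; only the `discoveryv1.EndpointConditions` type is real.

```go
package main

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
)

// eligibleForExistingSession reports whether an endpoint with the given
// conditions may keep receiving an already-established sticky session.
// Hypothetical helper for illustration only.
func eligibleForExistingSession(affinityMode string, conds discoveryv1.EndpointConditions) bool {
	// A nil condition is treated as true, matching the controller's convention.
	ready := conds.Ready == nil || *conds.Ready
	serving := conds.Serving == nil || *conds.Serving

	switch affinityMode {
	case "persistent-drainable":
		// Keep the session while the pod is still Serving, even if it is
		// no longer Ready (e.g. draining in a preStop hook).
		return serving
	default: // "balanced" and "persistent"
		// Rebalance the session as soon as the endpoint stops being Ready.
		return ready
	}
}

func main() {
	f, tr := false, true
	draining := discoveryv1.EndpointConditions{Ready: &f, Serving: &tr}

	fmt.Println(eligibleForExistingSession("persistent", draining))           // false
	fmt.Println(eligibleForExistingSession("persistent-drainable", draining)) // true
}
```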
--- internal/ingress/annotations/annotations_test.go | 1 + internal/ingress/annotations/sessionaffinity/main.go | 5 +++-- rootfs/etc/nginx/lua/balancer.lua | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/ingress/annotations/annotations_test.go b/internal/ingress/annotations/annotations_test.go index 5df3cdc0ee..b4c7c09acc 100644 --- a/internal/ingress/annotations/annotations_test.go +++ b/internal/ingress/annotations/annotations_test.go @@ -197,6 +197,7 @@ func TestAffinitySession(t *testing.T) { }{ {map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "balanced", annotationAffinityCookieName: "route", annotationAffinityCanaryBehavior: ""}, "cookie", "balanced", "route", ""}, {map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "persistent", annotationAffinityCookieName: "route1", annotationAffinityCanaryBehavior: "sticky"}, "cookie", "persistent", "route1", "sticky"}, + {map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "persistent-drainable", annotationAffinityCookieName: "route1", annotationAffinityCanaryBehavior: "sticky"}, "cookie", "persistent-drainable", "route1", "sticky"}, {map[string]string{annotationAffinityType: "cookie", annotationAffinityMode: "balanced", annotationAffinityCookieName: "", annotationAffinityCanaryBehavior: "legacy"}, "cookie", "balanced", "INGRESSCOOKIE", "legacy"}, {map[string]string{}, "", "", "", ""}, {nil, "", "", "", ""}, diff --git a/internal/ingress/annotations/sessionaffinity/main.go b/internal/ingress/annotations/sessionaffinity/main.go index bee4a20943..324c746e5b 100644 --- a/internal/ingress/annotations/sessionaffinity/main.go +++ b/internal/ingress/annotations/sessionaffinity/main.go @@ -77,12 +77,13 @@ var sessionAffinityAnnotations = parser.Annotation{ Documentation: `This annotation enables and sets the affinity type in all Upstreams of an Ingress. This way, a request will always be directed to the same upstream server. The only affinity type available for NGINX is cookie`, }, annotationAffinityMode: { - Validator: parser.ValidateOptions([]string{"balanced", "persistent"}, true, true), + Validator: parser.ValidateOptions([]string{"balanced", "persistent", "persistent-drainable"}, true, true), Scope: parser.AnnotationScopeIngress, Risk: parser.AnnotationRiskMedium, Documentation: `This annotation defines the stickiness of a session. Setting this to balanced (default) will redistribute some sessions if a deployment gets scaled up, therefore rebalancing the load on the servers. - Setting this to persistent will not rebalance sessions to new servers, therefore providing maximum stickiness.`, + Setting this to persistent will not rebalance sessions to new servers, therefore providing greater stickiness. Sticky sessions will continue to be routed to the same server as long as its Endpoint's condition remains Ready. If the Endpoint stops being Ready, such as when a server pod receives a deletion timestamp, sessions will be rebalanced to another server. + Setting this to persistent-drainable behaves like persistent, but sticky sessions will continue to be routed to the same server as long as its Endpoint's condition remains Serving, even after the server pod receives a deletion timestamp. This allows graceful session draining during the preStop lifecycle hook.
New sessions will *not* be directed to these draining servers and will only be routed to a server whose Endpoint is Ready, except potentially when all servers are draining.`, }, annotationAffinityCanaryBehavior: { Validator: parser.ValidateOptions([]string{"sticky", "legacy"}, true, true), diff --git a/rootfs/etc/nginx/lua/balancer.lua b/rootfs/etc/nginx/lua/balancer.lua index f06aadf795..57e08ab847 100644 --- a/rootfs/etc/nginx/lua/balancer.lua +++ b/rootfs/etc/nginx/lua/balancer.lua @@ -47,7 +47,8 @@ local function get_implementation(backend) if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then - if backend["sessionAffinityConfig"]["mode"] == "persistent" then + local mode = backend["sessionAffinityConfig"]["mode"] + if mode == "persistent" or mode == "persistent-drainable" then name = "sticky_persistent" else name = "sticky_balanced" From 22f2305265e65044f5cd12f76126b0e7c024db08 Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 15:32:31 -0500 Subject: [PATCH 3/7] Equip `getEndpointsFromSlices` to select Serving Endpoints This wires up `getEndpointsFromSlices` to have different selection modes for querying Endpoint objects. This is necessary so that `persistent-drainable` can eventually select Endpoints that are `Serving` instead of only `Ready`. This added a unit test to demonstrate the selection behavior, and the wiring up in the controller remains to be done. The Endpoint type was also updated to have an `IsDraining` field when the Endpoint is not Ready. --- internal/ingress/controller/controller.go | 19 +-- internal/ingress/controller/endpointslices.go | 31 +++- .../ingress/controller/endpointslices_test.go | 141 ++++++++++++++++-- internal/ingress/controller/nginx.go | 1 + pkg/apis/ingress/types.go | 2 + 5 files changed, 170 insertions(+), 24 deletions(-) diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 0894c0dfc1..505be4d55d 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -536,7 +536,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr sp := svc.Spec.Ports[i] if sp.Name == svcPort { if sp.Protocol == proto { - endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices) + endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices) break } } @@ -548,7 +548,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr //nolint:gosec // Ignore G109 error if sp.Port == int32(targetPort) { if sp.Protocol == proto { - endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices) + endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices) break } } @@ -605,7 +605,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { } else { zone = emptyZone } - endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) + endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint", svcKey) endps = []ingress.Endpoint{n.DefaultEndpoint()} @@ -940,7 +940,7 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in } else { zone = emptyZone } - endps := 
getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) + endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices) // custom backend is valid only if contains at least one endpoint if len(endps) > 0 { name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName()) @@ -1050,7 +1050,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B if len(upstreams[defBackend].Endpoints) == 0 { _, port := upstreamServiceNameAndPort(ing.Spec.DefaultBackend.Service) - endps, err := n.serviceEndpoints(svcKey, port.String()) + endps, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints) upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...) if err != nil { klog.Warningf("Error creating upstream %q: %v", defBackend, err) @@ -1115,7 +1115,8 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B if len(upstreams[name].Endpoints) == 0 { _, port := upstreamServiceNameAndPort(path.Backend.Service) - endp, err := n.serviceEndpoints(svcKey, port.String()) + endp, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints) + if err != nil { klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err) n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoService) @@ -1184,7 +1185,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *netw } // serviceEndpoints returns the upstream servers (Endpoints) associated with a Service. -func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingress.Endpoint, error) { +func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, epSelectionMode EndpointSelectionMode) ([]ingress.Endpoint, error) { var upstreams []ingress.Endpoint svc, err := n.store.GetService(svcKey) @@ -1205,7 +1206,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres return upstreams, nil } servicePort := externalNamePorts(backendPort, svc) - endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) + endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint.", svcKey) return upstreams, nil @@ -1221,7 +1222,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres if strconv.Itoa(int(servicePort.Port)) == backendPort || servicePort.TargetPort.String() == backendPort || servicePort.Name == backendPort { - endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) + endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint.", svcKey) } diff --git a/internal/ingress/controller/endpointslices.go b/internal/ingress/controller/endpointslices.go index 1b8a3b571a..9728c3a90c 100644 --- a/internal/ingress/controller/endpointslices.go +++ b/internal/ingress/controller/endpointslices.go @@ -34,9 +34,16 @@ import ( "k8s.io/ingress-nginx/pkg/apis/ingress" ) +type EndpointSelectionMode int + +const ( + ReadyEndpoints EndpointSelectionMode = iota + ServingEndpoints +) + // 
getEndpointsFromSlices returns a list of Endpoint structs for a given service/target port combination. func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, zoneForHints string, - getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error), + epSelectionMode EndpointSelectionMode, getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error), ) []ingress.Endpoint { upsServers := []ingress.Endpoint{} @@ -153,9 +160,19 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c } for _, ep := range eps.Endpoints { - if (ep.Conditions.Ready != nil) && !(*ep.Conditions.Ready) { - continue + epIsReady := (ep.Conditions.Ready == nil) || *ep.Conditions.Ready + if epSelectionMode == ReadyEndpoints { + if !epIsReady { + continue + } + } else { + // assume epSelectionMode == ServingEndpoints. + epIsServing := (ep.Conditions.Serving == nil) || *ep.Conditions.Serving + if !epIsServing { + continue + } } + epHasZone := false if useTopologyHints { for _, epzone := range ep.Hints.ForZones { @@ -176,10 +193,12 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c if _, exists := processedUpstreamServers[hostPort]; exists { continue } + ups := ingress.Endpoint{ - Address: epAddress, - Port: fmt.Sprintf("%v", epPort), - Target: ep.TargetRef, + Address: epAddress, + Port: fmt.Sprintf("%v", epPort), + Target: ep.TargetRef, + IsDraining: !epIsReady, } upsServers = append(upsServers, ups) processedUpstreamServers[hostPort] = struct{}{} diff --git a/internal/ingress/controller/endpointslices_test.go b/internal/ingress/controller/endpointslices_test.go index ca29a4dd3b..7021f1f7b4 100644 --- a/internal/ingress/controller/endpointslices_test.go +++ b/internal/ingress/controller/endpointslices_test.go @@ -32,13 +32,14 @@ import ( //nolint:dupl // Ignore dupl errors for similar test case func TestGetEndpointsFromSlices(t *testing.T) { tests := []struct { - name string - svc *corev1.Service - port *corev1.ServicePort - proto corev1.Protocol - zone string - fn func(string) ([]*discoveryv1.EndpointSlice, error) - result []ingress.Endpoint + name string + svc *corev1.Service + port *corev1.ServicePort + proto corev1.Protocol + zone string + epSelectionMode EndpointSelectionMode + fn func(string) ([]*discoveryv1.EndpointSlice, error) + result []ingress.Endpoint }{ { "no service should return 0 endpoint", @@ -46,6 +47,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { nil, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, @@ -57,6 +59,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { nil, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, @@ -68,6 +71,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -83,6 +87,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -108,6 +113,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -133,6 +139,7 @@ func 
TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -158,6 +165,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -188,6 +196,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -218,6 +227,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -243,6 +253,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, fmt.Errorf("unexpected error") }, @@ -268,6 +279,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -312,6 +324,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -321,7 +334,106 @@ func TestGetEndpointsFromSlices(t *testing.T) { { Addresses: []string{"1.1.1.1"}, Conditions: discoveryv1.EndpointConditions{ - Ready: &[]bool{false}[0], + Ready: &[]bool{false}[0], + Serving: &[]bool{true}[0], + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: &[]string{""}[0], + Port: &[]int32{80}[0], + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{}, + }, + { + "should return a draining endpoint when requesting serving endpoints and the endpoint is not ready", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromInt(80), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "default", + TargetPort: intstr.FromInt(80), + }, + corev1.ProtocolTCP, + "", + ServingEndpoints, + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{false}[0], + Serving: &[]bool{true}[0], + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: &[]string{""}[0], + Port: &[]int32{80}[0], + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + IsDraining: true, + }, + }, + }, + { + "should return no endpoint when requesting serving endpoints and none are serving", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromInt(80), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "default", + TargetPort: intstr.FromInt(80), + }, + corev1.ProtocolTCP, + "", + ServingEndpoints, + func(string) ([]*discoveryv1.EndpointSlice, 
error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{false}[0], + Serving: &[]bool{false}[0], }, }, }, @@ -356,6 +468,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -400,6 +513,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -449,6 +563,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -498,6 +613,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{ { @@ -573,6 +689,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{ { @@ -644,6 +761,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -698,6 +816,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -774,6 +893,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -854,6 +974,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -939,6 +1060,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -1016,6 +1138,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, corev1.ProtocolTCP, "eu-west-1b", + ReadyEndpoints, func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -1084,7 +1207,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { for _, testCase := range tests { t.Run(testCase.name, func(t *testing.T) { - result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, testCase.zone, testCase.fn) + result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, testCase.zone, testCase.epSelectionMode, testCase.fn) if len(testCase.result) != len(result) { t.Errorf("Expected %d Endpoints but got %d", len(testCase.result), len(result)) } diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 20fad5afb8..db69c24916 100644 --- 
a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -970,6 +970,7 @@ func configureBackends(rawBackends []*ingress.Backend) error { endpoints = append(endpoints, ingress.Endpoint{ Address: endpoint.Address, Port: endpoint.Port, + IsDraining: endpoint.IsDraining, }) } diff --git a/pkg/apis/ingress/types.go b/pkg/apis/ingress/types.go index ccdd49fe92..41c356d5d6 100644 --- a/pkg/apis/ingress/types.go +++ b/pkg/apis/ingress/types.go @@ -181,6 +181,8 @@ type Endpoint struct { Port string `json:"port"` // Target returns a reference to the object providing the endpoint Target *apiv1.ObjectReference `json:"target,omitempty"` + // True if this endpoint is not considered Ready and should not accept new persistent sessions + IsDraining bool `json:"isDraining,omitempty"` } // Server describes a website From 3fa8e15785c8d48fc70cef89f6fb96c20b520a77 Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 15:45:17 -0500 Subject: [PATCH 4/7] Create `get_endpoint_string` and refactor --- rootfs/etc/nginx/lua/test/balancer/sticky_test.lua | 2 +- rootfs/etc/nginx/lua/util.lua | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua b/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua index 70723143b6..bbc8c14252 100644 --- a/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua @@ -73,7 +73,7 @@ describe("Sticky", function() end) local test_backend = get_test_backend() - local test_backend_endpoint= test_backend.endpoints[1].address .. ":" .. test_backend.endpoints[1].port + local test_backend_endpoint= util.get_endpoint_string(test_backend.endpoints[1]) local legacy_cookie_value = test_backend_endpoint local function create_current_cookie_value(backend_key) diff --git a/rootfs/etc/nginx/lua/util.lua b/rootfs/etc/nginx/lua/util.lua index 1e4cd7c017..856737ffb9 100644 --- a/rootfs/etc/nginx/lua/util.lua +++ b/rootfs/etc/nginx/lua/util.lua @@ -13,12 +13,16 @@ local re_gmatch = ngx.re.gmatch local _M = {} +function _M.get_endpoint_string(endpoint) + return endpoint.address .. ":" .. endpoint.port +end + function _M.get_nodes(endpoints) local nodes = {} local weight = 1 for _, endpoint in pairs(endpoints) do - local endpoint_string = endpoint.address .. ":" .. 
endpoint.port + local endpoint_string = _M.get_endpoint_string(endpoint) nodes[endpoint_string] = weight end From 29a0f9d46c811cb3acee9a13aef5aea6815db8b3 Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 16:06:33 -0500 Subject: [PATCH 5/7] Exclude draining endpoints from new sticky sessions --- rootfs/etc/nginx/lua/balancer/sticky.lua | 30 +++++++-- .../nginx/lua/test/balancer/sticky_test.lua | 62 +++++++++++++++++++ 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/rootfs/etc/nginx/lua/balancer/sticky.lua b/rootfs/etc/nginx/lua/balancer/sticky.lua index 9d0a541169..4fc5693a18 100644 --- a/rootfs/etc/nginx/lua/balancer/sticky.lua +++ b/rootfs/etc/nginx/lua/balancer/sticky.lua @@ -3,6 +3,7 @@ local ck = require("resty.cookie") local ngx_balancer = require("ngx.balancer") local split = require("util.split") local same_site = require("util.same_site") +local util = require("util") local ngx = ngx local pairs = pairs @@ -24,7 +25,8 @@ function _M.new(self) alternative_backends = nil, cookie_session_affinity = nil, traffic_shaping_policy = nil, - backend_key = nil + backend_key = nil, + draining_endpoints = nil } setmetatable(o, self) @@ -129,14 +131,20 @@ function _M.get_last_failure() return ngx_balancer.get_last_failure() end -local function get_failed_upstreams() +local function get_upstreams_to_exclude(self) local indexed_upstream_addrs = {} - local upstream_addrs = split.split_upstream_var(ngx.var.upstream_addr) or {} + local failed_upstream_addrs = split.split_upstream_var(ngx.var.upstream_addr) or {} - for _, addr in ipairs(upstream_addrs) do + for _, addr in ipairs(failed_upstream_addrs) do indexed_upstream_addrs[addr] = true end + if self.draining_endpoints then + for addr, _ in pairs(self.draining_endpoints) do + indexed_upstream_addrs[addr] = true + end + end + return indexed_upstream_addrs end @@ -188,7 +196,7 @@ function _M.balance(self) local new_upstream - new_upstream, key = self:pick_new_upstream(get_failed_upstreams()) + new_upstream, key = self:pick_new_upstream(get_upstreams_to_exclude(self)) if not new_upstream then ngx.log(ngx.WARN, string.format("failed to get new upstream; using upstream %s", new_upstream)) elseif should_set_cookie(self) then @@ -202,6 +210,18 @@ function _M.sync(self, backend) -- reload balancer nodes balancer_resty.sync(self, backend) + self.draining_endpoints = nil + for _, endpoint in pairs(backend.endpoints) do + if endpoint.isDraining then + if not self.draining_endpoints then + self.draining_endpoints = {} + end + + local endpoint_string = util.get_endpoint_string(endpoint) + self.draining_endpoints[endpoint_string] = true + end + end + self.traffic_shaping_policy = backend.trafficShapingPolicy self.alternative_backends = backend.alternativeBackends self.cookie_session_affinity = backend.sessionAffinityConfig.cookieSessionAffinity diff --git a/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua b/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua index bbc8c14252..228bb6896b 100644 --- a/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/sticky_test.lua @@ -304,6 +304,68 @@ describe("Sticky", function() it("returns the correct endpoint for the client", function() test_correct_endpoint(sticky_balanced) end) it("returns the correct endpoint for the client", function() test_correct_endpoint(sticky_persistent) end) end) + + -- Note below that endpoints are only marked this way when persistent + describe("when an endpoint is draining", function() + it("persists client with 
cookie to endpoint", function() + local s = {} + local cookie_jar = {} + cookie.new = function(self) + local cookie_instance = { + set = function(self, ck) + cookie_jar[ck.key] = ck.value + return true, nil + end, + get = function(self, k) return cookie_jar[k] end, + } + s = spy.on(cookie_instance, "set") + return cookie_instance, false + end + + local b = get_test_backend() + b.sessionAffinityConfig.cookieSessionAffinity.locations = {} + b.sessionAffinityConfig.cookieSessionAffinity.locations["test.com"] = {"/"} + + local expectedPeer = b.endpoints[1] + + local sticky_balancer_instance = sticky_persistent:new(b) + local peer = sticky_balancer_instance:balance() + assert.equal(peer, test_backend_endpoint) + + expectedPeer.isDraining = true + sticky_balancer_instance:sync(b) + sticky_balancer_instance.TESTING = true + peer = sticky_balancer_instance:balance() + assert.equal(peer, test_backend_endpoint) + end) + + it("does not route client without cookie to endpoint", function() + local s = {} + local cookie_jar = {} + cookie.new = function(self) + local cookie_instance = { + set = function(self, ck) + cookie_jar[ck.key] = ck.value + return true, nil + end, + get = function(self, k) return cookie_jar[k] end, + } + s = spy.on(cookie_instance, "set") + return cookie_instance, false + end + + local b = get_test_backend() + b.sessionAffinityConfig.cookieSessionAffinity.locations = {} + b.sessionAffinityConfig.cookieSessionAffinity.locations["test.com"] = {"/"} + + local expectedPeer = b.endpoints[1] + expectedPeer.isDraining = true + + local sticky_balancer_instance = sticky_persistent:new(b) + local peer = sticky_balancer_instance:balance() + assert.equal(peer, nil) + end) + end) end) local function get_several_test_backends(change_on_failure) From 5442fa369897edb22937b648ba0de6a1572faa3e Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 16:10:16 -0500 Subject: [PATCH 6/7] Include draining endpoints when `persistent-drainable` I believe that this is the only place that needs to check for `Serving` Endpoint objects to implement this feature. There are 6 total invocations of `getEndpointsFromSlices` in the repo, all in `controller.go`. 2 of them are relating to `L4Service`. I'm assuming L4 is referring to the OSI model and these seem to relate to TCP and UDP, which have no knowledge of HTTP cookies, which `persistent-drainable` relies on, so these can't be required. (They appear to be for controller level config instead of ingress rules). A third is in `getDefaultUpstream`. My understanding is this is for a fallback service should all of the primary services in the ingress rules be down. This seems like a fallback service handler and not useful for `persistent-drainable`. A fourth one is used in `getBackendServers` when building the `custom-default-backend-` upstream when no Endpoints are available for the service. Again, this doesn't seem intended for the primary application that needs session affinity, so exluding the draining Endpoints in the default backend seems desirable. The last two are used in `serviceEndpoints`, which itself is called in two places, both in `createUpstreams`. One of them is for the default backend service, which discussed above, should not be included. The other one is for our main ingress rules and **is** the one that we want to update. 
--- internal/ingress/controller/controller.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 505be4d55d..227ed90261 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -1114,9 +1114,13 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B } if len(upstreams[name].Endpoints) == 0 { - _, port := upstreamServiceNameAndPort(path.Backend.Service) - endp, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints) + epSelectionMode := ReadyEndpoints + if anns.SessionAffinity.Mode == "persistent-drainable" { + epSelectionMode = ServingEndpoints + } + _, port := upstreamServiceNameAndPort(path.Backend.Service) + endp, err := n.serviceEndpoints(svcKey, port.String(), epSelectionMode) if err != nil { klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err) n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoService) From d42a4ec5cf8e972f94fa7174a27c106917311af5 Mon Sep 17 00:00:00 2001 From: Nicholas Gulachek Date: Sun, 1 Jun 2025 17:10:40 -0500 Subject: [PATCH 7/7] Warn when all Endpoints are draining --- internal/ingress/controller/controller.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 227ed90261..06b670e4b4 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -1132,6 +1132,10 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoEndpoint) } else { n.metricCollector.DecOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoEndpoint) + + if allEndpointsAreDraining(endp) { + klog.Warningf("All Endpoints for Service %q are draining.", svcKey) + } } upstreams[name].Endpoints = endp } @@ -1908,3 +1912,12 @@ func newTrafficShapingPolicy(cfg *canary.Config) ingress.TrafficShapingPolicy { Cookie: cfg.Cookie, } } + +func allEndpointsAreDraining(eps []ingress.Endpoint) bool { + for _, ep := range eps { + if !ep.IsDraining { + return false + } + } + return true +}
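The series does not include a unit test for the new helper; a minimal one could look like the following. This is a sketch only: `allEndpointsAreDraining` and the `IsDraining` field come from the patches above, while the test name and addresses are hypothetical.

```go
package controller

import (
	"testing"

	"k8s.io/ingress-nginx/pkg/apis/ingress"
)

// Sketch of a possible unit test for allEndpointsAreDraining.
func TestAllEndpointsAreDraining(t *testing.T) {
	eps := []ingress.Endpoint{
		{Address: "10.0.0.1", Port: "80", IsDraining: true},
		{Address: "10.0.0.2", Port: "80", IsDraining: true},
	}
	if !allEndpointsAreDraining(eps) {
		t.Error("expected all endpoints to be reported as draining")
	}

	// One endpoint still Ready means the warning should not fire.
	eps[1].IsDraining = false
	if allEndpointsAreDraining(eps) {
		t.Error("expected a non-draining endpoint to be detected")
	}
}
```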