Skip to content

Commit 22f2305

Browse files
committed
Equip getEndpointsFromSlices to select Serving Endpoints
This wires up `getEndpointsFromSlices` to have different selection modes for querying Endpoint objects. This is necessary so that `persistent-drainable` can eventually select Endpoints that are `Serving` instead of only `Ready`. This added a unit test to demonstrate the selection behavior, and the wiring up in the controller remains to be done. The Endpoint type was also updated to have an `IsDraining` field when the Endpoint is not Ready.
1 parent 19d1043 commit 22f2305

File tree

5 files changed

+170
-24
lines changed

5 files changed

+170
-24
lines changed

internal/ingress/controller/controller.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
536536
sp := svc.Spec.Ports[i]
537537
if sp.Name == svcPort {
538538
if sp.Protocol == proto {
539-
endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
539+
endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
540540
break
541541
}
542542
}
@@ -548,7 +548,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
548548
//nolint:gosec // Ignore G109 error
549549
if sp.Port == int32(targetPort) {
550550
if sp.Protocol == proto {
551-
endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
551+
endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
552552
break
553553
}
554554
}
@@ -605,7 +605,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
605605
} else {
606606
zone = emptyZone
607607
}
608-
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
608+
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
609609
if len(endps) == 0 {
610610
klog.Warningf("Service %q does not have any active Endpoint", svcKey)
611611
endps = []ingress.Endpoint{n.DefaultEndpoint()}
@@ -940,7 +940,7 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in
940940
} else {
941941
zone = emptyZone
942942
}
943-
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
943+
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
944944
// custom backend is valid only if contains at least one endpoint
945945
if len(endps) > 0 {
946946
name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName())
@@ -1050,7 +1050,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
10501050

10511051
if len(upstreams[defBackend].Endpoints) == 0 {
10521052
_, port := upstreamServiceNameAndPort(ing.Spec.DefaultBackend.Service)
1053-
endps, err := n.serviceEndpoints(svcKey, port.String())
1053+
endps, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints)
10541054
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
10551055
if err != nil {
10561056
klog.Warningf("Error creating upstream %q: %v", defBackend, err)
@@ -1115,7 +1115,8 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
11151115

11161116
if len(upstreams[name].Endpoints) == 0 {
11171117
_, port := upstreamServiceNameAndPort(path.Backend.Service)
1118-
endp, err := n.serviceEndpoints(svcKey, port.String())
1118+
endp, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints)
1119+
11191120
if err != nil {
11201121
klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
11211122
n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoService)
@@ -1184,7 +1185,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *netw
11841185
}
11851186

11861187
// serviceEndpoints returns the upstream servers (Endpoints) associated with a Service.
1187-
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingress.Endpoint, error) {
1188+
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, epSelectionMode EndpointSelectionMode) ([]ingress.Endpoint, error) {
11881189
var upstreams []ingress.Endpoint
11891190

11901191
svc, err := n.store.GetService(svcKey)
@@ -1205,7 +1206,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
12051206
return upstreams, nil
12061207
}
12071208
servicePort := externalNamePorts(backendPort, svc)
1208-
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
1209+
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices)
12091210
if len(endps) == 0 {
12101211
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
12111212
return upstreams, nil
@@ -1221,7 +1222,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
12211222
if strconv.Itoa(int(servicePort.Port)) == backendPort ||
12221223
servicePort.TargetPort.String() == backendPort ||
12231224
servicePort.Name == backendPort {
1224-
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
1225+
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices)
12251226
if len(endps) == 0 {
12261227
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
12271228
}

internal/ingress/controller/endpointslices.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,16 @@ import (
3434
"k8s.io/ingress-nginx/pkg/apis/ingress"
3535
)
3636

37+
type EndpointSelectionMode int
38+
39+
const (
40+
ReadyEndpoints EndpointSelectionMode = iota
41+
ServingEndpoints
42+
)
43+
3744
// getEndpointsFromSlices returns a list of Endpoint structs for a given service/target port combination.
3845
func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, zoneForHints string,
39-
getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error),
46+
epSelectionMode EndpointSelectionMode, getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error),
4047
) []ingress.Endpoint {
4148
upsServers := []ingress.Endpoint{}
4249

@@ -153,9 +160,19 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
153160
}
154161

155162
for _, ep := range eps.Endpoints {
156-
if (ep.Conditions.Ready != nil) && !(*ep.Conditions.Ready) {
157-
continue
163+
epIsReady := (ep.Conditions.Ready == nil) || *ep.Conditions.Ready
164+
if epSelectionMode == ReadyEndpoints {
165+
if !epIsReady {
166+
continue
167+
}
168+
} else {
169+
// assume epSelectionMode == ServingEndpoints.
170+
epIsServing := (ep.Conditions.Serving == nil) || *ep.Conditions.Serving
171+
if !epIsServing {
172+
continue
173+
}
158174
}
175+
159176
epHasZone := false
160177
if useTopologyHints {
161178
for _, epzone := range ep.Hints.ForZones {
@@ -176,10 +193,12 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
176193
if _, exists := processedUpstreamServers[hostPort]; exists {
177194
continue
178195
}
196+
179197
ups := ingress.Endpoint{
180-
Address: epAddress,
181-
Port: fmt.Sprintf("%v", epPort),
182-
Target: ep.TargetRef,
198+
Address: epAddress,
199+
Port: fmt.Sprintf("%v", epPort),
200+
Target: ep.TargetRef,
201+
IsDraining: !epIsReady,
183202
}
184203
upsServers = append(upsServers, ups)
185204
processedUpstreamServers[hostPort] = struct{}{}

0 commit comments

Comments
 (0)