@@ -22,6 +22,7 @@ import (
22
22
"go.uber.org/zap"
23
23
v1 "k8s.io/api/core/v1"
24
24
discovery "k8s.io/api/discovery/v1"
25
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
25
26
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
27
"k8s.io/apimachinery/pkg/labels"
27
28
"k8s.io/apimachinery/pkg/util/sets"
@@ -374,31 +375,23 @@ func getNodeFilter(svc *v1.Service) (labels.Selector, error) {
374
375
return labels .Parse (labelSelector )
375
376
}
376
377
377
- // filterNodes - Separates provisioned nodes based on the label selector,
378
- // that should be backends in the load balancer, and virtual nodes .
379
- func filterNodes (logger * zap. SugaredLogger , svc * v1.Service , nodes []* v1.Node ) ([] * v1. Node , []* v1.Node , error ) {
378
+ // filterNodes based on the label selector, if present, and returns the set of nodes
379
+ // that should be backends in the load balancer.
380
+ func filterNodes (svc * v1.Service , nodes []* v1.Node ) ([]* v1.Node , error ) {
380
381
381
382
selector , err := getNodeFilter (svc )
382
383
if err != nil {
383
- return nil , nil , err
384
+ return nil , err
384
385
}
385
386
386
- var provisionedNodes , virtualNodes []* v1.Node
387
+ var filteredNodes []* v1.Node
387
388
for _ , n := range nodes {
388
- // Since virtual nodes should not be added as backends,
389
- // labels are not checked
390
- isVirtualNode , err := IsVirtualNode (n )
391
- if err != nil {
392
- logger .With (zap .Error (err )).Errorf ("Failed to check if node is virtual: %s" , n .Name )
393
- }
394
- if isVirtualNode {
395
- virtualNodes = append (virtualNodes , n )
396
- } else if selector .Matches (labels .Set (n .GetLabels ())) {
397
- provisionedNodes = append (provisionedNodes , n )
389
+ if selector .Matches (labels .Set (n .GetLabels ())) {
390
+ filteredNodes = append (filteredNodes , n )
398
391
}
399
392
}
400
393
401
- return provisionedNodes , virtualNodes , nil
394
+ return filteredNodes , nil
402
395
}
403
396
404
397
// EnsureLoadBalancer creates a new load balancer or updates the existing one.
@@ -409,22 +402,17 @@ func (cp *CloudProvider) EnsureLoadBalancer(ctx context.Context, clusterName str
409
402
loadBalancerType := getLoadBalancerType (service )
410
403
logger := cp .logger .With ("loadBalancerName" , lbName , "serviceName" , service .Name , "loadBalancerType" , loadBalancerType )
411
404
412
- // Ideally virtual nodes will be excluded since they will have the LabelNodeExcludeBalancers set on them,
413
- // However, the user could delete the label, so we need to filter out virtual nodes here
414
- provisionedNodes , virtualNodes , err := filterNodes (logger , service , nodes )
405
+ nodes , err := filterNodes (service , nodes )
415
406
if err != nil {
416
407
logger .With (zap .Error (err )).Error ("Failed to filter provisioned nodes with label selector and virtual nodes" )
417
408
return nil , err
418
409
}
419
410
420
- virtualNodeExists := len (virtualNodes ) > 0
421
- if ! virtualNodeExists {
422
- // Check if virtual nodes exist in the cluster
423
- virtualNodeExists , err = cp .virtualNodeExists (logger )
424
- if err != nil {
425
- logger .With (zap .Error (err )).Error ("Failed to check if cluster has virtual nodes" )
426
- return nil , errors .Wrap (err , "failed to check if cluster has virtual nodes" )
427
- }
411
+ // Check if virtual nodes exist in the cluster
412
+ virtualNodeExists , err := VirtualNodeExists (cp .NodeLister )
413
+ if err != nil {
414
+ logger .With (zap .Error (err )).Error ("Failed to check if cluster has virtual nodes" )
415
+ return nil , errors .Wrap (err , "failed to check if cluster has virtual nodes" )
428
416
}
429
417
430
418
var virtualPods []* v1.Pod
@@ -437,7 +425,7 @@ func (cp *CloudProvider) EnsureLoadBalancer(ctx context.Context, clusterName str
437
425
}
438
426
}
439
427
440
- logger .With ("provisioned nodes" , len (provisionedNodes ), "virtual pods" , len (virtualPods )).Info ("Ensuring load balancer" )
428
+ logger .With ("provisioned nodes" , len (nodes ), "virtual pods" , len (virtualPods )).Info ("Ensuring load balancer" )
441
429
442
430
dimensionsMap := make (map [string ]string )
443
431
@@ -493,7 +481,7 @@ func (cp *CloudProvider) EnsureLoadBalancer(ctx context.Context, clusterName str
493
481
return nil , err
494
482
}
495
483
496
- spec , err := NewLBSpec (logger , service , provisionedNodes , virtualPods , subnets , sslConfig , cp .securityListManagerFactory , cp .config .Tags )
484
+ spec , err := NewLBSpec (logger , service , nodes , virtualPods , subnets , sslConfig , cp .securityListManagerFactory , cp .config .Tags )
497
485
if err != nil {
498
486
logger .With (zap .Error (err )).Error ("Failed to derive LBSpec" )
499
487
errorType = util .GetError (err )
@@ -549,7 +537,8 @@ func (cp *CloudProvider) EnsureLoadBalancer(ctx context.Context, clusterName str
549
537
}
550
538
}
551
539
552
- if len (provisionedNodes ) == 0 && ! virtualNodeExists {
540
+ // TODO: Revisit this condition when clusters with mixed node pools are introduced, possibly add len(virtualPods) == 0 check
541
+ if len (nodes ) == 0 && ! virtualNodeExists {
553
542
allNodesNotReady , err := cp .checkAllBackendNodesNotReady ()
554
543
if err != nil {
555
544
logger .With (zap .Error (err )).Error ("Failed to check if all backend nodes are not ready" )
@@ -884,7 +873,7 @@ func (cp *CloudProvider) UpdateLoadBalancer(ctx context.Context, clusterName str
884
873
}
885
874
886
875
// getNodesAndPodsByIPs returns slices of Nodes and Pods corresponding to the given IP addresses.
887
- func (cp * CloudProvider ) getNodesAndPodsByIPs (ctx context.Context , logger * zap. SugaredLogger , backendIPs []string , service * v1.Service ) ([]* v1.Node , []* v1.Pod , error ) {
876
+ func (cp * CloudProvider ) getNodesAndPodsByIPs (ctx context.Context , backendIPs []string , service * v1.Service ) ([]* v1.Node , []* v1.Pod , error ) {
888
877
ipToPodLookup := make (map [string ]* v1.Pod )
889
878
ipToNodeLookup := make (map [string ]* v1.Node )
890
879
@@ -895,11 +884,7 @@ func (cp *CloudProvider) getNodesAndPodsByIPs(ctx context.Context, logger *zap.S
895
884
896
885
var virtualNodeExists bool
897
886
for _ , node := range nodeList {
898
- isVirtualNode , err := IsVirtualNode (node )
899
- if err != nil {
900
- logger .With (zap .Error (err )).Errorf ("Failed to check if node is virtual: %s" , node .Name )
901
- }
902
- if isVirtualNode {
887
+ if IsVirtualNode (node ) {
903
888
virtualNodeExists = true
904
889
continue
905
890
}
@@ -925,7 +910,7 @@ func (cp *CloudProvider) getNodesAndPodsByIPs(ctx context.Context, logger *zap.S
925
910
if node , nodeExists := ipToNodeLookup [ip ]; nodeExists {
926
911
nodes = append (nodes , node )
927
912
} else if pod , podExists := ipToPodLookup [ip ]; virtualNodeExists && podExists {
928
- pods = append (pods , pod )
913
+ pods = append (pods , pod )
929
914
} else {
930
915
return nil , nil , errors .Errorf ("provisioned node or virtual pod was not found by IP %q" , ip )
931
916
}
@@ -1021,7 +1006,7 @@ func (cp *CloudProvider) cleanupSecListForLoadBalancerDelete(lb *client.GenericL
1021
1006
ipSet .Insert (* backend .IpAddress )
1022
1007
}
1023
1008
}
1024
- nodes , _ , err := cp .getNodesAndPodsByIPs (ctx , logger , ipSet .List (), service )
1009
+ nodes , _ , err := cp .getNodesAndPodsByIPs (ctx , ipSet .List (), service )
1025
1010
if err != nil {
1026
1011
logger .With (zap .Error (err )).Error ("Failed to fetch nodes by internal ips" )
1027
1012
return errors .Wrap (err , "fetching nodes by internal ips" )
@@ -1175,48 +1160,45 @@ func (cp *CloudProvider) getVirtualPodsOfService(ctx context.Context, logger *za
1175
1160
if err != nil {
1176
1161
return nil , err
1177
1162
}
1163
+
1164
+ endpointSet := make (map [string ]struct {})
1178
1165
var virtualPods []* v1.Pod
1179
1166
for _ , es := range endpointSlices {
1180
1167
for _ , e := range es .Endpoints {
1181
1168
if e .TargetRef .Kind == "Pod" {
1169
+ if _ , exists := endpointSet [e .Addresses [0 ]]; exists {
1170
+ continue
1171
+ }
1172
+
1182
1173
pod , err := cp .kubeclient .CoreV1 ().Pods (es .Namespace ).Get (ctx , e .TargetRef .Name , metav1.GetOptions {})
1183
1174
if err != nil {
1175
+ if apierrors .IsNotFound (err ) {
1176
+ logger .With (zap .Error (err )).Errorf ("Pod object does not exist: %s" , e .TargetRef .Name )
1177
+ continue
1178
+ }
1184
1179
return nil , err
1185
1180
}
1181
+
1186
1182
node , err := cp .NodeLister .Get (pod .Spec .NodeName )
1187
1183
if err != nil {
1184
+ if apierrors .IsNotFound (err ) {
1185
+ logger .With (zap .Error (err )).Errorf ("Node object does not exist: %s" , pod .Spec .NodeName )
1186
+ continue
1187
+ }
1188
1188
return nil , err
1189
1189
}
1190
- isVirtualNode , err := IsVirtualNode (node )
1191
- if err != nil {
1192
- logger .With (zap .Error (err )).Errorf ("Failed to check if node is virtual: %s" , node .Name )
1193
- }
1194
- if isVirtualNode {
1190
+
1191
+ if IsVirtualNode (node ) {
1195
1192
virtualPods = append (virtualPods , pod )
1196
1193
}
1194
+
1195
+ endpointSet [e .Addresses [0 ]] = struct {}{}
1197
1196
}
1198
1197
}
1199
1198
}
1200
1199
return virtualPods , nil
1201
1200
}
1202
1201
1203
- func (cp * CloudProvider ) virtualNodeExists (logger * zap.SugaredLogger ) (bool , error ) {
1204
- nodeList , err := cp .NodeLister .List (labels .Everything ())
1205
- if err != nil {
1206
- return false , err
1207
- }
1208
- for _ , node := range nodeList {
1209
- isVirtualNode , err := IsVirtualNode (node )
1210
- if err != nil {
1211
- logger .With (zap .Error (err )).Errorf ("Failed to check if node is virtual: %s" , node .Name )
1212
- }
1213
- if isVirtualNode {
1214
- return true , nil
1215
- }
1216
- }
1217
- return false , nil
1218
- }
1219
-
1220
1202
func (cp * CloudProvider ) getEndpointSlicesForService (service * v1.Service ) ([]* discovery.EndpointSlice , error ) {
1221
1203
esLabelSelector := labels .Set (map [string ]string {
1222
1204
discovery .LabelServiceName : service .Name ,
0 commit comments