Skip to content

Commit c86ae03

Browse files
authored
allow multiple endpoint plugins to apply on a single backend (#11144)
1 parent 4a800c5 commit c86ae03

File tree

6 files changed

+73
-48
lines changed

6 files changed

+73
-48
lines changed

internal/kgateway/endpoints/prioritize.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,26 @@ import (
1414
"google.golang.org/protobuf/types/known/wrapperspb"
1515
)
1616

17-
func PrioritizeEndpoints(logger *zap.Logger, priorityInfo *PriorityInfo, ep ir.EndpointsForBackend, ucc ir.UniqlyConnectedClient) *envoy_config_endpoint_v3.ClusterLoadAssignment {
17+
// EndpointsInputs is the collective IR that can be modified
18+
// by endpoint plugins to influence the ClusterLoadAssignment.
19+
type EndpointsInputs struct {
20+
EndpointsForBackend ir.EndpointsForBackend
21+
PriorityInfo *PriorityInfo
22+
}
23+
24+
// PrioritizeEndpoints converts EndpointsInputs into a ClusterLoadAssignment.
25+
func PrioritizeEndpoints(
26+
logger *zap.Logger,
27+
ucc ir.UniqlyConnectedClient,
28+
inputs EndpointsInputs,
29+
) *envoy_config_endpoint_v3.ClusterLoadAssignment {
1830
lbInfo := LoadBalancingInfo{
1931
PodLabels: ucc.Labels,
2032
PodLocality: ucc.Locality,
21-
PriorityInfo: priorityInfo,
33+
PriorityInfo: inputs.PriorityInfo,
2234
}
2335

24-
return prioritizeWithLbInfo(logger, ep, lbInfo)
36+
return prioritizeWithLbInfo(logger, inputs.EndpointsForBackend, lbInfo)
2537
}
2638

2739
type LoadBalancingInfo struct {

internal/kgateway/extensions2/plugin/plugin.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55
"encoding/json"
66

77
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
8-
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
98
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
109

10+
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/endpoints"
1111
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir"
1212

1313
"istio.io/istio/pkg/kube/krt"
@@ -35,8 +35,8 @@ type (
3535
kctx krt.HandlerContext,
3636
ctx context.Context,
3737
ucc ir.UniqlyConnectedClient,
38-
in ir.EndpointsForBackend,
39-
) (*envoy_config_endpoint_v3.ClusterLoadAssignment, uint64)
38+
out *endpoints.EndpointsInputs,
39+
) uint64
4040
)
4141

4242
// TODO: consider changing PerClientProcessBackend to look like this:

internal/kgateway/extensions2/plugins/destrule/destrule_plugin.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ import (
99
"k8s.io/apimachinery/pkg/runtime/schema"
1010

1111
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
12-
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
1312
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
14-
"github.com/solo-io/go-utils/contextutils"
1513
"istio.io/api/networking/v1alpha3"
1614
"istio.io/istio/pkg/config/schema/gvr"
1715
"istio.io/istio/pkg/kube/krt"
@@ -55,25 +53,30 @@ type destrulePlugin struct {
5553
destinationRulesIndex DestinationRuleIndex
5654
}
5755

58-
func (d *destrulePlugin) processEndpoints(kctx krt.HandlerContext, ctx context.Context, ucc ir.UniqlyConnectedClient, in ir.EndpointsForBackend) (*envoy_config_endpoint_v3.ClusterLoadAssignment, uint64) {
59-
destrule := d.destinationRulesIndex.FetchDestRulesFor(kctx, ucc.Namespace, in.Hostname, ucc.Labels)
56+
// processEndpoints tries to find a destination rule
57+
// for the backend and if it does, it updates the PriorityInfo on `out`.
58+
func (d *destrulePlugin) processEndpoints(
59+
kctx krt.HandlerContext,
60+
ctx context.Context,
61+
ucc ir.UniqlyConnectedClient,
62+
out *endpoints.EndpointsInputs,
63+
) uint64 {
64+
destrule := d.destinationRulesIndex.FetchDestRulesFor(kctx, ucc.Namespace, out.EndpointsForBackend.Hostname, ucc.Labels)
6065
if destrule == nil {
61-
return nil, 0
66+
return 0
6267
}
6368

64-
logger := contextutils.LoggerFrom(ctx).Desugar()
65-
trafficPolicy := getTrafficPolicy(destrule, in.Port)
69+
trafficPolicy := getTrafficPolicy(destrule, out.EndpointsForBackend.Port)
6670
localityLb := getLocalityLbSetting(trafficPolicy)
67-
var priorityInfo *endpoints.PriorityInfo
68-
var additionalHash uint64
69-
if localityLb != nil {
70-
priorityInfo = getPriorityInfoFromDestrule(localityLb)
71-
hasher := fnv.New64()
72-
hasher.Write([]byte(destrule.UID))
73-
hasher.Write([]byte(fmt.Sprintf("%v", destrule.Generation)))
74-
additionalHash = hasher.Sum64()
71+
if localityLb == nil {
72+
return 0
7573
}
76-
return endpoints.PrioritizeEndpoints(logger, priorityInfo, in, ucc), additionalHash
74+
75+
out.PriorityInfo = getPriorityInfoFromDestrule(localityLb)
76+
hasher := fnv.New64()
77+
hasher.Write([]byte(destrule.UID))
78+
hasher.Write([]byte(fmt.Sprintf("%v", destrule.Generation)))
79+
return hasher.Sum64()
7780
}
7881

7982
func (d *destrulePlugin) processBackend(kctx krt.HandlerContext, ctx context.Context, ucc ir.UniqlyConnectedClient, in ir.BackendObjectIR, outCluster *envoy_config_cluster_v3.Cluster) {

internal/kgateway/proxy_syncer/cla_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ func TestTranslatesDestrulesFailoverPriority(t *testing.T) {
6161
}),
6262
}
6363

64-
cla := endpoints.PrioritizeEndpoints(nil, priorityInfo, *efu, ucc)
64+
epInputs := endpoints.EndpointsInputs{
65+
EndpointsForBackend: *efu,
66+
PriorityInfo: priorityInfo,
67+
}
68+
cla := endpoints.PrioritizeEndpoints(nil, ucc, epInputs)
6569
g.Expect(cla.Endpoints).To(gomega.HaveLen(2))
6670

6771
remoteLocality := cla.Endpoints[0]
@@ -123,7 +127,11 @@ func TestTranslatesDestrulesFailover(t *testing.T) {
123127

124128
priorityInfo := &endpoints.PriorityInfo{}
125129

126-
cla := endpoints.PrioritizeEndpoints(nil, priorityInfo, *efu, ucc)
130+
epInputs := endpoints.EndpointsInputs{
131+
EndpointsForBackend: *efu,
132+
PriorityInfo: priorityInfo,
133+
}
134+
cla := endpoints.PrioritizeEndpoints(nil, ucc, epInputs)
127135
g.Expect(cla.Endpoints).To(gomega.HaveLen(2))
128136

129137
remoteLocality := cla.Endpoints[0]

internal/kgateway/translator/irtranslator/backend.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,16 @@ func (t *BackendTranslator) runPolicies(
7979
inlineEps *ir.EndpointsForBackend,
8080
out *clusterv3.Cluster,
8181
) {
82-
for gk, policyPlugin := range t.ContributedPolicies {
83-
if inlineEps != nil && policyPlugin.PerClientProcessEndpoints != nil {
84-
if cla, _ := policyPlugin.PerClientProcessEndpoints(kctx, ctx, ucc, *inlineEps); cla != nil {
85-
out.LoadAssignment = cla
86-
}
82+
// if the backend was initialized with inlineEps then we
83+
// need an EndpointsInputs to run plugins against
84+
var endpointInputs *endpoints.EndpointsInputs
85+
if inlineEps != nil {
86+
endpointInputs = &endpoints.EndpointsInputs{
87+
EndpointsForBackend: *inlineEps,
8788
}
89+
}
90+
91+
for gk, policyPlugin := range t.ContributedPolicies {
8892
// TODO: in theory it would be nice to do `ProcessBackend` once, and only do
8993
// the the per-client processing for each client.
9094
// that would require refactoring and thinking about the proper IR, so we'll punt on that for
@@ -94,6 +98,11 @@ func (t *BackendTranslator) runPolicies(
9498
policyPlugin.PerClientProcessBackend(kctx, ctx, ucc, backend, out)
9599
}
96100

101+
// run endpoint plugins if we have endpoints to process
102+
if endpointInputs != nil && policyPlugin.PerClientProcessEndpoints != nil {
103+
policyPlugin.PerClientProcessEndpoints(kctx, ctx, ucc, endpointInputs)
104+
}
105+
97106
if policyPlugin.ProcessBackend == nil {
98107
continue
99108
}
@@ -102,13 +111,13 @@ func (t *BackendTranslator) runPolicies(
102111
}
103112
}
104113

105-
// if no plugin initialized the inline CLA, and the cluster type needs one, do it now
106-
if out.GetLoadAssignment() == nil && inlineEps != nil && clusterSupportsInlineCLA(out) {
114+
// for clusters that want a CLA _and_ initialized with inlineEps, build the CLA.
115+
// never overwrite the CLA that was already initialized (potentially within a plugin).
116+
if out.GetLoadAssignment() == nil && endpointInputs != nil && clusterSupportsInlineCLA(out) {
107117
out.LoadAssignment = endpoints.PrioritizeEndpoints(
108-
contextutils.LoggerFrom(ctx).Desugar(), // TODO BackendTranslator's logger
109-
nil,
110-
*inlineEps,
118+
contextutils.LoggerFrom(ctx).Desugar(),
111119
ucc,
120+
*endpointInputs,
112121
)
113122
}
114123
}

internal/kgateway/translator/translator.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -141,20 +141,13 @@ func (s *CombinedTranslator) TranslateGateway(kctx krt.HandlerContext, ctx conte
141141
}
142142

143143
func (s *CombinedTranslator) TranslateEndpoints(kctx krt.HandlerContext, ucc ir.UniqlyConnectedClient, ep ir.EndpointsForBackend) (*envoy_config_endpoint_v3.ClusterLoadAssignment, uint64) {
144-
// check if we have a plugin to do it
145-
cla, additionalHash := proccessWithPlugins(s.endpointPlugins, kctx, context.TODO(), ucc, ep)
146-
if cla != nil {
147-
return cla, additionalHash
144+
epInputs := endpoints.EndpointsInputs{
145+
EndpointsForBackend: ep,
148146
}
149-
return endpoints.PrioritizeEndpoints(s.logger, nil, ep, ucc), 0
150-
}
151-
152-
func proccessWithPlugins(plugins []extensionsplug.EndpointPlugin, kctx krt.HandlerContext, ctx context.Context, ucc ir.UniqlyConnectedClient, in ir.EndpointsForBackend) (*envoy_config_endpoint_v3.ClusterLoadAssignment, uint64) {
153-
for _, processEnddpoints := range plugins {
154-
cla, additionalHash := processEnddpoints(kctx, context.TODO(), ucc, in)
155-
if cla != nil {
156-
return cla, additionalHash
157-
}
147+
var hash uint64
148+
for _, processEndpoints := range s.endpointPlugins {
149+
additionalHash := processEndpoints(kctx, context.TODO(), ucc, &epInputs)
150+
hash ^= additionalHash
158151
}
159-
return nil, 0
152+
return endpoints.PrioritizeEndpoints(s.logger, ucc, epInputs), hash
160153
}

0 commit comments

Comments
 (0)