Skip to content

Commit a074cee

Browse files
committed
DFP proposal
1 parent eca3b94 commit a074cee

File tree

6 files changed

+129
-24
lines changed

6 files changed

+129
-24
lines changed

api/v1alpha1/upstream_policy_types.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,16 @@ type UpstreamList struct {
2929
Items []Upstream `json:"items"`
3030
}
3131

32-
// +kubebuilder:validation:XValidation:message="There must one and only one upstream type set",rule="1 == (self.aws != null?1:0) + (self.static != null?1:0)"
32+
// +kubebuilder:validation:XValidation:message="There must one and only one upstream type set",rule="1 == (self.aws != null?1:0) + (self.static != null?1:0) + (self.dynamicForwardProxy != null?1:0)"
3333
type UpstreamSpec struct {
34-
Aws *AwsUpstream `json:"aws,omitempty"`
35-
Static *StaticUpstream `json:"static,omitempty"`
34+
Aws *AwsUpstream `json:"aws,omitempty"`
35+
Static *StaticUpstream `json:"static,omitempty"`
36+
DynamicForwardProxy *DynamicForwardProxy `json:"dynamicForwardProxy,omitempty"`
3637
}
38+
39+
type DynamicForwardProxy struct {
40+
}
41+
3742
type AwsUpstream struct {
3843
Region string `json:"region,omitempty"`
3944
SecretRef corev1.LocalObjectReference `json:"secretRef,omitempty"`

internal/gateway2/extensions2/plugins/upstream/aws.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ func (p *plugin2) processBackendAws(
110110
) error {
111111

112112
functionName := dest.FunctionName
113-
if p.needFilter == nil {
114-
p.needFilter = make(map[string]bool)
113+
if p.needAwsFilter == nil {
114+
p.needAwsFilter = make(map[string]bool)
115115
}
116-
p.needFilter[pCtx.FilterChainName] = true
116+
p.needAwsFilter[pCtx.FilterChainName] = true
117117

118118
lambdaRouteFunc := &awspb.AWSLambdaPerRoute{
119119
Async: false,
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package upstream
2+
3+
import (
4+
"context"
5+
6+
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
7+
envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
8+
envoyclusters "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/dynamic_forward_proxy/v3"
9+
envoydfp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/dynamic_forward_proxy/v3"
10+
11+
"google.golang.org/protobuf/proto"
12+
"google.golang.org/protobuf/types/known/anypb"
13+
14+
"github.com/kgateway-dev/kgateway/api/v1alpha1"
15+
"github.com/kgateway-dev/kgateway/internal/gateway2/ir"
16+
)
17+
18+
func (p *plugin2) processDFPRoute(ctx context.Context, pCtx *ir.RouteContext, outputRoute *envoy_config_route_v3.Route) {
19+
20+
for _, b := range pCtx.In.Backends {
21+
if u := b.Backend.Upstream; u != nil {
22+
if ir, ok := u.ObjIr.(*UpstreamIr); ok {
23+
if ir.dfpFilterConfig != nil {
24+
filters := p.neededDfpFilter[pCtx.FilterChainName]
25+
if _, ok := filters[u.ClusterName()]; !ok {
26+
filters[u.ClusterName()] = ir.dfpFilterConfig
27+
p.neededDfpFilter[pCtx.FilterChainName] = filters
28+
}
29+
}
30+
}
31+
}
32+
}
33+
34+
dfpFilterConfig := &envoydfp.FilterConfig{
35+
ImplementationSpecifier: &envoydfp.FilterConfig_SubClusterConfig{
36+
SubClusterConfig: &envoydfp.SubClusterConfig{},
37+
},
38+
}
39+
var anyFilter anypb.Any
40+
err := anypb.MarshalFrom(&anyFilter, dfpFilterConfig, proto.MarshalOptions{Deterministic: true})
41+
// error should never happen here. panic?
42+
if err != nil {
43+
panic(err)
44+
}
45+
// TODO: make sure anyFilter makes it to the perfilter config of the route.
46+
}
47+
48+
func processDFPCluster(ctx context.Context, in *v1alpha1.DynamicForwardProxy, out *envoy_config_cluster_v3.Cluster) {
49+
50+
out.LbPolicy = envoy_config_cluster_v3.Cluster_CLUSTER_PROVIDED
51+
c := &envoyclusters.ClusterConfig{
52+
ClusterImplementationSpecifier: &envoyclusters.ClusterConfig_SubClustersConfig{
53+
SubClustersConfig: &envoyclusters.SubClustersConfig{
54+
LbPolicy: envoy_config_cluster_v3.Cluster_LEAST_REQUEST,
55+
},
56+
},
57+
}
58+
var anyCluster anypb.Any
59+
err := anypb.MarshalFrom(&anyCluster, c, proto.MarshalOptions{Deterministic: true})
60+
// error should never happen here. panic?
61+
if err != nil {
62+
panic(err)
63+
}
64+
65+
// the upstream has a DNS name. We need Envoy to resolve the DNS name
66+
// set the type to strict dns
67+
out.ClusterDiscoveryType = &envoy_config_cluster_v3.Cluster_ClusterType{
68+
ClusterType: &envoy_config_cluster_v3.Cluster_CustomClusterType{
69+
Name: "envoy.clusters.dynamic_forward_proxy",
70+
TypedConfig: &anyCluster,
71+
},
72+
}
73+
74+
}

internal/gateway2/extensions2/plugins/upstream/plugin.go

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"maps"
77
"time"
88

9+
"google.golang.org/protobuf/proto"
910
"k8s.io/apimachinery/pkg/runtime"
1011
"k8s.io/apimachinery/pkg/runtime/schema"
1112
"k8s.io/apimachinery/pkg/watch"
1213

1314
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
1415
envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
1516
envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
17+
envoydfp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/dynamic_forward_proxy/v3"
1618
awspb "github.com/solo-io/envoy-gloo/go/config/filter/http/aws_lambda/v2"
1719
skubeclient "istio.io/istio/pkg/config/schema/kubeclient"
1820
"istio.io/istio/pkg/kube/kclient"
@@ -62,7 +64,8 @@ func (d *upstreamDestination) Equals(in any) bool {
6264
}
6365

6466
type UpstreamIr struct {
65-
AwsSecret *ir.Secret
67+
AwsSecret *ir.Secret
68+
dfpFilterConfig *envoydfp.FilterConfig
6669
}
6770

6871
func (u *UpstreamIr) data() map[string][]byte {
@@ -77,13 +80,17 @@ func (u *UpstreamIr) Equals(other any) bool {
7780
if !ok {
7881
return false
7982
}
80-
return maps.EqualFunc(u.data(), otherUpstream.data(), func(a, b []byte) bool {
83+
if !maps.EqualFunc(u.data(), otherUpstream.data(), func(a, b []byte) bool {
8184
return bytes.Equal(a, b)
82-
})
85+
}) {
86+
return false
87+
}
88+
return proto.Equal(otherUpstream.dfpFilterConfig, u.dfpFilterConfig)
8389
}
8490

8591
type plugin2 struct {
86-
needFilter map[string]bool
92+
needAwsFilter map[string]bool
93+
neededDfpFilter map[string]map[string]*envoydfp.FilterConfig
8794
}
8895

8996
func registerTypes(ourCli versioned.Interface) {
@@ -171,6 +178,14 @@ func buildTranslateFunc(secrets *krtcollections.SecretIndex) func(krtctx krt.Han
171178
// return error
172179
}
173180
}
181+
if i.Spec.DynamicForwardProxy != nil {
182+
ir.dfpFilterConfig = &envoydfp.FilterConfig{
183+
ImplementationSpecifier: &envoydfp.FilterConfig_SubClusterConfig{
184+
SubClusterConfig: &envoydfp.SubClusterConfig{},
185+
},
186+
}
187+
}
188+
174189
return &ir
175190
}
176191
}
@@ -195,6 +210,8 @@ func processUpstream(ctx context.Context, in ir.Upstream, out *envoy_config_clus
195210
processStatic(ctx, spec.Static, out)
196211
case spec.Aws != nil:
197212
processAws(ctx, spec.Aws, ir, out)
213+
case spec.DynamicForwardProxy != nil:
214+
processDFPCluster(ctx, spec.DynamicForwardProxy, out)
198215
}
199216
}
200217

@@ -221,7 +238,10 @@ func processEndpoints(up *v1alpha1.Upstream) *ir.EndpointsForUpstream {
221238
}
222239

223240
func newPlug(ctx context.Context, tctx ir.GwTranslationCtx) ir.ProxyTranslationPass {
224-
return &plugin2{}
241+
return &plugin2{
242+
needAwsFilter: map[string]bool{},
243+
neededDfpFilter: map[string]map[string]*envoydfp.FilterConfig{},
244+
}
225245
}
226246

227247
func (p *plugin2) Name() string {
@@ -237,7 +257,7 @@ func (p *plugin2) ApplyVhostPlugin(ctx context.Context, pCtx *ir.VirtualHostCont
237257

238258
// called 0 or more times
239259
func (p *plugin2) ApplyForRoute(ctx context.Context, pCtx *ir.RouteContext, outputRoute *envoy_config_route_v3.Route) error {
240-
260+
p.processDFPRoute(ctx, pCtx, outputRoute)
241261
return nil
242262
}
243263

@@ -257,16 +277,20 @@ func (p *plugin2) ApplyForRouteBackend(
257277
// if a plugin emits new filters, they must be with a plugin unique name.
258278
// any filter returned from route config must be disabled, so it doesnt impact other routes.
259279
func (p *plugin2) HttpFilters(ctx context.Context, fc ir.FilterChainCommon) ([]plugins.StagedHttpFilter, error) {
260-
if !p.needFilter[fc.FilterChainName] {
261-
return nil, nil
280+
var ret []plugins.StagedHttpFilter
281+
if p.needAwsFilter[fc.FilterChainName] {
282+
filterConfig := &awspb.AWSLambdaConfig{}
283+
pluginStage := plugins.DuringStage(plugins.OutAuthStage)
284+
f, _ := plugins.NewStagedFilter(FilterName, filterConfig, pluginStage)
285+
ret = append(ret, f)
286+
}
287+
for clstrName, filterConfig := range p.neededDfpFilter[fc.FilterChainName] {
288+
pluginStage := plugins.DuringStage(plugins.OutAuthStage)
289+
f, _ := plugins.NewStagedFilter(clstrName, filterConfig, pluginStage)
290+
ret = append(ret, f)
262291
}
263-
filterConfig := &awspb.AWSLambdaConfig{}
264-
pluginStage := plugins.DuringStage(plugins.OutAuthStage)
265-
f, _ := plugins.NewStagedFilter(FilterName, filterConfig, pluginStage)
266292

267-
return []plugins.StagedHttpFilter{
268-
f,
269-
}, nil
293+
return ret, nil
270294
}
271295

272296
func (p *plugin2) UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) {

internal/gateway2/ir/iface.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ func (r *RouteBackendContext) AddTypedConfig(key string, v *anypb.Any) {
3535
}
3636

3737
type RouteContext struct {
38-
Policy PolicyIR
39-
In HttpRouteRuleMatchIR
38+
FilterChainName string
39+
Policy PolicyIR
40+
In HttpRouteRuleMatchIR
4041
}
4142

4243
type ProxyTranslationPass interface {

internal/gateway2/translator/irtranslator/route.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,9 @@ func (h *httpRouteConfigurationTranslator) runRoutePlugins(ctx context.Context,
215215
}
216216
for _, pol := range pols {
217217
pctx := &ir.RouteContext{
218-
Policy: pol.PolicyIr,
219-
In: in,
218+
FilterChainName: h.fc.FilterChainName,
219+
Policy: pol.PolicyIr,
220+
In: in,
220221
}
221222
err := pass.ApplyForRoute(ctx, pctx, out)
222223
if err != nil {

0 commit comments

Comments
 (0)