Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/kube-gateway/app/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ func buildProxyHandlerChainFunc(o *proxyHandlerOptions) func(apiHandler http.Han
// rate and throughput monitor
throughputMonitor := monitor.NewThroughputMonitor()
rateMonitor := monitor.NewRateMonitor()
handler = gatewayfilters.WithRequestThroughput(handler, throughputMonitor)
handler = gatewayfilters.WithRequestReaderWriterWrapper(handler, throughputMonitor)
handler = gatewayfilters.WithRequestRate(handler, c.LongRunningFunc, rateMonitor)

handler = gatewayfilters.WithPreProcessingMetrics(handler)
handler = gatewayfilters.WithTraceLog(handler, o.enableProxyTracing, c.LongRunningFunc)
handler = gatewayfilters.WithUpstreamInfo(handler, o.clusterManager, c.Serializer)
handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{}, c.Serializer)
handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{LongRunningFunc: c.LongRunningFunc}, c.Serializer)
handler = gatewayfilters.WithTerminationMetrics(handler)
handler = gatewayfilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && o.goawayChance > 0 {
Expand Down
21 changes: 15 additions & 6 deletions pkg/flowcontrols/remote/global_flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ var (
GlobalMaxInflightBatchAcquireMin = int32(1)
)

const (
waitAcquireTimeout = time.Millisecond * 300
var (
waitAcquireTimeout = time.Millisecond * 300
batchAcquireMaxDuration = time.Millisecond * 100
)

func init() {
Expand Down Expand Up @@ -319,7 +320,7 @@ func (m *tokenBucketWrapper) ExpectToken() int32 {

batch := m.tokenBatch
lastQPS := m.meter.Rate()
if lastQPS > float64(m.tokenBatch) {
if lastQPS > float64(m.reserve) {
batch = int32(lastQPS) * GlobalTokenBucketBatchAcquiredPercent / 100
if batch < GlobalTokenBucketBatchAcquireMin {
batch = GlobalTokenBucketBatchAcquireMin
Expand All @@ -331,7 +332,15 @@ func (m *tokenBucketWrapper) ExpectToken() int32 {
if expect < 0 {
expect = 0
}
if expect > batch {

minBatch := m.tokenBatch
if expect < minBatch {
// skip acquiring until expect more than min batch
acquireTime := atomic.LoadInt64(&m.lastAcquireTime)
if time.Now().UnixNano()-acquireTime < int64(batchAcquireMaxDuration) {
expect = 0
}
} else if expect > batch {
expect = batch
}

Expand Down Expand Up @@ -372,8 +381,8 @@ func (m *tokenBucketWrapper) SetLimit(acquireResult *AcquireResult) bool {
if lastQPS < float64(localQPS) {
lastQPS = float64(localQPS)
}
klog.V(2).Infof("[global tokenBucket] cluster=%q resize flowcontrol=%s qps=%v for error: %v",
m.fcc.cluster, m.fcc.name, lastQPS, result.Error)
klog.V(2).Infof("[global tokenBucket] cluster=%q resize flowcontrol=%s qps=%v requestID=%v for error: %v",
m.fcc.cluster, m.fcc.name, lastQPS, acquireResult.requestTime, result.Error)

m.FlowControl.Resize(uint32(lastQPS), uint32(lastQPS))
atomic.StoreUint32(&m.serverUnavailable, 1)
Expand Down
23 changes: 19 additions & 4 deletions pkg/flowcontrols/remote/remote_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,26 @@ import (
)

const (
MaxIdealDuration = time.Millisecond * 900
MaxLimitRequestQPS = 200
LimitRequestTimeout = time.Millisecond * 290
MaxIdealDuration = time.Millisecond * 900
MaxLimitRequestQPS = 100
)

var (
LimitRequestTimeout = time.Millisecond * 500
)

func init() {
if val := os.Getenv("LIMIT_ACQUIRE_TIMEOUT"); len(val) > 0 {
duration, err := time.ParseDuration(val)
if err != nil {
klog.Warningf("Illegal LIMIT_ACQUIRE_TIMEOUT(%q): %v."+
" Default value %d is used", val, err, LimitRequestTimeout)
} else {
LimitRequestTimeout = duration
}
}
}

func NewGlobalCounterProvider(ctx context.Context, cluster string, clientSets clientsets.ClientSets, clientID string) GlobalCounterProvider {
return &globalCounterManager{
ctx: ctx,
Expand Down Expand Up @@ -195,7 +210,7 @@ func (g *globalCounterManager) doAcquire() {

acquireResult, err := client.ProxyV1alpha1().RateLimitConditions().Acquire(ctx, g.cluster, acquireRequest, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Do acquire request error: %v", err)
klog.Errorf("Do acquire request for cluster=%v requestID=%v error: %v", g.cluster, requestTime, err)
result = requestReasonRateLimiterError
if os.IsTimeout(err) {
result = requestReasonTimeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/responsewriter"

"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/monitor"
"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/request"
"github.com/kubewharf/kubegateway/pkg/gateway/metrics"
)

Expand All @@ -38,8 +39,8 @@ const (

var startThroughputMetricOnce sync.Once

// WithRequestThroughput record request input and output throughput
func WithRequestThroughput(
// WithRequestReaderWriterWrapper record request input and output throughput
func WithRequestReaderWriterWrapper(
handler http.Handler,
throughputMonitor *monitor.ThroughputMonitor,
) http.Handler {
Expand All @@ -52,78 +53,138 @@ func WithRequestThroughput(
})

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
delegate := &throughputResponseWriter{
delegate := &responseWriterWrapper{
ResponseWriter: w,
throughputMonitor: throughputMonitor,
}
rw := responsewriter.WrapForHTTP1Or2(delegate)

rd := &throughputRequestReader{
rd := &requestReaderWrapper{
ReadCloser: req.Body,
throughputMonitor: throughputMonitor,
}
req.Body = rd

info, ok := request.ExtraRequestInfoFrom(req.Context())
if !ok {
handler.ServeHTTP(w, req)
return
}
info.ReaderWriter = &readerWriter{
requestReaderWrapper: rd,
responseWriterWrapper: delegate,
}
req = req.WithContext(request.WithExtraRequestInfo(req.Context(), info))
handler.ServeHTTP(rw, req)
})
}

var _ io.ReadCloser = &throughputRequestReader{}
var _ request.RequestReaderWriterWrapper = &readerWriter{}

type readerWriter struct {
*requestReaderWrapper
*responseWriterWrapper
}

var _ io.ReadCloser = &requestReaderWrapper{}

type throughputRequestReader struct {
type requestReaderWrapper struct {
io.ReadCloser
read int
throughputMonitor *monitor.ThroughputMonitor
}

// Write implements io.ReadCloser
func (r *throughputRequestReader) Read(p []byte) (int, error) {
func (r *requestReaderWrapper) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
r.read += n

r.throughputMonitor.RecordRequest(int64(n))
return n, err
}

func (r *throughputRequestReader) RequestSize() int {
func (r *requestReaderWrapper) RequestSize() int {
return r.read
}

var _ http.ResponseWriter = &throughputResponseWriter{}
var _ responsewriter.UserProvidedDecorator = &throughputResponseWriter{}
var _ http.ResponseWriter = &responseWriterWrapper{}
var _ responsewriter.UserProvidedDecorator = &responseWriterWrapper{}

type responseWriterWrapper struct {
statusRecorded bool
status int
addedInfo string
startTime time.Time
captureErrorOutput bool
written int64
read int

type throughputResponseWriter struct {
http.ResponseWriter
written int64
throughputMonitor *monitor.ThroughputMonitor
}

func (w *throughputResponseWriter) Unwrap() http.ResponseWriter {
func (w *responseWriterWrapper) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}

// WriteHeader implements http.ResponseWriter.
func (w *responseWriterWrapper) WriteHeader(status int) {
w.recordStatus(status)
w.ResponseWriter.WriteHeader(status)
}

func (w *responseWriterWrapper) recordStatus(status int) {
w.status = status
w.statusRecorded = true
w.captureErrorOutput = captureErrorOutput(status)
}

func captureErrorOutput(code int) bool {
return code >= http.StatusInternalServerError
}

// Write implements http.ResponseWriter.
func (w *throughputResponseWriter) Write(b []byte) (int, error) {
func (w *responseWriterWrapper) Write(b []byte) (int, error) {
if !w.statusRecorded {
w.recordStatus(http.StatusOK) // Default if WriteHeader hasn't been called
}
if w.captureErrorOutput {
w.debugf("logging error output: %q\n", string(b))
}

n, err := w.ResponseWriter.Write(b)
w.written += int64(n)

w.throughputMonitor.RecordResponse(int64(n))
return n, err
}

// debugf adds additional data to be logged with this request.
func (w *responseWriterWrapper) debugf(format string, data ...interface{}) {
w.addedInfo += "\n" + fmt.Sprintf(format, data...)
}

// Hijack implements http.Hijacker. If the underlying ResponseWriter is a
// Hijacker, its Hijack method is returned. Otherwise an error is returned.
func (w *throughputResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
func (w *responseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if hj, ok := w.ResponseWriter.(http.Hijacker); ok {
return hj.Hijack()
}
return nil, nil, fmt.Errorf("http.Hijacker interface is not supported")
}

func (w *throughputResponseWriter) ResponseSize() int {
func (w *responseWriterWrapper) ResponseSize() int {
return int(w.written)
}

func (w *responseWriterWrapper) Status() int {
return w.status
}

func (w *responseWriterWrapper) AddedInfo() string {
return w.addedInfo
}

func startRecordingThroughputMetric(throughputMonitor *monitor.ThroughputMonitor) {
go func() {
wait.Forever(func() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/gateway/endpoints/filters/termination.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (rw *terminationMetricsWriter) recordMetrics(req *http.Request) {
// use status text when reason is empty
reason = stringy.New(http.StatusText(rw.status)).SnakeCase().ToLower()
}
metrics.RecordProxyRequestTermination(req, rw.status, reason, proxyInfo.FlowControl)

metrics.RecordProxyRequestTermination(req, rw.status, reason, proxyInfo.FlowControl, proxyInfo.User)
}
}
20 changes: 17 additions & 3 deletions pkg/gateway/endpoints/filters/tracelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog"
Expand Down Expand Up @@ -61,11 +62,16 @@ func WithTraceLog(handler http.Handler, enableTracing bool, longRunningRequestCh
defer func() {
tr.End()

metrics.RecordProxyTraceLatency(tr.StageLatency(), extraInfo.Hostname, requestInfo)
proxyInfo, ok := request.ExtraProxyInfoFrom(req.Context())
var userInfo user.Info
if ok {
userInfo = proxyInfo.User
}
metrics.RecordProxyTraceLatency(tr.StageLatency(), extraInfo.Hostname, requestInfo, userInfo, req)

threshold := request.LogThreshold(requestInfo.Verb)
if req.Header.Get("x-debug-trace-log") == "1" || tr.IfLong(threshold) {
tr.WithAttributes(traceFields(req, requestInfo)...)
tr.WithAttributes(traceFields(req, requestInfo, userInfo)...)
tr.Log()
}
}()
Expand All @@ -80,13 +86,21 @@ func WithTraceLog(handler http.Handler, enableTracing bool, longRunningRequestCh
})
}

func traceFields(req *http.Request, requestInfo *apirequest.RequestInfo) []tracing.KeyValue {
func traceFields(req *http.Request, requestInfo *apirequest.RequestInfo, user user.Info) []tracing.KeyValue {
sourceIPs := utilnet.SourceIPs(req)
userName := "anonymous"
userGroup := []string{"anonymous"}
if user != nil {
userName = user.GetName()
userGroup = user.GetGroups()
}
return []tracing.KeyValue{
tracing.StringKeyValue("verb", requestInfo.Verb),
tracing.StringKeyValue("resource", requestInfo.Resource),
tracing.StringKeyValue("name", requestInfo.Name),
tracing.StringKeyValue("host", req.Host),
tracing.StringKeyValue("user-name", userName),
tracing.StringKeyValue("user-group", fmt.Sprintf("%v", userGroup)),
tracing.StringKeyValue("user-agent", req.Header.Get("User-Agent")),
tracing.StringKeyValue("srcIP", fmt.Sprintf("%v", sourceIPs)),
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/gateway/endpoints/request/proxyInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package request
import (
"context"
"fmt"

"k8s.io/apiserver/pkg/authentication/user"
)

// ProxyInfo contains information that indicates if the request is proxied
Expand All @@ -25,6 +27,7 @@ type ProxyInfo struct {
Endpoint string
Reason string
FlowControl string
User user.Info
}

func NewProxyInfo() *ProxyInfo {
Expand All @@ -44,12 +47,13 @@ func ExtraProxyInfoFrom(ctx context.Context) (*ProxyInfo, bool) {
return info, ok
}

func SetFlowControl(ctx context.Context, flowControl string) error {
func SetProxyInfo(ctx context.Context, flowControl string, user user.Info) error {
info, ok := ExtraProxyInfoFrom(ctx)
if !ok {
return fmt.Errorf("no proxy info found in context")
}
info.FlowControl = flowControl
info.User = user
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/gateway/endpoints/request/readerwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package request

type RequestReaderWriterWrapper interface {
RequestSize() int
ResponseSize() int
Status() int
AddedInfo() string
}
Loading