diff --git a/cmd/kube-gateway/app/proxy.go b/cmd/kube-gateway/app/proxy.go
index 21efb7e..d103c9f 100644
--- a/cmd/kube-gateway/app/proxy.go
+++ b/cmd/kube-gateway/app/proxy.go
@@ -157,13 +157,13 @@ func buildProxyHandlerChainFunc(o *proxyHandlerOptions) func(apiHandler http.Han
     // rate and throughput monitor
     throughputMonitor := monitor.NewThroughputMonitor()
     rateMonitor := monitor.NewRateMonitor()
-    handler = gatewayfilters.WithRequestThroughput(handler, throughputMonitor)
+    handler = gatewayfilters.WithRequestReaderWriterWrapper(handler, throughputMonitor)
     handler = gatewayfilters.WithRequestRate(handler, c.LongRunningFunc, rateMonitor)
     handler = gatewayfilters.WithPreProcessingMetrics(handler)
     handler = gatewayfilters.WithTraceLog(handler, o.enableProxyTracing, c.LongRunningFunc)
     handler = gatewayfilters.WithUpstreamInfo(handler, o.clusterManager, c.Serializer)
-    handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{}, c.Serializer)
+    handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{LongRunningFunc: c.LongRunningFunc}, c.Serializer)
     handler = gatewayfilters.WithTerminationMetrics(handler)
     handler = gatewayfilters.WithRequestInfo(handler, c.RequestInfoResolver)
     if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && o.goawayChance > 0 {
diff --git a/pkg/flowcontrols/remote/global_flowcontrol.go b/pkg/flowcontrols/remote/global_flowcontrol.go
index c46d1f5..dc96b58 100644
--- a/pkg/flowcontrols/remote/global_flowcontrol.go
+++ b/pkg/flowcontrols/remote/global_flowcontrol.go
@@ -29,8 +29,9 @@ var (
     GlobalMaxInflightBatchAcquireMin = int32(1)
 )
-const (
-    waitAcquireTimeout = time.Millisecond * 300
+var (
+    waitAcquireTimeout      = time.Millisecond * 300
+    batchAcquireMaxDuration = time.Millisecond * 100
 )
 func init() {
@@ -319,7 +320,7 @@ func (m *tokenBucketWrapper) ExpectToken() int32 {
     batch := m.tokenBatch
     lastQPS := m.meter.Rate()
-    if lastQPS > float64(m.tokenBatch) {
+    if lastQPS > float64(m.reserve) {
         batch = int32(lastQPS) * GlobalTokenBucketBatchAcquiredPercent / 100
         if batch < GlobalTokenBucketBatchAcquireMin {
             batch = GlobalTokenBucketBatchAcquireMin
@@ -331,7 +332,15 @@ func (m *tokenBucketWrapper) ExpectToken() int32 {
     if expect < 0 {
         expect = 0
     }
-    if expect > batch {
+
+    minBatch := m.tokenBatch
+    if expect < minBatch {
+        // skip acquiring until expect is more than the min batch
+        acquireTime := atomic.LoadInt64(&m.lastAcquireTime)
+        if time.Now().UnixNano()-acquireTime < int64(batchAcquireMaxDuration) {
+            expect = 0
+        }
+    } else if expect > batch {
         expect = batch
     }
@@ -372,8 +381,8 @@ func (m *tokenBucketWrapper) SetLimit(acquireResult *AcquireResult) bool {
         if lastQPS < float64(localQPS) {
             lastQPS = float64(localQPS)
         }
-        klog.V(2).Infof("[global tokenBucket] cluster=%q resize flowcontrol=%s qps=%v for error: %v",
-            m.fcc.cluster, m.fcc.name, lastQPS, result.Error)
+        klog.V(2).Infof("[global tokenBucket] cluster=%q resize flowcontrol=%s qps=%v requestID=%v for error: %v",
+            m.fcc.cluster, m.fcc.name, lastQPS, acquireResult.requestTime, result.Error)
         m.FlowControl.Resize(uint32(lastQPS), uint32(lastQPS))
         atomic.StoreUint32(&m.serverUnavailable, 1)
diff --git a/pkg/flowcontrols/remote/remote_counter.go b/pkg/flowcontrols/remote/remote_counter.go
index 1a5a7da..0e29dc2 100644
--- a/pkg/flowcontrols/remote/remote_counter.go
+++ b/pkg/flowcontrols/remote/remote_counter.go
@@ -17,11 +17,26 @@ import (
 )
 const (
-    MaxIdealDuration    = time.Millisecond * 900
-    MaxLimitRequestQPS  = 200
-    LimitRequestTimeout = time.Millisecond * 290
+    MaxIdealDuration   = time.Millisecond * 900
+    MaxLimitRequestQPS = 100
 )
+var (
+    LimitRequestTimeout = time.Millisecond * 500
+)
+
+func init() {
+    if val := os.Getenv("LIMIT_ACQUIRE_TIMEOUT"); len(val) > 0 {
+        duration, err := time.ParseDuration(val)
+        if err != nil {
+            klog.Warningf("Illegal LIMIT_ACQUIRE_TIMEOUT(%q): %v."+
+                " Default value %d is used", val, err, LimitRequestTimeout)
+        } else {
+            LimitRequestTimeout = duration
+        }
+    }
+}
+
 func NewGlobalCounterProvider(ctx context.Context, cluster string, clientSets clientsets.ClientSets, clientID string) GlobalCounterProvider {
     return &globalCounterManager{
         ctx: ctx,
@@ -195,7 +210,7 @@ func (g *globalCounterManager) doAcquire() {
     acquireResult, err := client.ProxyV1alpha1().RateLimitConditions().Acquire(ctx, g.cluster, acquireRequest, metav1.CreateOptions{})
     if err != nil {
-        klog.Errorf("Do acquire request error: %v", err)
+        klog.Errorf("Do acquire request for cluster=%v requestID=%v error: %v", g.cluster, requestTime, err)
         result = requestReasonRateLimiterError
         if os.IsTimeout(err) {
             result = requestReasonTimeout
diff --git a/pkg/gateway/endpoints/filters/requestthroughput.go b/pkg/gateway/endpoints/filters/readerwriter.go
similarity index 52%
rename from pkg/gateway/endpoints/filters/requestthroughput.go
rename to pkg/gateway/endpoints/filters/readerwriter.go
index a32b3d0..74263bd 100644
--- a/pkg/gateway/endpoints/filters/requestthroughput.go
+++ b/pkg/gateway/endpoints/filters/readerwriter.go
@@ -29,6 +29,7 @@ import (
     "k8s.io/apiserver/pkg/endpoints/responsewriter"
     "github.com/kubewharf/kubegateway/pkg/gateway/endpoints/monitor"
+    "github.com/kubewharf/kubegateway/pkg/gateway/endpoints/request"
     "github.com/kubewharf/kubegateway/pkg/gateway/metrics"
 )
@@ -38,8 +39,8 @@ const (
 var startThroughputMetricOnce sync.Once
-// WithRequestThroughput record request input and output throughput
-func WithRequestThroughput(
+// WithRequestReaderWriterWrapper record request input and output throughput
+func WithRequestReaderWriterWrapper(
     handler http.Handler,
     throughputMonitor *monitor.ThroughputMonitor,
 ) http.Handler {
@@ -52,32 +53,49 @@ func WithRequestThroughput(
     })
     return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-        delegate := &throughputResponseWriter{
+        delegate := &responseWriterWrapper{
            ResponseWriter:    w,
            throughputMonitor: throughputMonitor,
         }
         rw := responsewriter.WrapForHTTP1Or2(delegate)
-        rd := &throughputRequestReader{
+        rd := &requestReaderWrapper{
            ReadCloser:        req.Body,
            throughputMonitor: throughputMonitor,
         }
         req.Body = rd
+        info, ok := request.ExtraRequestInfoFrom(req.Context())
+        if !ok {
+            handler.ServeHTTP(w, req)
+            return
+        }
+        info.ReaderWriter = &readerWriter{
+            requestReaderWrapper:  rd,
+            responseWriterWrapper: delegate,
+        }
+        req = req.WithContext(request.WithExtraRequestInfo(req.Context(), info))
         handler.ServeHTTP(rw, req)
     })
 }
-var _ io.ReadCloser = &throughputRequestReader{}
+var _ request.RequestReaderWriterWrapper = &readerWriter{}
+
+type readerWriter struct {
+    *requestReaderWrapper
+    *responseWriterWrapper
+}
+
+var _ io.ReadCloser = &requestReaderWrapper{}
-type throughputRequestReader struct {
+type requestReaderWrapper struct {
     io.ReadCloser
     read              int
     throughputMonitor *monitor.ThroughputMonitor
 }
 // Write implements io.ReadCloser
-func (r *throughputRequestReader) Read(p []byte) (int, error) {
+func (r *requestReaderWrapper) Read(p []byte) (int, error) {
     n, err := r.ReadCloser.Read(p)
     r.read += n
@@ -85,25 +103,55 @@ func (w *throughputResponseWriter) Write(b []byte) (int, error) {
     return n, err
 }
-func (r *throughputRequestReader) RequestSize() int {
+func (r *requestReaderWrapper) RequestSize() int {
     return r.read
 }
-var _ http.ResponseWriter = &throughputResponseWriter{}
-var _ responsewriter.UserProvidedDecorator = &throughputResponseWriter{}
+var _ http.ResponseWriter = &responseWriterWrapper{}
+var _ responsewriter.UserProvidedDecorator = &responseWriterWrapper{}
+
+type responseWriterWrapper struct {
+    statusRecorded     bool
+    status             int
+    addedInfo          string
+    startTime          time.Time
+    captureErrorOutput bool
+    written            int64
+    read               int
-type throughputResponseWriter struct {
     http.ResponseWriter
-    written int64
     throughputMonitor *monitor.ThroughputMonitor
 }
-func (w *throughputResponseWriter) Unwrap() http.ResponseWriter {
+func (w *responseWriterWrapper) Unwrap() http.ResponseWriter {
     return w.ResponseWriter
 }
+// WriteHeader implements http.ResponseWriter.
+func (w *responseWriterWrapper) WriteHeader(status int) {
+    w.recordStatus(status)
+    w.ResponseWriter.WriteHeader(status)
+}
+
+func (w *responseWriterWrapper) recordStatus(status int) {
+    w.status = status
+    w.statusRecorded = true
+    w.captureErrorOutput = captureErrorOutput(status)
+}
+
+func captureErrorOutput(code int) bool {
+    return code >= http.StatusInternalServerError
+}
+
 // Write implements http.ResponseWriter.
-func (w *throughputResponseWriter) Write(b []byte) (int, error) {
+func (w *responseWriterWrapper) Write(b []byte) (int, error) {
+    if !w.statusRecorded {
+        w.recordStatus(http.StatusOK) // Default if WriteHeader hasn't been called
+    }
+    if w.captureErrorOutput {
+        w.debugf("logging error output: %q\n", string(b))
+    }
+
     n, err := w.ResponseWriter.Write(b)
     w.written += int64(n)
@@ -111,19 +159,32 @@ func (w *throughputResponseWriter) Write(b []byte) (int, error) {
     return n, err
 }
+// debugf adds additional data to be logged with this request.
+func (w *responseWriterWrapper) debugf(format string, data ...interface{}) {
+    w.addedInfo += "\n" + fmt.Sprintf(format, data...)
+}
+
 // Hijack implements http.Hijacker. If the underlying ResponseWriter is a
 // Hijacker, its Hijack method is returned. Otherwise an error is returned.
-func (w *throughputResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
+func (w *responseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
     if hj, ok := w.ResponseWriter.(http.Hijacker); ok {
         return hj.Hijack()
     }
     return nil, nil, fmt.Errorf("http.Hijacker interface is not supported")
 }
-func (w *throughputResponseWriter) ResponseSize() int {
+func (w *responseWriterWrapper) ResponseSize() int {
     return int(w.written)
 }
+func (w *responseWriterWrapper) Status() int {
+    return w.status
+}
+
+func (w *responseWriterWrapper) AddedInfo() string {
+    return w.addedInfo
+}
+
 func startRecordingThroughputMetric(throughputMonitor *monitor.ThroughputMonitor) {
     go func() {
         wait.Forever(func() {
diff --git a/pkg/gateway/endpoints/filters/termination.go b/pkg/gateway/endpoints/filters/termination.go
index 8a961f2..32e7814 100644
--- a/pkg/gateway/endpoints/filters/termination.go
+++ b/pkg/gateway/endpoints/filters/termination.go
@@ -82,6 +82,7 @@ func (rw *terminationMetricsWriter) recordMetrics(req *http.Request) {
             // use status text when reason is empty
             reason = stringy.New(http.StatusText(rw.status)).SnakeCase().ToLower()
         }
-        metrics.RecordProxyRequestTermination(req, rw.status, reason, proxyInfo.FlowControl)
+
+        metrics.RecordProxyRequestTermination(req, rw.status, reason, proxyInfo.FlowControl, proxyInfo.User)
     }
 }
diff --git a/pkg/gateway/endpoints/filters/tracelog.go b/pkg/gateway/endpoints/filters/tracelog.go
index d36d8c6..be1b103 100644
--- a/pkg/gateway/endpoints/filters/tracelog.go
+++ b/pkg/gateway/endpoints/filters/tracelog.go
@@ -7,6 +7,7 @@ import (
     "net/http"
     utilnet "k8s.io/apimachinery/pkg/util/net"
+    "k8s.io/apiserver/pkg/authentication/user"
     "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
     apirequest "k8s.io/apiserver/pkg/endpoints/request"
     "k8s.io/klog"
@@ -61,11 +62,16 @@ func WithTraceLog(handler http.Handler, enableTracing bool, longRunningRequestCh
         defer func() {
             tr.End()
-            metrics.RecordProxyTraceLatency(tr.StageLatency(), extraInfo.Hostname, requestInfo)
+            proxyInfo, ok := request.ExtraProxyInfoFrom(req.Context())
+            var userInfo user.Info
+            if ok {
+                userInfo = proxyInfo.User
+            }
+            metrics.RecordProxyTraceLatency(tr.StageLatency(), extraInfo.Hostname, requestInfo, userInfo, req)
             threshold := request.LogThreshold(requestInfo.Verb)
             if req.Header.Get("x-debug-trace-log") == "1" || tr.IfLong(threshold) {
-                tr.WithAttributes(traceFields(req, requestInfo)...)
+                tr.WithAttributes(traceFields(req, requestInfo, userInfo)...)
                tr.Log()
            }
        }()
@@ -80,13 +86,21 @@ func WithTraceLog(handler http.Handler, enableTracing bool, longRunningRequestCh
     })
 }
-func traceFields(req *http.Request, requestInfo *apirequest.RequestInfo) []tracing.KeyValue {
+func traceFields(req *http.Request, requestInfo *apirequest.RequestInfo, user user.Info) []tracing.KeyValue {
     sourceIPs := utilnet.SourceIPs(req)
+    userName := "anonymous"
+    userGroup := []string{"anonymous"}
+    if user != nil {
+        userName = user.GetName()
+        userGroup = user.GetGroups()
+    }
     return []tracing.KeyValue{
         tracing.StringKeyValue("verb", requestInfo.Verb),
         tracing.StringKeyValue("resource", requestInfo.Resource),
         tracing.StringKeyValue("name", requestInfo.Name),
         tracing.StringKeyValue("host", req.Host),
+        tracing.StringKeyValue("user-name", userName),
+        tracing.StringKeyValue("user-group", fmt.Sprintf("%v", userGroup)),
         tracing.StringKeyValue("user-agent", req.Header.Get("User-Agent")),
         tracing.StringKeyValue("srcIP", fmt.Sprintf("%v", sourceIPs)),
     }
diff --git a/pkg/gateway/endpoints/request/proxyInfo.go b/pkg/gateway/endpoints/request/proxyInfo.go
index 897c20d..f4143ae 100644
--- a/pkg/gateway/endpoints/request/proxyInfo.go
+++ b/pkg/gateway/endpoints/request/proxyInfo.go
@@ -17,6 +17,8 @@ package request
 import (
     "context"
     "fmt"
+
+    "k8s.io/apiserver/pkg/authentication/user"
 )
 // ProxyInfo contains information that indicates if the request is proxied
@@ -25,6 +27,7 @@ type ProxyInfo struct {
     Endpoint    string
     Reason      string
     FlowControl string
+    User        user.Info
 }
 func NewProxyInfo() *ProxyInfo {
@@ -44,12 +47,13 @@ func ExtraProxyInfoFrom(ctx context.Context) (*ProxyInfo, bool) {
     return info, ok
 }
-func SetFlowControl(ctx context.Context, flowControl string) error {
+func SetProxyInfo(ctx context.Context, flowControl string, user user.Info) error {
     info, ok := ExtraProxyInfoFrom(ctx)
     if !ok {
         return fmt.Errorf("no proxy info found in context")
     }
     info.FlowControl = flowControl
+    info.User = user
     return nil
 }
diff --git a/pkg/gateway/endpoints/request/readerwriter.go b/pkg/gateway/endpoints/request/readerwriter.go
new file mode 100644
index 0000000..a669cbc
--- /dev/null
+++ b/pkg/gateway/endpoints/request/readerwriter.go
@@ -0,0 +1,8 @@
+package request
+
+type RequestReaderWriterWrapper interface {
+    RequestSize() int
+    ResponseSize() int
+    Status() int
+    AddedInfo() string
+}
diff --git a/pkg/gateway/endpoints/request/requestinfo.go b/pkg/gateway/endpoints/request/requestinfo.go
index 330f2dd..694c560 100644
--- a/pkg/gateway/endpoints/request/requestinfo.go
+++ b/pkg/gateway/endpoints/request/requestinfo.go
@@ -16,10 +16,12 @@ package request
 import (
     "context"
+    "errors"
     "net/http"
     authenticationv1 "k8s.io/api/authentication/v1"
     "k8s.io/apiserver/pkg/authentication/user"
+    apirequest "k8s.io/apiserver/pkg/endpoints/request"
     "github.com/kubewharf/kubegateway/pkg/clusters"
     "github.com/kubewharf/kubegateway/pkg/gateway/net"
@@ -39,9 +41,19 @@ type ExtraRequestInfoResolver interface {
     NewExtraRequestInfo(req *http.Request) (*ExtraRequestInfo, error)
 }
-type ExtraRequestInfoFactory struct{}
+type ExtraRequestInfoFactory struct {
+    LongRunningFunc apirequest.LongRunningRequestCheck
+}
 func (f *ExtraRequestInfoFactory) NewExtraRequestInfo(req *http.Request) (*ExtraRequestInfo, error) {
+    ctx := req.Context()
+    requestInfo, ok := apirequest.RequestInfoFrom(ctx)
+    if !ok {
+        return nil, errors.New("no RequestInfo found in the context")
+    }
+
+    isLongRunning := f.LongRunningFunc(req, requestInfo)
+
     isImpersonate := len(req.Header.Get(authenticationv1.ImpersonateUserHeader)) > 0
     hostname := net.HostWithoutPort(req.Host)
@@ -49,6 +61,7 @@ func (f *ExtraRequestInfoFactory) NewExtraRequestInfo(req *http.Request) (*Extra
         Scheme:               req.URL.Scheme,
         Hostname:             hostname,
         IsImpersonateRequest: isImpersonate,
+        IsLongRunningRequest: isLongRunning,
     }, nil
 }
@@ -58,7 +71,9 @@ type ExtraRequestInfo struct {
     IsImpersonateRequest bool
     Impersonator         user.Info
     UpstreamCluster      *clusters.ClusterInfo
+    ReaderWriter         RequestReaderWriterWrapper
     IsProxyRequest       bool
+    IsLongRunningRequest bool
 }
 // WithExtraRequestInfo returns a copy of parent in which the ExtraRequestInfo value is set
diff --git a/pkg/gateway/metrics/metrics.go b/pkg/gateway/metrics/metrics.go
index efbe1ac..e5865d9 100644
--- a/pkg/gateway/metrics/metrics.go
+++ b/pkg/gateway/metrics/metrics.go
@@ -22,6 +22,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     utilsets "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apiserver/pkg/authentication/user"
     "k8s.io/apiserver/pkg/endpoints/request"
     genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
     "k8s.io/apiserver/pkg/features"
@@ -88,7 +89,15 @@ func RecordProxyRequestReceived(req *http.Request, serverName string, requestInf
 // MonitorProxyRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
 // a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
-func MonitorProxyRequest(req *http.Request, serverName, endpoint, flowControl string, requestInfo *request.RequestInfo, contentType string, httpCode, respSize int, elapsed time.Duration) {
+func MonitorProxyRequest(req *http.Request, serverName, endpoint, flowControl string,
+    requestInfo *request.RequestInfo,
+    user user.Info,
+    isLongRunning bool,
+    contentType string,
+    httpCode,
+    reqSize,
+    respSize int,
+    elapsed time.Duration) {
     if requestInfo == nil {
         requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
     }
@@ -104,43 +113,40 @@ func MonitorProxyRequest(req *http.Request, serverName, endpoint, flowControl st
         }
     }
-    ProxyRequestCounterObservers.Observe(MetricInfo{
-        ServerName:  serverName,
-        Endpoint:    endpoint,
-        FlowControl: flowControl,
-        Verb:        verb,
-        Resource:    resource,
-        HttpCode:    codeToString(httpCode),
-        Latency:     elapsedSeconds,
-        Request:     req,
-    })
-    ProxyRequestLatenciesObservers.Observe(MetricInfo{
-        ServerName:  serverName,
-        Endpoint:    endpoint,
-        FlowControl: flowControl,
-        Verb:        verb,
-        Resource:    resource,
-        Latency:     elapsedSeconds,
-        Request:     req,
-    })
+    userName := cleanUserForMetric(user)
+
+    metricInfo := MetricInfo{
+        ServerName:    serverName,
+        Endpoint:      endpoint,
+        FlowControl:   flowControl,
+        Verb:          verb,
+        Resource:      resource,
+        ResponseSize:  int64(respSize),
+        RequestSize:   int64(reqSize),
+        HttpCode:      codeToString(httpCode),
+        Latency:       elapsedSeconds,
+        Request:       req,
+        RequestInfo:   requestInfo,
+        IsLongRunning: isLongRunning,
+        User:          user,
+        UserName:      userName,
+    }
+
+    ProxyRequestCounterObservers.Observe(metricInfo)
+    ProxyRequestLatenciesObservers.Observe(metricInfo)
     if requestInfo.IsResourceRequest {
-        ProxyResponseSizesObservers.Observe(MetricInfo{
-            ServerName:   serverName,
-            Endpoint:     endpoint,
-            Verb:         verb,
-            Resource:     resource,
-            ResponseSize: int64(respSize),
-            Request:      req,
-        })
+        ProxyResponseSizesObservers.Observe(metricInfo)
     }
+
+    ProxyRequestDataSizeObservers.Observe(metricInfo)
 }
 // RecordProxyRequestTermination records that the request was terminated early as part of a resource
 // preservation or apiserver self-defense mechanism (e.g. timeouts, maxinflight throttling,
 // proxyHandler errors). RecordProxyRequestTermination should only be called zero or one times
 // per request.
-func RecordProxyRequestTermination(req *http.Request, code int, reason, flowControl string) {
+func RecordProxyRequestTermination(req *http.Request, code int, reason, flowControl string, user user.Info) {
     requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context())
     if !ok {
         requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
@@ -159,6 +165,8 @@ func RecordProxyRequestTermination(req *http.Request, code int, reason, flowCont
     serverName := net.HostWithoutPort(req.Host)
     resource := cleanResource(requestInfo)
+    userName := cleanUserForMetric(user)
+
     ProxyRequestTerminationsObservers.Observe(MetricInfo{
         ServerName:  serverName,
         FlowControl: flowControl,
@@ -168,6 +176,8 @@ func RecordProxyRequestTermination(req *http.Request, code int, reason, flowCont
         Reason:   reason,
         Resource: resource,
         Request:  req,
+        User:     user,
+        UserName: userName,
     })
 }
@@ -287,20 +297,39 @@ func RecordProxyRateAndInflight(rate float64, inflight int32) {
 }
 func RecordRequestThroughput(requestSizeTotal, responseSizeTotal int64) {
-    ProxyRequestThroughputObservers.Observe(MetricInfo{
+    ProxyRequestTotalDataSizeObservers.Observe(MetricInfo{
         RequestSize:  requestSizeTotal,
         ResponseSize: responseSizeTotal,
     })
 }
-func RecordProxyTraceLatency(traceLatencies map[string]time.Duration, serverName string, requestInfo *request.RequestInfo) {
+func RecordProxyTraceLatency(traceLatencies map[string]time.Duration, serverName string, requestInfo *request.RequestInfo, user user.Info, req *http.Request) {
     verb := strings.ToUpper(requestInfo.Verb)
     resource := cleanResource(requestInfo)
+    userName := cleanUserForMetric(user)
+
     metric := MetricInfo{
         ServerName:     serverName,
         Verb:           verb,
         Resource:       resource,
         TraceLatencies: traceLatencies,
+        Request:        req,
+        User:           user,
+        UserName:       userName,
     }
     ProxyHandlingLatencyObservers.Observe(metric)
 }
+
+func cleanUserForMetric(user user.Info) string {
+    if user == nil {
+        return "anonymous"
+    }
+    userName := user.GetName()
+    for _, ug := range user.GetGroups() {
+        if strings.Contains(ug, "system:nodes") {
+            userName = ug
+            break
+        }
+    }
+    return userName
+}
diff --git a/pkg/gateway/metrics/obsever.go b/pkg/gateway/metrics/obsever.go
index 3756e72..1a55011 100644
--- a/pkg/gateway/metrics/obsever.go
+++ b/pkg/gateway/metrics/obsever.go
@@ -3,6 +3,9 @@ package metrics
 import (
     "net/http"
     "time"
+
+    "k8s.io/apiserver/pkg/authentication/user"
+    "k8s.io/apiserver/pkg/endpoints/request"
 )
 type MetricObserver interface {
@@ -27,14 +30,19 @@ var (
     ProxyRateLimiterRequestCounterObservers = newUnionObserver()
     ProxyGlobalFlowControlAcquireObservers  = newUnionObserver()
-    ProxyRequestInflightObservers   = newUnionObserver()
-    ProxyRequestThroughputObservers = newUnionObserver()
+    ProxyRequestInflightObservers      = newUnionObserver()
+    ProxyRequestTotalDataSizeObservers = newUnionObserver()
+    ProxyRequestDataSizeObservers      = newUnionObserver()
     ProxyHandlingLatencyObservers = newUnionObserver()
 )
 type MetricInfo struct {
     Request           *http.Request
+    RequestInfo       *request.RequestInfo
+    User              user.Info
+    UserName          string
+    IsLongRunning     bool
     IsResourceRequest bool
     ServerName        string
     Endpoint          string
diff --git a/pkg/gateway/metrics/prometheus.go b/pkg/gateway/metrics/prometheus.go
index d98c248..41bb6d0 100644
--- a/pkg/gateway/metrics/prometheus.go
+++ b/pkg/gateway/metrics/prometheus.go
@@ -15,14 +15,16 @@ package metrics
 import (
-    "github.com/kubewharf/kubegateway/pkg/util/tracing"
-    "github.com/prometheus/client_golang/prometheus"
-    compbasemetrics "k8s.io/component-base/metrics"
     "os"
     "strconv"
     "sync"
+    "time"
+
+    "github.com/prometheus/client_golang/prometheus"
+    compbasemetrics "k8s.io/component-base/metrics"
     metricsregistry "github.com/kubewharf/kubegateway/pkg/gateway/metrics/registry"
+    "github.com/kubewharf/kubegateway/pkg/util/tracing"
 )
 const (
@@ -30,7 +32,15 @@ const (
     subsystem = "proxy"
 )
+func init() {
+    if os.Getenv("ENABLE_REQUEST_METRIC_WITH_USER") == "true" {
+        enableRequestMetricByUser = true
+    }
+}
+
 var (
+    enableRequestMetricByUser = false
+
     proxyPid = strconv.Itoa(os.Getpid())
     proxyReceiveRequestCounter = compbasemetrics.NewCounterVec(
@@ -53,6 +63,29 @@ var (
         },
         []string{"pid", "serverName", "endpoint", "verb", "resource", "code", "flowcontrol"},
     )
+
+    proxyUserRequestCounter = compbasemetrics.NewCounterVec(
+        &compbasemetrics.CounterOpts{
+            Namespace:      namespace,
+            Subsystem:      subsystem,
+            Name:           "apiserver_request_total_with_user",
+            Help:           "Counter of proxied apiserver requests by user, it is recorded when this proxied request ends",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"serverName", "verb", "resource", "code", "flowcontrol", "user"},
+    )
+
+    proxyUserRequestLoad = compbasemetrics.NewCounterVec(
+        &compbasemetrics.CounterOpts{
+            Namespace:      namespace,
+            Subsystem:      subsystem,
+            Name:           "apiserver_request_load_with_user",
+            Help:           "Total time cost of proxied apiserver requests by user, it is recorded when this proxied request ends",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"serverName", "verb", "resource", "code", "flowcontrol", "user"},
+    )
+
     proxyRequestLatencies = compbasemetrics.NewHistogramVec(
         &compbasemetrics.HistogramOpts{
             Namespace: namespace,
@@ -62,12 +95,25 @@ var (
             // This metric is used for verifying api call latencies SLO,
             // as well as tracking regressions in this aspects.
            // Thus we customize buckets significantly, to empower both usecases.
-            Buckets: []float64{0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
-                1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60, 120, 180, 240, 300},
+            Buckets:        []float64{0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 5, 10, 30, 60, 120, 180, 600},
             StabilityLevel: compbasemetrics.ALPHA,
         },
         []string{"pid", "serverName", "endpoint", "verb", "resource"},
     )
+    proxyRequestUserLatencies = compbasemetrics.NewHistogramVec(
+        &compbasemetrics.HistogramOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "apiserver_request_duration_seconds_with_user",
+            Help:      "Response latency distribution in seconds for each serverName, endpoint, verb, resource, user.",
+            // This metric is used for verifying api call latencies SLO,
+            // as well as tracking regressions in this aspects.
+            // Thus we customize buckets significantly, to empower both usecases.
+            Buckets:        []float64{0.01, 0.05, 0.1, 0.5, 1.0, 5, 10, 30, 60, 120, 180, 600},
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"serverName", "verb", "resource", "flowcontrol", "user", "priority"},
+    )
     proxyResponseSizes = compbasemetrics.NewHistogramVec(
         &compbasemetrics.HistogramOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -100,6 +146,16 @@ var (
         },
         []string{"pid", "serverName", "verb", "code", "reason", "resource", "flowcontrol"},
     )
+    proxyRequestUserTerminations = compbasemetrics.NewCounterVec(
+        &compbasemetrics.CounterOpts{
+            Namespace:      namespace,
+            Subsystem:      subsystem,
+            Name:           "apiserver_request_terminations_total_with_user",
+            Help:           "Terminated requests number with user.",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"serverName", "verb", "code", "reason", "resource", "flowcontrol", "user"},
+    )
     // proxyRegisteredWatchers is a number of currently registered watchers splitted by resource.
     proxyRegisteredWatchers = compbasemetrics.NewGaugeVec(
         &compbasemetrics.GaugeOpts{
@@ -158,8 +214,8 @@ var (
         []string{"pid"},
     )
-    // proxyRequestThroughput is the total http data size request and response in bytes
-    proxyRequestThroughput = compbasemetrics.NewGaugeVec(
+    // proxyRequestTotalDataSize is the total http data size request and response in bytes
+    proxyRequestTotalDataSize = compbasemetrics.NewGaugeVec(
         &compbasemetrics.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -170,6 +226,18 @@ var (
         []string{"pid", "type"},
     )
+    // proxyRequestUserDataSize is the http data size request and response in bytes
+    proxyRequestUserDataSize = compbasemetrics.NewGaugeVec(
+        &compbasemetrics.GaugeOpts{
+            Namespace:      namespace,
+            Subsystem:      subsystem,
+            Name:           "http_data_size_bytes_with_user",
+            Help:           "The http data size for each serverName, verb, resource, user",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"type", "serverName", "verb", "resource", "flowcontrol", "user"},
+    )
+
     proxyHandlingLatencies = compbasemetrics.NewHistogramVec(
         &compbasemetrics.HistogramOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -183,6 +251,19 @@ var (
         []string{"pid", "serverName", "verb", "resource", "stage"},
     )
+    proxyUserClientOverCostLatencies = compbasemetrics.NewHistogramVec(
+        &compbasemetrics.HistogramOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "user_client_over_cost_seconds",
+            Help:      "User client over cost (client_cost - upstream_cost) for each serverName, verb, resource, user.",
+            // Start with 10ms with the last bucket being [60s, Inf)
+            Buckets:        []float64{0.01, 0.05, 0.1, 0.5, 1.0, 5, 10, 30, 60},
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"serverName", "verb", "resource", "user"},
+    )
+
     localMetrics = []compbasemetrics.Registerable{
         proxyReceiveRequestCounter,
         proxyRequestCounter,
@@ -190,13 +271,19 @@ var (
         proxyResponseSizes,
         proxyUpstreamUnhealthy,
         proxyRequestTerminationsTotal,
+        proxyRequestUserTerminations,
         proxyRegisteredWatchers,
         proxyRateLimiterRequestCounter,
         proxyGlobalFlowControlRequestCounter,
         proxyRequestInflight,
         proxyRequestRate,
-        proxyRequestThroughput,
+        proxyRequestTotalDataSize,
+        proxyRequestUserDataSize,
         proxyHandlingLatencies,
+        proxyUserRequestCounter,
+        proxyUserRequestLoad,
+        proxyRequestUserLatencies,
+        proxyUserClientOverCostLatencies,
     }
 )
@@ -224,7 +311,8 @@ func Register() {
        ProxyRateLimiterRequestCounterObservers.AddObserver(&proxyRateLimiterRequestCounterObserver{})
        ProxyGlobalFlowControlAcquireObservers.AddObserver(&proxyGlobalFlowControlAcquireObserver{})
        ProxyRequestInflightObservers.AddObserver(&proxyRequestInflightObserver{})
-       ProxyRequestThroughputObservers.AddObserver(&proxyRequestThroughputObserver{})
+       ProxyRequestTotalDataSizeObservers.AddObserver(&proxyRequestTotalDataSizeObserver{})
+       ProxyRequestDataSizeObservers.AddObserver(&proxyRequestDataSizeObserver{})
        ProxyHandlingLatencyObservers.AddObserver(&proxyHandlingLatenciesObserver{})
    })
 }
@@ -245,12 +333,23 @@ type proxyRequestCounterObserver struct{}
 func (o *proxyRequestCounterObserver) Observe(metric MetricInfo) {
     proxyRequestCounter.WithLabelValues(proxyPid, metric.ServerName, metric.Endpoint, metric.Verb, metric.Resource, metric.HttpCode, metric.FlowControl).Inc()
+
+    if enableRequestMetricByUser {
+        proxyUserRequestCounter.WithLabelValues(metric.ServerName, metric.Verb, metric.Resource, metric.HttpCode, metric.FlowControl, metric.UserName).Inc()
+        if !metric.IsLongRunning {
+            proxyUserRequestLoad.WithLabelValues(metric.ServerName, metric.Verb, metric.Resource, metric.HttpCode, metric.FlowControl, metric.UserName).Add(metric.Latency)
+        }
+    }
 }
 type proxyRequestLatenciesObserver struct{}
 func (o *proxyRequestLatenciesObserver) Observe(metric MetricInfo) {
     proxyRequestLatencies.WithLabelValues(proxyPid, metric.ServerName, metric.Endpoint, metric.Verb, metric.Resource).Observe(metric.Latency)
+
+    if enableRequestMetricByUser {
+        proxyRequestUserLatencies.WithLabelValues(metric.ServerName, metric.Verb, metric.Resource, metric.FlowControl, metric.UserName, "default").Observe(metric.Latency)
+    }
 }
 type proxyResponseSizesObserver struct{}
@@ -263,6 +362,9 @@ type proxyRequestTerminationsObserver struct{}
 func (o *proxyRequestTerminationsObserver) Observe(metric MetricInfo) {
     proxyRequestTerminationsTotal.WithLabelValues(proxyPid, metric.ServerName, metric.Verb, metric.HttpCode, metric.Reason, metric.Resource, metric.FlowControl).Inc()
+    if enableRequestMetricByUser {
+        proxyRequestUserTerminations.WithLabelValues(metric.ServerName, metric.Verb, metric.HttpCode, metric.Reason, metric.Resource, metric.FlowControl, metric.UserName).Inc()
+    }
 }
 type proxyWatcherRegisteredObserver struct{}
@@ -296,11 +398,20 @@ func (o *proxyRequestInflightObserver) Observe(metric MetricInfo) {
     proxyRequestRate.WithLabelValues(proxyPid).Set(metric.Rate)
 }
-type proxyRequestThroughputObserver struct{}
+type proxyRequestTotalDataSizeObserver struct{}
+
+func (o *proxyRequestTotalDataSizeObserver) Observe(metric MetricInfo) {
+    proxyRequestTotalDataSize.WithLabelValues(proxyPid, "request").Set(float64(metric.RequestSize))
+    proxyRequestTotalDataSize.WithLabelValues(proxyPid, "response").Set(float64(metric.ResponseSize))
+}
+
+type proxyRequestDataSizeObserver struct{}
-func (o *proxyRequestThroughputObserver) Observe(metric MetricInfo) {
-    proxyRequestThroughput.WithLabelValues(proxyPid, "request").Set(float64(metric.RequestSize))
-    proxyRequestThroughput.WithLabelValues(proxyPid, "response").Set(float64(metric.ResponseSize))
+func (o *proxyRequestDataSizeObserver) Observe(metric MetricInfo) {
+    if enableRequestMetricByUser {
+        proxyRequestUserDataSize.WithLabelValues("request", metric.ServerName, metric.Verb, metric.Resource, metric.FlowControl, metric.UserName).Set(float64(metric.RequestSize))
+        proxyRequestUserDataSize.WithLabelValues("response", metric.ServerName, metric.Verb, metric.Resource, metric.FlowControl, metric.UserName).Set(float64(metric.ResponseSize))
+    }
 }
 type proxyHandlingLatenciesObserver struct{}
@@ -308,4 +419,12 @@ type proxyHandlingLatenciesObserver struct{}
 func (o *proxyHandlingLatenciesObserver) Observe(metric MetricInfo) {
     proxyHandlingLatencies.WithLabelValues(proxyPid, metric.ServerName, metric.Verb, metric.Resource, tracing.MetricStageHandlingDelay).
         Observe(metric.TraceLatencies[tracing.MetricStageHandlingDelay].Seconds())
+
+    if enableRequestMetricByUser {
+        overCostLatency := metric.TraceLatencies[tracing.MetricStageClientOverCost]
+        if overCostLatency > time.Millisecond {
+            proxyUserClientOverCostLatencies.WithLabelValues(metric.ServerName, metric.Verb, metric.Resource, metric.UserName).
+                Observe(metric.TraceLatencies[tracing.MetricStageClientOverCost].Seconds())
+        }
+    }
 }
diff --git a/pkg/gateway/proxy/dispatcher/dispatcher.go b/pkg/gateway/proxy/dispatcher/dispatcher.go
index cda2413..999ef10 100644
--- a/pkg/gateway/proxy/dispatcher/dispatcher.go
+++ b/pkg/gateway/proxy/dispatcher/dispatcher.go
@@ -26,7 +26,6 @@ import (
     utilnet "k8s.io/apimachinery/pkg/util/net"
     "k8s.io/apiserver/pkg/endpoints/filters"
     genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
-    "k8s.io/apiserver/pkg/endpoints/responsewriter"
     "k8s.io/client-go/kubernetes/scheme"
     "github.com/kubewharf/kubegateway/pkg/clusters"
@@ -87,7 +86,7 @@ func (d *dispatcher) ServeHTTP(w http.ResponseWriter, req *http.Request) {
         return
     }
-    _ = request.SetFlowControl(req.Context(), endpointPicker.FlowControlName())
+    _ = request.SetProxyInfo(req.Context(), endpointPicker.FlowControlName(), user)
     flowcontrol := endpointPicker.FlowControl()
     if !flowcontrol.TryAcquire() {
@@ -144,15 +143,14 @@ func (d *dispatcher) ServeHTTP(w http.ResponseWriter, req *http.Request) {
     }()
     logging := d.enableAccessLog && endpointPicker.EnableLog()
-    delegate := decorateResponseWriter(req, w, logging, requestInfo, extraInfo.Hostname, endpoint.Endpoint, user, extraInfo.Impersonator, endpointPicker.FlowControlName())
-    delegate.MonitorBeforeProxy()
-    defer delegate.MonitorAfterProxy()
+    monitor := requestMonitor(req, w, logging, requestInfo, extraInfo, endpoint.Endpoint, user, extraInfo.Impersonator, endpointPicker.FlowControlName())
+    monitor.MonitorBeforeProxy()
+    defer monitor.MonitorAfterProxy()
-    rw := responsewriter.WrapForHTTP1Or2(delegate)
-    responder := newErrorResponder(d.codecs, endpoint, requestInfo, delegate)
+    responder := newErrorResponder(d.codecs, endpoint, requestInfo, extraInfo.ReaderWriter)
     proxyHandler := NewUpgradeAwareHandler(location, endpoint.ProxyTransport, endpoint.PorxyUpgradeTransport, false, false, responder)
-    proxyHandler.ServeHTTP(rw, newReq)
+    proxyHandler.ServeHTTP(w, newReq)
 }
 func (d *dispatcher) responseError(err *errors.StatusError, w http.ResponseWriter, req *http.Request, reason string) {
diff --git a/pkg/gateway/proxy/dispatcher/proxylog.go b/pkg/gateway/proxy/dispatcher/proxylog.go
index ac6882c..a792d38 100644
--- a/pkg/gateway/proxy/dispatcher/proxylog.go
+++ b/pkg/gateway/proxy/dispatcher/proxylog.go
@@ -15,7 +15,6 @@ package dispatcher
 import (
-    "fmt"
     "net/http"
     "strings"
     "time"
@@ -23,7 +22,6 @@ import (
     utilnet "k8s.io/apimachinery/pkg/util/net"
     "k8s.io/apiserver/pkg/authentication/user"
     "k8s.io/apiserver/pkg/endpoints/request"
-    "k8s.io/apiserver/pkg/endpoints/responsewriter"
     "k8s.io/klog"
     "github.com/kubewharf/apiserver-runtime/pkg/server"
@@ -32,18 +30,9 @@ import (
     "github.com/kubewharf/kubegateway/pkg/util/tracing"
 )
-var _ http.ResponseWriter = &responseWriterDelegator{}
-var _ responsewriter.UserProvidedDecorator = &responseWriterDelegator{}
-
-// Add a layer on top of ResponseWriter, so we can track latency, statusCode
-// and error message sources.
-type responseWriterDelegator struct {
-    statusRecorded     bool
-    status             int
-    addedInfo          string
-    startTime          time.Time
-    captureErrorOutput bool
-
+// proxy log and metrics
+type proxyRequestMonitor struct {
+    startTime time.Time
     logging         bool
     host            string
     endpoint        string
@@ -51,81 +40,46 @@ type responseWriterDelegator struct {
     user         user.Info
     impersonator user.Info
-    req         *http.Request
-    requestInfo *request.RequestInfo
-    w           http.ResponseWriter
-
-    written int64
+    req              *http.Request
+    requestInfo      *request.RequestInfo
+    extraRequestInfo *gatewayrequest.ExtraRequestInfo
+    w                http.ResponseWriter
 }
-func decorateResponseWriter(
+func requestMonitor(
     req *http.Request,
     w http.ResponseWriter,
     logging bool,
     requestInfo *request.RequestInfo,
-    host, endpoint string,
+    extraInfo *gatewayrequest.ExtraRequestInfo,
+    endpoint string,
     user, impersonator user.Info,
     flowControlName string,
-) *responseWriterDelegator {
-    return &responseWriterDelegator{
-        startTime:       time.Now(),
-        req:             req,
-        w:               w,
-        logging:         logging,
-        requestInfo:     requestInfo,
-        host:            host,
-        endpoint:        endpoint,
-        flowControlName: flowControlName,
-        user:            user,
-        impersonator:    impersonator,
+) *proxyRequestMonitor {
+    return &proxyRequestMonitor{
+        startTime:        time.Now(),
+        req:              req,
+        w:                w,
+        logging:          logging,
+        requestInfo:      requestInfo,
+        extraRequestInfo: extraInfo,
+        host:             extraInfo.Hostname,
+        endpoint:         endpoint,
+        flowControlName:  flowControlName,
+        user:             user,
+        impersonator:     impersonator,
     }
 }
-func (rw *responseWriterDelegator) Unwrap() http.ResponseWriter {
-    return rw.w
-}
-
-// Header implements http.ResponseWriter.
-func (rw *responseWriterDelegator) Header() http.Header {
-    return rw.w.Header()
-}
-
-// WriteHeader implements http.ResponseWriter.
-func (rw *responseWriterDelegator) WriteHeader(status int) {
-    rw.recordStatus(status)
-    rw.w.WriteHeader(status)
-}
-
-// Write implements http.ResponseWriter.
-func (rw *responseWriterDelegator) Write(b []byte) (int, error) {
-    if !rw.statusRecorded {
-        rw.recordStatus(http.StatusOK) // Default if WriteHeader hasn't been called
-    }
-    if rw.captureErrorOutput {
-        rw.debugf("logging error output: %q\n", string(b))
-    }
-    n, err := rw.w.Write(b)
-    rw.written += int64(n)
-    return n, err
-}
-
-func (rw *responseWriterDelegator) Status() int {
-    return rw.status
-}
-
-func (rw *responseWriterDelegator) ContentLength() int {
-    return int(rw.written)
-}
-
-func (rw *responseWriterDelegator) Elapsed() time.Duration {
+func (rw *proxyRequestMonitor) Elapsed() time.Duration {
     return time.Since(rw.startTime)
 }
-func (rw *responseWriterDelegator) isWatch() bool {
+func (rw *proxyRequestMonitor) isWatch() bool {
     return rw.requestInfo.IsResourceRequest && rw.requestInfo.Verb == "watch"
 }
-func (rw *responseWriterDelegator) MonitorBeforeProxy() {
+func (rw *proxyRequestMonitor) MonitorBeforeProxy() {
     if rw.isWatch() {
         metrics.RecordWatcherRegistered(rw.host, rw.endpoint, rw.requestInfo.Resource)
         //TODO: log watch requests before proxy
@@ -133,7 +87,7 @@ func (rw *responseWriterDelegator) MonitorBeforeProxy() {
     // TODO: add a metrics before request forwarded
 }
-func (rw *responseWriterDelegator) MonitorAfterProxy() {
+func (rw *proxyRequestMonitor) MonitorAfterProxy() {
     if rw.isWatch() {
         metrics.RecordWatcherUnregistered(rw.host, rw.endpoint, rw.requestInfo.Resource)
     }
@@ -142,6 +96,20 @@ func (rw *responseWriterDelegator) MonitorAfterProxy() {
         return
     }
+    userInfo := rw.user
+    if rw.impersonator != nil {
+        userInfo = rw.impersonator
+    }
+
+    var requestSize int
+    var responseSize int
+    var status int
+    if readerWriter := rw.extraRequestInfo.ReaderWriter; readerWriter != nil {
+        requestSize = readerWriter.RequestSize()
+        responseSize = readerWriter.ResponseSize()
+        status = readerWriter.Status()
+    }
+
     // we only monitor forwarded proxy request here
     metrics.MonitorProxyRequest(
         rw.req,
@@ -149,21 +117,32 @@ func (rw *responseWriterDelegator) MonitorAfterProxy() {
         rw.endpoint,
         rw.flowControlName,
         rw.requestInfo,
-        rw.Header().Get("Content-Type"),
-        rw.Status(),
-        rw.ContentLength(),
+        userInfo,
+        rw.extraRequestInfo.IsLongRunningRequest,
+        rw.w.Header().Get("Content-Type"),
+        status,
+        requestSize,
+        responseSize,
         rw.Elapsed(),
     )
     rw.Log()
 }
 // Log is intended to be called once at the end of your request handler, via defer
-func (rw *responseWriterDelegator) Log() {
+func (rw *proxyRequestMonitor) Log() {
     latency := rw.Elapsed()
     logging := rw.logging
     verb := strings.ToUpper(rw.requestInfo.Verb)
     isLongRunning := server.DefaultLongRunningFunc(rw.req, rw.requestInfo)
-    if proxyLogPred(rw.status, verb, latency, isLongRunning) {
+
+    var status int
+    var addedInfo string
+    if readerWriter := rw.extraRequestInfo.ReaderWriter; readerWriter != nil {
+        status = readerWriter.Status()
+        addedInfo = readerWriter.AddedInfo()
+    }
+
+    if proxyLogPred(status, verb, latency, isLongRunning) {
         logging = true
     }
     if !logging {
@@ -179,7 +158,7 @@ func (rw *responseWriterDelegator) Log() {
             rw.endpoint,
             rw.req.RequestURI,
             latency,
-            rw.status,
+            status,
             rw.user.GetName(),
             rw.user.GetGroups(),
             rw.req.UserAgent(),
             rw.impersonator.GetName(),
             rw.impersonator.GetGroups(),
             sourceIPs,
             traceId,
-            rw.addedInfo,
+            addedInfo,
         )
     } else {
         klog.Infof("verb=%q host=%q endpoint=%q URI=%q latency=%v resp=%v user=%q userGroup=%v userAgent=%q srcIP=%v traceId=%v: %v",
@@ -196,28 +175,17 @@ func (rw *responseWriterDelegator) Log() {
             rw.endpoint,
             rw.req.RequestURI,
             latency,
-            rw.status,
+            status,
             rw.user.GetName(),
             rw.user.GetGroups(),
             rw.req.UserAgent(),
             sourceIPs,
             traceId,
-            rw.addedInfo,
+            addedInfo,
         )
     }
 }
-func (rw *responseWriterDelegator) recordStatus(status int) {
-    rw.status = status
-    rw.statusRecorded = true
-    rw.captureErrorOutput = captureErrorOutput(status)
-}
-
-// debugf adds additional data to be logged with this request.
-func (rw *responseWriterDelegator) debugf(format string, data ...interface{}) {
-    rw.addedInfo += "\n" + fmt.Sprintf(format, data...)
-}
-
 func captureErrorOutput(code int) bool {
     return code >= http.StatusInternalServerError
 }
diff --git a/pkg/util/tracing/trace.go b/pkg/util/tracing/trace.go
index c70e233..4b5de71 100644
--- a/pkg/util/tracing/trace.go
+++ b/pkg/util/tracing/trace.go
@@ -31,11 +31,12 @@ const (
 )
 const (
-    MetricStageClientRead    = "client_read"
-    MetricStageClientWrite   = "client_write"
-    MetricStageUpstreamRead  = "upstream_read"
-    MetricStageUpstreamWrite = "upstream_write"
-    MetricStageHandlingDelay = "handling_delay"
+    MetricStageClientRead     = "client_read"
+    MetricStageClientWrite    = "client_write"
+    MetricStageUpstreamRead   = "upstream_read"
+    MetricStageUpstreamWrite  = "upstream_write"
+    MetricStageHandlingDelay  = "handling_delay"
+    MetricStageClientOverCost = "client_over_cost"
 )
 var (
@@ -181,6 +182,8 @@ func (t *RequestTraceInfo) StageLatency() map[string]time.Duration {
         }
     }
     stageLatency[MetricStageHandlingDelay] = t.endTime.Sub(t.startTime) - eliminatedLatency
+    stageLatency[MetricStageClientOverCost] = stageLatency[MetricStageClientRead] + stageLatency[MetricStageClientWrite] -
+        stageLatency[MetricStageUpstreamRead] - stageLatency[MetricStageUpstreamWrite]
     t.stageLatency = stageLatency
     return stageLatency
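
Note on the new client_over_cost stage: StageLatency() derives it as client_read + client_write - upstream_read - upstream_write, and the prometheus observer only records user_client_over_cost_seconds when that value exceeds one millisecond and user-level metrics are enabled. A minimal standalone Go sketch of that relationship (the sample durations below are hypothetical and not taken from this change):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Hypothetical per-request stage latencies, keyed like the MetricStage* constants in trace.go.
    stages := map[string]time.Duration{
        "client_read":    40 * time.Millisecond,
        "client_write":   120 * time.Millisecond,
        "upstream_read":  30 * time.Millisecond,
        "upstream_write": 90 * time.Millisecond,
    }

    // client_over_cost = (client_read + client_write) - (upstream_read + upstream_write):
    // the extra time spent on the client side compared with the upstream apiserver.
    overCost := stages["client_read"] + stages["client_write"] -
        stages["upstream_read"] - stages["upstream_write"]

    // Only record the histogram sample when the over cost exceeds 1ms, mirroring the
    // check in the proxyHandlingLatenciesObserver (enabled via ENABLE_REQUEST_METRIC_WITH_USER).
    if overCost > time.Millisecond {
        fmt.Printf("user_client_over_cost_seconds sample: %.3f\n", overCost.Seconds())
    }
}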