Skip to content

Commit b650ea3

Browse files
authored
Merge pull request #106 from postfinance/cjmn-patch-0314
httptrace and better error logging closes #45
2 parents 1d45495 + 03505e9 commit b650ea3

File tree

4 files changed

+114
-68
lines changed

4 files changed

+114
-68
lines changed

internal/servicecheck/httptrace.go

Lines changed: 91 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,123 @@
11
package servicecheck
22

33
import (
4+
"crypto/tls"
5+
"log"
46
"net/http"
7+
"net/http/httptrace"
8+
"time"
59

610
"github.com/prometheus/client_golang/prometheus"
711
"github.com/prometheus/client_golang/prometheus/promhttp"
812
)
913

10-
func withRequestTracing(registry *prometheus.Registry, transport http.RoundTripper) http.RoundTripper {
11-
counter := prometheus.NewCounterVec(
14+
// unique type for context.Context to avoid collisions.
15+
type kubenurseTypeKey struct{}
16+
17+
// // http.RoundTripper
18+
type RoundTripperFunc func(req *http.Request) (*http.Response, error)
19+
20+
func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
21+
return rt(r)
22+
}
23+
24+
// This collects traces and logs errors. As promhttp.InstrumentRoundTripperTrace doesn't process
25+
// errors, this is custom made and inspired by prometheus/client_golang's promhttp
26+
func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durationHistogram []float64) http.RoundTripper {
27+
httpclientReqTotal := prometheus.NewCounterVec(
1228
prometheus.CounterOpts{
1329
Namespace: metricsNamespace,
1430
Name: "httpclient_requests_total",
1531
Help: "A counter for requests from the kubenurse http client.",
1632
},
33+
// []string{"code", "method", "type"}, // TODO
1734
[]string{"code", "method"},
1835
)
1936

20-
latencyVec := prometheus.NewHistogramVec(
37+
httpclientReqDuration := prometheus.NewHistogramVec(
2138
prometheus.HistogramOpts{
2239
Namespace: metricsNamespace,
23-
Name: "httpclient_trace_request_duration_seconds",
24-
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
25-
Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1},
40+
Name: "httpclient_request_duration_seconds",
41+
Help: "A latency histogram of request latencies from the kubenurse http client.",
42+
Buckets: durationHistogram,
2643
},
27-
[]string{"event"},
44+
// []string{"type"}, // TODO
45+
[]string{},
2846
)
2947

30-
// histVec has no labels, making it a zero-dimensional ObserverVec.
31-
histVec := prometheus.NewHistogramVec(
48+
httpclientTraceReqDuration := prometheus.NewHistogramVec(
3249
prometheus.HistogramOpts{
3350
Namespace: metricsNamespace,
34-
Name: "httpclient_request_duration_seconds",
35-
Help: "A latency histogram of request latencies from the kubenurse http client.",
36-
Buckets: prometheus.DefBuckets,
51+
Name: "httpclient_trace_request_duration_seconds",
52+
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
53+
Buckets: durationHistogram,
3754
},
38-
[]string{},
55+
[]string{"event"},
56+
// []string{"event", "type"}, // TODO
3957
)
4058

41-
// Register all of the metrics in the standard registry.
42-
registry.MustRegister(counter, latencyVec, histVec)
59+
registry.MustRegister(httpclientReqTotal, httpclientReqDuration, httpclientTraceReqDuration)
4360

44-
// Define functions for the available httptrace.ClientTrace hook
45-
// functions that we want to instrument.
46-
trace := &promhttp.InstrumentTrace{
47-
DNSStart: func(t float64) {
48-
latencyVec.WithLabelValues("dns_start").Observe(t)
49-
},
50-
DNSDone: func(t float64) {
51-
latencyVec.WithLabelValues("dns_done").Observe(t)
52-
},
53-
ConnectStart: func(t float64) {
54-
latencyVec.WithLabelValues("connect_start").Observe(t)
55-
},
56-
ConnectDone: func(t float64) {
57-
latencyVec.WithLabelValues("connect_done").Observe(t)
58-
},
59-
TLSHandshakeStart: func(t float64) {
60-
latencyVec.WithLabelValues("tls_handshake_start").Observe(t)
61-
},
62-
TLSHandshakeDone: func(t float64) {
63-
latencyVec.WithLabelValues("tls_handshake_done").Observe(t)
64-
},
65-
WroteRequest: func(t float64) {
66-
latencyVec.WithLabelValues("wrote_request").Observe(t)
67-
},
68-
GotFirstResponseByte: func(t float64) {
69-
latencyVec.WithLabelValues("got_first_resp_byte").Observe(t)
70-
},
61+
collectMetric := func(traceEventType string, start time.Time, r *http.Request, err error) {
62+
td := time.Since(start).Seconds()
63+
kubenurseTypeLabel := r.Context().Value(kubenurseTypeKey{}).(string)
64+
65+
// If we got an error inside a trace, log it and do not collect metrics
66+
if err != nil {
67+
log.Printf("httptrace: failed %s for %s with %v", traceEventType, kubenurseTypeLabel, err)
68+
return
69+
}
70+
71+
httpclientTraceReqDuration.WithLabelValues(traceEventType).Observe(td) // TODO: add back kubenurseTypeKey
7172
}
7273

73-
// Wrap the default RoundTripper with middleware.
74-
roundTripper := promhttp.InstrumentRoundTripperCounter(counter,
75-
promhttp.InstrumentRoundTripperTrace(trace,
76-
promhttp.InstrumentRoundTripperDuration(histVec,
77-
transport,
78-
),
79-
),
80-
)
74+
// Return a http.RoundTripper for tracing requests
75+
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
76+
// Capture request time
77+
start := time.Now()
78+
79+
// Add tracing hooks
80+
trace := &httptrace.ClientTrace{
81+
GotConn: func(info httptrace.GotConnInfo) {
82+
collectMetric("got_conn", start, r, nil)
83+
},
84+
DNSStart: func(info httptrace.DNSStartInfo) {
85+
collectMetric("dns_start", start, r, nil)
86+
},
87+
DNSDone: func(info httptrace.DNSDoneInfo) {
88+
collectMetric("dns_done", start, r, info.Err)
89+
},
90+
ConnectStart: func(_, _ string) {
91+
collectMetric("connect_start", start, r, nil)
92+
},
93+
ConnectDone: func(_, _ string, err error) {
94+
collectMetric("connect_done", start, r, err)
95+
},
96+
TLSHandshakeStart: func() {
97+
collectMetric("tls_handshake_start", start, r, nil)
98+
},
99+
TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
100+
collectMetric("tls_handshake_done", start, r, nil)
101+
},
102+
WroteRequest: func(info httptrace.WroteRequestInfo) {
103+
collectMetric("wrote_request", start, r, info.Err)
104+
},
105+
GotFirstResponseByte: func() {
106+
collectMetric("got_first_resp_byte", start, r, nil)
107+
},
108+
}
109+
110+
// Do request with tracing enabled
111+
r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace))
112+
113+
// // TODO: uncomment when issue #55 is solved (N^2 request will increase cardinality of path_ metrics too much otherwise)
114+
// typeFromCtxFn := promhttp.WithLabelFromCtx("type", func(ctx context.Context) string {
115+
// return ctx.Value(kubenurseTypeKey{}).(string)
116+
// })
81117

82-
return roundTripper
118+
rt := next // variable pinning :) essential, to prevent always re-instrumenting the original variable
119+
rt = promhttp.InstrumentRoundTripperCounter(httpclientReqTotal, rt)
120+
rt = promhttp.InstrumentRoundTripperDuration(httpclientReqDuration, rt)
121+
return rt.RoundTrip(r)
122+
})
83123
}

internal/servicecheck/servicecheck.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome
7575

7676
httpClient := &http.Client{
7777
Timeout: 5 * time.Second,
78-
Transport: withRequestTracing(promRegistry, transport),
78+
Transport: withHttptrace(promRegistry, transport, durationHistogramBuckets),
7979
}
8080

8181
return &Checker{
@@ -163,43 +163,43 @@ func (c *Checker) StopScheduled() {
163163
}
164164

165165
// APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link
166-
func (c *Checker) APIServerDirect() (string, error) {
166+
func (c *Checker) APIServerDirect(ctx context.Context) (string, error) {
167167
if c.SkipCheckAPIServerDirect {
168168
return skippedStr, nil
169169
}
170170

171171
apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort)
172172

173-
return c.doRequest(apiurl)
173+
return c.doRequest(ctx, apiurl)
174174
}
175175

176176
// APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL
177-
func (c *Checker) APIServerDNS() (string, error) {
177+
func (c *Checker) APIServerDNS(ctx context.Context) (string, error) {
178178
if c.SkipCheckAPIServerDNS {
179179
return skippedStr, nil
180180
}
181181

182182
apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort)
183183

184-
return c.doRequest(apiurl)
184+
return c.doRequest(ctx, apiurl)
185185
}
186186

187187
// MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress
188-
func (c *Checker) MeIngress() (string, error) {
188+
func (c *Checker) MeIngress(ctx context.Context) (string, error) {
189189
if c.SkipCheckMeIngress {
190190
return skippedStr, nil
191191
}
192192

193-
return c.doRequest(c.KubenurseIngressURL + "/alwayshappy") //nolint:goconst // readability
193+
return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability
194194
}
195195

196196
// MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service
197-
func (c *Checker) MeService() (string, error) {
197+
func (c *Checker) MeService(ctx context.Context) (string, error) {
198198
if c.SkipCheckMeService {
199199
return skippedStr, nil
200200
}
201201

202-
return c.doRequest(c.KubenurseServiceURL + "/alwayshappy")
202+
return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy")
203203
}
204204

205205
// checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes
@@ -210,12 +210,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) {
210210
if neighbour.Phase == v1.PodRunning && // only query running pods (excludes pending ones)
211211
!neighbour.Terminating && // exclude terminating pods
212212
(c.allowUnschedulable || neighbour.NodeSchedulable == kubediscovery.NodeSchedulable) {
213-
check := func() (string, error) {
213+
check := func(ctx context.Context) (string, error) {
214214
if c.UseTLS {
215-
return c.doRequest("https://" + neighbour.PodIP + ":8443/alwayshappy")
215+
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy")
216216
}
217217

218-
return c.doRequest("http://" + neighbour.PodIP + ":8080/alwayshappy")
218+
return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy")
219219
}
220220

221221
_, _ = c.measure(check, "path_"+neighbour.NodeName)
@@ -227,8 +227,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) {
227227
func (c *Checker) measure(check Check, label string) (string, error) {
228228
start := time.Now()
229229

230+
// Add our label (check type) to the context so our http tracer can annotate
231+
// metrics and errors based with the label
232+
ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, label)
233+
230234
// Execute check
231-
res, err := check()
235+
res, err := check(ctx)
232236

233237
// Process metrics
234238
c.durationHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())

internal/servicecheck/transport.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package servicecheck
22

33
import (
4+
"context"
45
"crypto/tls"
56
"crypto/x509"
67
"errors"
@@ -17,14 +18,14 @@ const (
1718
)
1819

1920
// doRequest does an http request only to get the http status code
20-
func (c *Checker) doRequest(url string) (string, error) {
21+
func (c *Checker) doRequest(ctx context.Context, url string) (string, error) {
2122
// Read Bearer Token file from ServiceAccount
2223
token, err := os.ReadFile(K8sTokenFile)
2324
if err != nil {
2425
return errStr, fmt.Errorf("load kubernetes serviceaccount token from %s: %w", K8sTokenFile, err)
2526
}
2627

27-
req, _ := http.NewRequest("GET", url, http.NoBody)
28+
req, _ := http.NewRequestWithContext(ctx, "GET", url, http.NoBody)
2829

2930
// Only add the Bearer for API Server Requests
3031
if strings.HasSuffix(url, "/version") {

internal/servicecheck/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package servicecheck
22

33
import (
4+
"context"
45
"net/http"
56
"time"
67

@@ -63,8 +64,8 @@ type Result struct {
6364
Neighbourhood []kubediscovery.Neighbour `json:"neighbourhood"`
6465
}
6566

66-
// Check is the signature used by all checks that the checker can execute
67-
type Check func() (string, error)
67+
// Check is the signature used by all checks that the checker can execute.
68+
type Check func(ctx context.Context) (string, error)
6869

6970
// CachedResult represents a cached check result that is valid until the expiration.
7071
type CachedResult struct {

0 commit comments

Comments
 (0)