Skip to content

Commit fb40a9c

Browse files
authored
Merge pull request #303 from justinsb/direct_transport
mockkubeapiserver: Support direct mode without listening on a socket
2 parents 6f5fcd9 + fd5342c commit fb40a9c

File tree

1 file changed

+168
-0
lines changed

1 file changed

+168
-0
lines changed

mockkubeapiserver/direct_http.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package mockkubeapiserver
18+
19+
import (
20+
"bytes"
21+
"io"
22+
"net/http"
23+
"sync"
24+
)
25+
26+
// DirectTransport returns an http.RoundTripper that can serve requests without needing
27+
// a listening socket.
28+
func (s *MockKubeAPIServer) DirectTransport() http.RoundTripper {
29+
return &directHTTPRoundTripper{server: s}
30+
}
31+
32+
type directHTTPRoundTripper struct {
33+
server *MockKubeAPIServer
34+
}
35+
36+
var _ http.RoundTripper = &directHTTPRoundTripper{}
37+
38+
// RoundTrip implements http.RoundTripper
39+
func (t *directHTTPRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
40+
w := newDirectResponseWriter()
41+
42+
go func() {
43+
t.server.ServeHTTP(w, r)
44+
w.close()
45+
}()
46+
47+
// response blocks until we have the headers, at least.
48+
return w.response()
49+
}
50+
51+
// directResponseWriter implements http.ResponseWriter.
52+
type directResponseWriter struct {
53+
statusCode int
54+
header http.Header
55+
56+
// mutex guards all the below values
57+
mutex sync.Mutex
58+
// cond is signalled when one of the below values changes
59+
cond *sync.Cond
60+
// wroteHeader is true if we know the status code & headers
61+
wroteHeader bool
62+
// done is true if we have finished writing the response (the handler has returned)
63+
done bool
64+
// body is a buffer / pipe of our response body.
65+
// We can't use io.Pipe because it is blocking.
66+
body bytes.Buffer
67+
}
68+
69+
func newDirectResponseWriter() *directResponseWriter {
70+
w := &directResponseWriter{}
71+
w.cond = sync.NewCond(&w.mutex)
72+
w.header = make(http.Header)
73+
return w
74+
}
75+
76+
var _ http.ResponseWriter = &directResponseWriter{}
77+
78+
// response constructs an http.Response, blocking until we have at least the headers.
79+
func (w *directResponseWriter) response() (*http.Response, error) {
80+
w.mutex.Lock()
81+
for !w.wroteHeader {
82+
w.cond.Wait()
83+
}
84+
85+
response := &http.Response{}
86+
response.Body = &responseReader{w: w}
87+
response.Header = w.header
88+
response.StatusCode = w.statusCode
89+
if response.StatusCode == 0 {
90+
response.StatusCode = 200
91+
}
92+
93+
w.mutex.Unlock()
94+
95+
return response, nil
96+
}
97+
98+
type responseReader struct {
99+
w *directResponseWriter
100+
}
101+
102+
var _ io.ReadCloser = &responseReader{}
103+
104+
// Close implements io.ReadCloser
105+
func (r *responseReader) Close() error {
106+
// TODO: we could try to signal to the http server that the connection is closed
107+
return nil
108+
}
109+
110+
// Read implements io.ReadCloser
111+
func (r *responseReader) Read(data []byte) (int, error) {
112+
w := r.w
113+
114+
w.mutex.Lock()
115+
for {
116+
n, err := w.body.Read(data)
117+
if n == 0 {
118+
if err == io.EOF {
119+
if w.done {
120+
w.mutex.Unlock()
121+
return 0, io.EOF
122+
}
123+
w.cond.Wait()
124+
continue
125+
}
126+
}
127+
128+
w.mutex.Unlock()
129+
return n, err
130+
}
131+
}
132+
133+
func (w *directResponseWriter) close() {
134+
w.mutex.Lock()
135+
if !w.wroteHeader {
136+
w.wroteHeader = true
137+
}
138+
w.done = true
139+
w.cond.Broadcast()
140+
w.mutex.Unlock()
141+
}
142+
143+
// Header implements http.ResponseWriter
144+
func (w *directResponseWriter) Header() http.Header {
145+
return w.header
146+
}
147+
148+
// Write implements http.ResponseWriter
149+
func (w *directResponseWriter) Write(b []byte) (int, error) {
150+
w.mutex.Lock()
151+
if !w.wroteHeader {
152+
w.wroteHeader = true
153+
}
154+
n, err := w.body.Write(b)
155+
w.cond.Broadcast()
156+
w.mutex.Unlock()
157+
158+
return n, err
159+
}
160+
161+
// WriteHeader implements http.ResponseWriter
162+
func (w *directResponseWriter) WriteHeader(statusCode int) {
163+
w.mutex.Lock()
164+
w.statusCode = statusCode
165+
w.wroteHeader = true
166+
w.cond.Broadcast()
167+
w.mutex.Unlock()
168+
}

0 commit comments

Comments
 (0)