|
17 | 17 | #include <rpc/protocol.h> // For HTTP status codes
|
18 | 18 | #include <shutdown.h>
|
19 | 19 | #include <sync.h>
|
| 20 | +#include <util/check.h> |
20 | 21 | #include <util/strencodings.h>
|
21 | 22 | #include <util/threadnames.h>
|
22 | 23 | #include <util/translation.h>
|
|
26 | 27 | #include <cstdlib>
|
27 | 28 | #include <deque>
|
28 | 29 | #include <memory>
|
| 30 | +#include <numeric> |
29 | 31 | #include <optional>
|
30 | 32 | #include <string>
|
31 |
| -#include <unordered_set> |
| 33 | +#include <unordered_map> |
32 | 34 |
|
33 | 35 | #include <sys/types.h>
|
34 | 36 | #include <sys/stat.h>
|
@@ -149,10 +151,68 @@ static GlobalMutex g_httppathhandlers_mutex;
|
149 | 151 | static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY(g_httppathhandlers_mutex);
|
150 | 152 | //! Bound listening sockets
|
151 | 153 | static std::vector<evhttp_bound_socket *> boundSockets;
|
| 154 | + |
| 155 | +/** |
| 156 | + * @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests` |
| 157 | + * |
| 158 | + */ |
| 159 | +class HTTPRequestTracker |
| 160 | +{ |
| 161 | +private: |
| 162 | + mutable Mutex m_mutex; |
| 163 | + mutable std::condition_variable m_cv; |
| 164 | + //! For each connection, keep a counter of how many requests are open |
| 165 | + std::unordered_map<const evhttp_connection*, size_t> m_tracker GUARDED_BY(m_mutex); |
| 166 | + |
| 167 | + void RemoveConnectionInternal(const decltype(m_tracker)::iterator it) EXCLUSIVE_LOCKS_REQUIRED(m_mutex) |
| 168 | + { |
| 169 | + m_tracker.erase(it); |
| 170 | + if (m_tracker.empty()) m_cv.notify_all(); |
| 171 | + } |
| 172 | +public: |
| 173 | + //! Increase request counter for the associated connection by 1 |
| 174 | + void AddRequest(evhttp_request* req) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
| 175 | + { |
| 176 | + const evhttp_connection* conn{Assert(evhttp_request_get_connection(Assert(req)))}; |
| 177 | + WITH_LOCK(m_mutex, ++m_tracker[conn]); |
| 178 | + } |
| 179 | + //! Decrease request counter for the associated connection by 1, remove connection if counter is 0 |
| 180 | + void RemoveRequest(evhttp_request* req) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
| 181 | + { |
| 182 | + const evhttp_connection* conn{Assert(evhttp_request_get_connection(Assert(req)))}; |
| 183 | + LOCK(m_mutex); |
| 184 | + auto it{m_tracker.find(conn)}; |
| 185 | + if (it != m_tracker.end() && it->second > 0) { |
| 186 | + if (--(it->second) == 0) RemoveConnectionInternal(it); |
| 187 | + } |
| 188 | + } |
| 189 | + //! Remove a connection entirely |
| 190 | + void RemoveConnection(const evhttp_connection* conn) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
| 191 | + { |
| 192 | + LOCK(m_mutex); |
| 193 | + auto it{m_tracker.find(Assert(conn))}; |
| 194 | + if (it != m_tracker.end()) RemoveConnectionInternal(it); |
| 195 | + } |
| 196 | + |
| 197 | + size_t CountActiveRequests() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
| 198 | + { |
| 199 | + LOCK(m_mutex); |
| 200 | + return std::accumulate(m_tracker.begin(), m_tracker.end(), size_t(0), |
| 201 | + [](size_t acc_count, const auto& pair) { return acc_count + pair.second; }); |
| 202 | + } |
| 203 | + size_t CountActiveConnections() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
| 204 | + { |
| 205 | + return WITH_LOCK(m_mutex, return m_tracker.size()); |
| 206 | + } |
| 207 | + //! Wait until there are no more connections with active requests in the tracker |
| 208 | + void WaitUntilEmpty() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
| 209 | + { |
| 210 | + WAIT_LOCK(m_mutex, lock); |
| 211 | + m_cv.wait(lock, [this]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_tracker.empty(); }); |
| 212 | + } |
| 213 | +}; |
152 | 214 | //! Track active requests
|
153 |
| -static GlobalMutex g_requests_mutex; |
154 |
| -static std::condition_variable g_requests_cv; |
155 |
| -static std::unordered_set<evhttp_request*> g_requests GUARDED_BY(g_requests_mutex); |
| 215 | +static HTTPRequestTracker g_requests; |
156 | 216 |
|
157 | 217 | /** Check if a network address is allowed to access the HTTP server */
|
158 | 218 | static bool ClientAllowed(const CNetAddr& netaddr)
|
@@ -210,14 +270,11 @@ std::string RequestMethodString(HTTPRequest::RequestMethod m)
|
210 | 270 | /** HTTP request callback */
|
211 | 271 | static void http_request_cb(struct evhttp_request* req, void* arg)
|
212 | 272 | {
|
213 |
| - // Track requests and notify when a request is completed. |
| 273 | + // Track active requests |
214 | 274 | {
|
215 |
| - WITH_LOCK(g_requests_mutex, g_requests.insert(req)); |
216 |
| - g_requests_cv.notify_all(); |
| 275 | + g_requests.AddRequest(req); |
217 | 276 | evhttp_request_set_on_complete_cb(req, [](struct evhttp_request* req, void*) {
|
218 |
| - auto n{WITH_LOCK(g_requests_mutex, return g_requests.erase(req))}; |
219 |
| - assert(n == 1); |
220 |
| - g_requests_cv.notify_all(); |
| 277 | + g_requests.RemoveRequest(req); |
221 | 278 | }, nullptr);
|
222 | 279 | }
|
223 | 280 |
|
@@ -473,13 +530,10 @@ void StopHTTPServer()
|
473 | 530 | }
|
474 | 531 | boundSockets.clear();
|
475 | 532 | {
|
476 |
| - WAIT_LOCK(g_requests_mutex, lock); |
477 |
| - if (!g_requests.empty()) { |
478 |
| - LogPrint(BCLog::HTTP, "Waiting for %d requests to stop HTTP server\n", g_requests.size()); |
| 533 | + if (g_requests.CountActiveConnections() != 0) { |
| 534 | + LogPrint(BCLog::HTTP, "Waiting for %d requests to stop HTTP server\n", g_requests.CountActiveRequests()); |
479 | 535 | }
|
480 |
| - g_requests_cv.wait(lock, []() EXCLUSIVE_LOCKS_REQUIRED(g_requests_mutex) { |
481 |
| - return g_requests.empty(); |
482 |
| - }); |
| 536 | + g_requests.WaitUntilEmpty(); |
483 | 537 | }
|
484 | 538 | if (eventHTTP) {
|
485 | 539 | // Schedule a callback to call evhttp_free in the event base thread, so
|
|
0 commit comments