From 39076721636a6ad00acf3f85d82d88ac1fe5705e Mon Sep 17 00:00:00 2001 From: ochafik Date: Fri, 4 Oct 2024 05:08:54 +0100 Subject: [PATCH 1/4] Add {Stream,DataSink}::is_alive --- httplib.h | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/httplib.h b/httplib.h index 121ce34955..6f4efc7260 100644 --- a/httplib.h +++ b/httplib.h @@ -530,6 +530,7 @@ class DataSink { std::function write; std::function is_writable; + std::function is_alive; std::function done; std::function done_with_trailer; std::ostream os; @@ -722,6 +723,7 @@ class Stream { virtual bool is_readable() const = 0; virtual bool is_writable() const = 0; + virtual bool is_alive() const = 0; virtual ssize_t read(char *ptr, size_t size) = 0; virtual ssize_t write(const char *ptr, size_t size) = 0; @@ -2333,6 +2335,7 @@ class BufferStream final : public Stream { bool is_readable() const override; bool is_writable() const override; + bool is_alive() const override; ssize_t read(char *ptr, size_t size) override; ssize_t write(const char *ptr, size_t size) override; void get_remote_ip_and_port(std::string &ip, int &port) const override; @@ -3205,6 +3208,7 @@ class SocketStream final : public Stream { bool is_readable() const override; bool is_writable() const override; + bool is_alive() const override; ssize_t read(char *ptr, size_t size) override; ssize_t write(const char *ptr, size_t size) override; void get_remote_ip_and_port(std::string &ip, int &port) const override; @@ -3235,6 +3239,7 @@ class SSLSocketStream final : public Stream { bool is_readable() const override; bool is_writable() const override; + bool is_alive() const override; ssize_t read(char *ptr, size_t size) override; ssize_t write(const char *ptr, size_t size) override; void get_remote_ip_and_port(std::string &ip, int &port) const override; @@ -4410,6 +4415,7 @@ inline bool write_content(Stream &strm, const ContentProvider &content_provider, }; data_sink.is_writable = [&]() -> bool { return strm.is_writable(); }; + data_sink.is_alive = [&]() -> bool { return strm.is_alive(); }; while (offset < end_offset && !is_shutting_down()) { if (!strm.is_writable()) { @@ -4456,6 +4462,7 @@ write_content_without_length(Stream &strm, }; data_sink.is_writable = [&]() -> bool { return strm.is_writable(); }; + data_sink.is_alive = [&]() -> bool { return strm.is_alive(); }; data_sink.done = [&](void) { data_available = false; }; @@ -4508,6 +4515,7 @@ write_content_chunked(Stream &strm, const ContentProvider &content_provider, }; data_sink.is_writable = [&]() -> bool { return strm.is_writable(); }; + data_sink.is_alive = [&]() -> bool { return strm.is_alive(); }; auto done_with_trailer = [&](const Headers *trailer) { if (!ok) { return; } @@ -5821,6 +5829,10 @@ inline bool SocketStream::is_writable() const { is_socket_alive(sock_); } +inline bool SocketStream::is_alive() const { + return is_socket_alive(sock_); +} + inline ssize_t SocketStream::read(char *ptr, size_t size) { #ifdef _WIN32 size = @@ -5895,6 +5907,8 @@ inline bool BufferStream::is_readable() const { return true; } inline bool BufferStream::is_writable() const { return true; } +inline bool BufferStream::is_alive() const { return true; } + inline ssize_t BufferStream::read(char *ptr, size_t size) { #if defined(_MSC_VER) && _MSC_VER < 1910 auto len_read = buffer._Copy_s(ptr, size, size, position); @@ -8904,6 +8918,10 @@ inline bool SSLSocketStream::is_writable() const { is_socket_alive(sock_); } +inline bool SSLSocketStream::is_alive() const { + return is_socket_alive(sock_); +} + inline ssize_t SSLSocketStream::read(char *ptr, size_t size) { if (SSL_pending(ssl_) > 0) { return SSL_read(ssl_, ptr, static_cast(size)); From 993bee49e66a3798b9403416f3536e9152927eeb Mon Sep 17 00:00:00 2001 From: ochafik Date: Fri, 4 Oct 2024 19:04:52 +0100 Subject: [PATCH 2/4] add FuzzedStream::is_alive() --- test/fuzzing/server_fuzzer.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/fuzzing/server_fuzzer.cc b/test/fuzzing/server_fuzzer.cc index 3cffbae244..f92415fc68 100644 --- a/test/fuzzing/server_fuzzer.cc +++ b/test/fuzzing/server_fuzzer.cc @@ -27,6 +27,8 @@ class FuzzedStream : public httplib::Stream { bool is_writable() const override { return true; } + bool is_alive() const override { return true; } + void get_remote_ip_and_port(std::string &ip, int &port) const override { ip = "127.0.0.1"; port = 8080; From 7ba067b7897687764a48ae3b96748eb5c4bfbbc7 Mon Sep 17 00:00:00 2001 From: ochafik Date: Fri, 4 Oct 2024 19:14:01 +0100 Subject: [PATCH 3/4] test is_alive alongside is_writable --- test/test.cc | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/test.cc b/test/test.cc index 76c6f60d42..787534361b 100644 --- a/test/test.cc +++ b/test/test.cc @@ -165,9 +165,11 @@ TEST(SocketStream, is_writable_UNIX) { }; asSocketStream(fds[0], [&](Stream &s0) { EXPECT_EQ(s0.socket(), fds[0]); + EXPECT_TRUE(s0.is_alive()); EXPECT_TRUE(s0.is_writable()); EXPECT_EQ(0, close(fds[1])); + EXPECT_FALSE(s0.is_alive()); EXPECT_FALSE(s0.is_writable()); return true; @@ -209,6 +211,7 @@ TEST(SocketStream, is_writable_INET) { }; asSocketStream(disconnected_svr_sock, [&](Stream &ss) { EXPECT_EQ(ss.socket(), disconnected_svr_sock); + EXPECT_FALSE(ss.is_alive()); EXPECT_FALSE(ss.is_writable()); return true; @@ -5456,14 +5459,16 @@ TEST(LongPollingTest, ClientCloseDetection) { svr.Get("/events", [&](const Request & /*req*/, Response &res) { res.set_chunked_content_provider( "text/plain", [](std::size_t const, DataSink &sink) -> bool { - EXPECT_TRUE(sink.is_writable()); // the socket is alive + EXPECT_TRUE(sink.is_alive()); + EXPECT_TRUE(sink.is_writable()); sink.os << "hello"; auto count = 10; - while (count > 0 && sink.is_writable()) { + while (count > 0 && sink.is_writable() && sink.is_alive()) { this_thread::sleep_for(chrono::milliseconds(10)); } - EXPECT_FALSE(sink.is_writable()); // the socket is closed + EXPECT_FALSE(sink.is_alive()); + EXPECT_FALSE(sink.is_writable()); return true; }); }); From 5a257af98daa4b4727cbf9114ad1825d4c941893 Mon Sep 17 00:00:00 2001 From: ochafik Date: Sat, 5 Oct 2024 01:05:38 +0100 Subject: [PATCH 4/4] Move DataSink::is_alive to Response + test cancel use case w/o providers (before headers sent) --- httplib.h | 7 +++---- test/test.cc | 47 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/httplib.h b/httplib.h index 6f4efc7260..01459fed5c 100644 --- a/httplib.h +++ b/httplib.h @@ -530,7 +530,6 @@ class DataSink { std::function write; std::function is_writable; - std::function is_alive; std::function done; std::function done_with_trailer; std::ostream os; @@ -666,6 +665,7 @@ struct Response { Headers headers; std::string body; std::string location; // Redirect location + std::function is_alive; bool has_header(const std::string &key) const; std::string get_header_value(const std::string &key, const char *def = "", @@ -4415,7 +4415,6 @@ inline bool write_content(Stream &strm, const ContentProvider &content_provider, }; data_sink.is_writable = [&]() -> bool { return strm.is_writable(); }; - data_sink.is_alive = [&]() -> bool { return strm.is_alive(); }; while (offset < end_offset && !is_shutting_down()) { if (!strm.is_writable()) { @@ -4462,7 +4461,6 @@ write_content_without_length(Stream &strm, }; data_sink.is_writable = [&]() -> bool { return strm.is_writable(); }; - data_sink.is_alive = [&]() -> bool { return strm.is_alive(); }; data_sink.done = [&](void) { data_available = false; }; @@ -4515,7 +4513,6 @@ write_content_chunked(Stream &strm, const ContentProvider &content_provider, }; data_sink.is_writable = [&]() -> bool { return strm.is_writable(); }; - data_sink.is_alive = [&]() -> bool { return strm.is_alive(); }; auto done_with_trailer = [&](const Headers *trailer) { if (!ok) { return; } @@ -4609,6 +4606,7 @@ inline bool redirect(T &cli, Request &req, Response &res, } Response new_res; + new_res.is_alive = res.is_alive; auto ret = cli.send(new_req, new_res, error); if (ret) { @@ -7005,6 +7003,7 @@ Server::process_request(Stream &strm, const std::string &remote_addr, Request req; Response res; + res.is_alive = [&strm]() { return strm.is_alive(); }; res.version = "HTTP/1.1"; res.headers = default_headers_; diff --git a/test/test.cc b/test/test.cc index 787534361b..7e6fb6886c 100644 --- a/test/test.cc +++ b/test/test.cc @@ -165,11 +165,9 @@ TEST(SocketStream, is_writable_UNIX) { }; asSocketStream(fds[0], [&](Stream &s0) { EXPECT_EQ(s0.socket(), fds[0]); - EXPECT_TRUE(s0.is_alive()); EXPECT_TRUE(s0.is_writable()); EXPECT_EQ(0, close(fds[1])); - EXPECT_FALSE(s0.is_alive()); EXPECT_FALSE(s0.is_writable()); return true; @@ -211,7 +209,6 @@ TEST(SocketStream, is_writable_INET) { }; asSocketStream(disconnected_svr_sock, [&](Stream &ss) { EXPECT_EQ(ss.socket(), disconnected_svr_sock); - EXPECT_FALSE(ss.is_alive()); EXPECT_FALSE(ss.is_writable()); return true; @@ -5459,15 +5456,13 @@ TEST(LongPollingTest, ClientCloseDetection) { svr.Get("/events", [&](const Request & /*req*/, Response &res) { res.set_chunked_content_provider( "text/plain", [](std::size_t const, DataSink &sink) -> bool { - EXPECT_TRUE(sink.is_alive()); - EXPECT_TRUE(sink.is_writable()); + EXPECT_TRUE(sink.is_writable()); // the socket is alive sink.os << "hello"; auto count = 10; - while (count > 0 && sink.is_writable() && sink.is_alive()) { + while (count > 0 && sink.is_writable()) { this_thread::sleep_for(chrono::milliseconds(10)); } - EXPECT_FALSE(sink.is_alive()); EXPECT_FALSE(sink.is_writable()); return true; }); @@ -5492,6 +5487,44 @@ TEST(LongPollingTest, ClientCloseDetection) { ASSERT_FALSE(res); } +TEST(LongPollingTest, ClientCloseDetectionOnResponse) { + Server svr; + + bool cancelled = false; + std::thread processing_thread; + svr.Get("/events", [&](const Request & /*req*/, Response &res) { + processing_thread = std::thread([&]() { + EXPECT_TRUE(res.is_alive()); + auto count = 10; + while (count > 0 && res.is_alive()) { + this_thread::sleep_for(chrono::milliseconds(10)); + } + EXPECT_FALSE(res.is_alive()); + cancelled = true; + }); + }); + + auto listen_thread = std::thread([&svr]() { svr.listen("localhost", PORT); }); + auto se = detail::scope_exit([&] { + svr.stop(); + listen_thread.join(); + ASSERT_FALSE(svr.is_running()); + }); + + svr.wait_until_ready(); + + Client cli("localhost", PORT); + + cli.Get("/events", [&](const char *data, size_t data_length) { + EXPECT_EQ("hello", string(data, data_length)); + return false; // close the socket immediately. + }); + + processing_thread.join(); + + ASSERT_TRUE(cancelled); +} + TEST(GetWithParametersTest, GetWithParameters) { Server svr;