diff --git a/lldb/include/lldb/Host/windows/MainLoopWindows.h b/lldb/include/lldb/Host/windows/MainLoopWindows.h index 3937a24645d95..53df815255c3d 100644 --- a/lldb/include/lldb/Host/windows/MainLoopWindows.h +++ b/lldb/include/lldb/Host/windows/MainLoopWindows.h @@ -31,17 +31,30 @@ class MainLoopWindows : public MainLoopBase { Status Run() override; + class IOEvent { + public: + IOEvent(IOObject::WaitableHandle event) : m_event(event) {} + virtual ~IOEvent() {} + virtual void WillPoll() {} + virtual void DidPoll() {} + virtual void Disarm() {} + IOObject::WaitableHandle GetHandle() { return m_event; } + + protected: + IOObject::WaitableHandle m_event; + }; + using IOEventUP = std::unique_ptr; + protected: void UnregisterReadObject(IOObject::WaitableHandle handle) override; void Interrupt() override; private: - void ProcessReadObject(IOObject::WaitableHandle handle); llvm::Expected Poll(); struct FdInfo { - void *event; + IOEventUP event; Callback callback; }; llvm::DenseMap m_read_fds; diff --git a/lldb/include/lldb/Utility/IOObject.h b/lldb/include/lldb/Utility/IOObject.h index 8cf42992e7be5..de6532a637083 100644 --- a/lldb/include/lldb/Utility/IOObject.h +++ b/lldb/include/lldb/Utility/IOObject.h @@ -14,6 +14,7 @@ #include #include "lldb/lldb-private.h" +#include "lldb/lldb-types.h" namespace lldb_private { @@ -24,9 +25,9 @@ class IOObject { eFDTypeSocket, // Socket requiring send/recv }; - // TODO: On Windows this should be a HANDLE, and wait should use - // WaitForMultipleObjects - typedef int WaitableHandle; + // A handle for integrating with the host event loop model. + using WaitableHandle = lldb::file_t; + static const WaitableHandle kInvalidHandleValue; IOObject(FDType type) : m_fd_type(type) {} diff --git a/lldb/source/Host/common/File.cpp b/lldb/source/Host/common/File.cpp index 9aa95ffda44cb..23b6dc9fe850d 100644 --- a/lldb/source/Host/common/File.cpp +++ b/lldb/source/Host/common/File.cpp @@ -274,7 +274,11 @@ int NativeFile::GetDescriptor() const { } IOObject::WaitableHandle NativeFile::GetWaitableHandle() { +#ifdef _WIN32 + return (HANDLE)_get_osfhandle(GetDescriptor()); +#else return GetDescriptor(); +#endif } FILE *NativeFile::GetStream() { diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp index 1a0851d5c4365..bf269ffa45966 100644 --- a/lldb/source/Host/common/JSONTransport.cpp +++ b/lldb/source/Host/common/JSONTransport.cpp @@ -42,7 +42,7 @@ ReadFull(IOObject &descriptor, size_t length, if (timeout && timeout_supported) { SelectHelper sh; sh.SetTimeout(*timeout); - sh.FDSetRead(descriptor.GetWaitableHandle()); + sh.FDSetRead((lldb::socket_t)descriptor.GetWaitableHandle()); Status status = sh.Select(); if (status.Fail()) { // Convert timeouts into a specific error. diff --git a/lldb/source/Host/common/Socket.cpp b/lldb/source/Host/common/Socket.cpp index 802ff9a1b5d1d..3511cde8bb36f 100644 --- a/lldb/source/Host/common/Socket.cpp +++ b/lldb/source/Host/common/Socket.cpp @@ -313,8 +313,7 @@ Socket::DecodeHostAndPort(llvm::StringRef host_and_port) { } IOObject::WaitableHandle Socket::GetWaitableHandle() { - // TODO: On Windows, use WSAEventSelect - return m_socket; + return (IOObject::WaitableHandle)m_socket; } Status Socket::Read(void *buf, size_t &num_bytes) { diff --git a/lldb/source/Host/posix/ConnectionFileDescriptorPosix.cpp b/lldb/source/Host/posix/ConnectionFileDescriptorPosix.cpp index 16e3f9910eefc..4fda978c092b3 100644 --- a/lldb/source/Host/posix/ConnectionFileDescriptorPosix.cpp +++ b/lldb/source/Host/posix/ConnectionFileDescriptorPosix.cpp @@ -276,7 +276,7 @@ size_t ConnectionFileDescriptor::Read(void *dst, size_t dst_len, "%p ConnectionFileDescriptor::Read() fd = %" PRIu64 ", dst = %p, dst_len = %" PRIu64 ") => %" PRIu64 ", error = %s", static_cast(this), - static_cast(m_io_sp->GetWaitableHandle()), + static_cast(m_io_sp->GetWaitableHandle()), static_cast(dst), static_cast(dst_len), static_cast(bytes_read), error.AsCString()); } @@ -380,7 +380,7 @@ size_t ConnectionFileDescriptor::Write(const void *src, size_t src_len, "%p ConnectionFileDescriptor::Write(fd = %" PRIu64 ", src = %p, src_len = %" PRIu64 ") => %" PRIu64 " (error = %s)", static_cast(this), - static_cast(m_io_sp->GetWaitableHandle()), + static_cast(m_io_sp->GetWaitableHandle()), static_cast(src), static_cast(src_len), static_cast(bytes_sent), error.AsCString()); } @@ -451,7 +451,8 @@ ConnectionFileDescriptor::BytesAvailable(const Timeout &timeout, if (timeout) select_helper.SetTimeout(*timeout); - select_helper.FDSetRead(handle); + // FIXME: Migrate to MainLoop. + select_helper.FDSetRead((lldb::socket_t)handle); #if defined(_WIN32) // select() won't accept pipes on Windows. The entire Windows codepath // needs to be converted over to using WaitForMultipleObjects and event @@ -493,7 +494,7 @@ ConnectionFileDescriptor::BytesAvailable(const Timeout &timeout, break; // Lets keep reading to until we timeout } } else { - if (select_helper.FDIsSetRead(handle)) + if (select_helper.FDIsSetRead((lldb::socket_t)handle)) return eConnectionStatusSuccess; if (select_helper.FDIsSetRead(pipe_fd)) { diff --git a/lldb/source/Host/windows/MainLoopWindows.cpp b/lldb/source/Host/windows/MainLoopWindows.cpp index f3ab2a710cd01..b4896dced0a48 100644 --- a/lldb/source/Host/windows/MainLoopWindows.cpp +++ b/lldb/source/Host/windows/MainLoopWindows.cpp @@ -8,8 +8,11 @@ #include "lldb/Host/windows/MainLoopWindows.h" #include "lldb/Host/Config.h" +#include "lldb/Host/Socket.h" #include "lldb/Utility/Status.h" #include "llvm/Config/llvm-config.h" +#include "llvm/Support/Casting.h" +#include "llvm/Support/WindowsError.h" #include #include #include @@ -31,6 +34,122 @@ static DWORD ToTimeout(std::optional point) { return ceil(dur).count(); } +namespace { + +class PipeEvent : public MainLoopWindows::IOEvent { +public: + explicit PipeEvent(HANDLE handle) + : IOEvent((IOObject::WaitableHandle)CreateEventW( + NULL, /*bManualReset=*/FALSE, + /*bInitialState=*/FALSE, NULL)), + m_handle(handle), m_ready(CreateEventW(NULL, /*bManualReset=*/FALSE, + /*bInitialState=*/FALSE, NULL)) { + assert(m_event && m_ready); + } + + ~PipeEvent() override { + if (m_monitor_thread.joinable()) { + m_stopped = true; + SetEvent(m_ready); + // Keep trying to cancel ReadFile() until the thread exits. + do { + CancelIoEx((HANDLE)m_handle, /*lpOverlapped=*/NULL); + } while (WaitForSingleObject(m_monitor_thread.native_handle(), 1) == + WAIT_TIMEOUT); + m_monitor_thread.join(); + } + CloseHandle((HANDLE)m_event); + CloseHandle(m_ready); + } + + void WillPoll() override { + if (!m_monitor_thread.joinable()) + m_monitor_thread = std::thread(&PipeEvent::Monitor, this); + } + + void Disarm() override { SetEvent(m_ready); } + + /// Monitors the handle performing a zero byte read to determine when data is + /// avaiable. + void Monitor() { + do { + char buf[1]; + DWORD bytes_read = 0; + OVERLAPPED ov = {0}; + // Block on a 0-byte read; this will only resume when data is + // available in the pipe. The pipe must be PIPE_WAIT or this thread + // will spin. + BOOL success = + ReadFile(m_handle, buf, /*nNumberOfBytesToRead=*/0, &bytes_read, &ov); + DWORD bytes_available = 0; + DWORD err = GetLastError(); + if (!success && err == ERROR_IO_PENDING) { + success = GetOverlappedResult(m_handle, &ov, &bytes_read, + /*bWait=*/TRUE); + err = GetLastError(); + } + if (success) { + success = + PeekNamedPipe(m_handle, NULL, 0, NULL, &bytes_available, NULL); + err = GetLastError(); + } + if (success) { + if (bytes_available == 0) { + // This can happen with a zero-byte write. Try again. + continue; + } + } else if (err == ERROR_NO_DATA) { + // The pipe is nonblocking. Try again. + Sleep(0); + continue; + } else if (err == ERROR_OPERATION_ABORTED) { + // Read may have been cancelled, try again. + continue; + } + + SetEvent((HANDLE)m_event); + + // Wait until the current read is consumed before doing the next read. + WaitForSingleObject(m_ready, INFINITE); + } while (!m_stopped); + } + +private: + HANDLE m_handle; + HANDLE m_ready; + std::thread m_monitor_thread; + std::atomic m_stopped = false; +}; + +class SocketEvent : public MainLoopWindows::IOEvent { +public: + explicit SocketEvent(SOCKET socket) + : IOEvent((IOObject::WaitableHandle)WSACreateEvent()), m_socket(socket) { + assert(event != WSA_INVALID_EVENT); + } + + ~SocketEvent() override { WSACloseEvent((HANDLE)m_event); } + + void WillPoll() { + int result = WSAEventSelect(m_socket, (HANDLE)m_event, + FD_READ | FD_ACCEPT | FD_CLOSE); + assert(result == 0); + UNUSED_IF_ASSERT_DISABLED(result); + } + + void DidPoll() { + int result = WSAEventSelect(m_socket, WSA_INVALID_EVENT, 0); + assert(result == 0); + UNUSED_IF_ASSERT_DISABLED(result); + } + + void Disarm() override { WSAResetEvent((HANDLE)m_event); } + + SOCKET m_socket; +}; + +} // namespace + MainLoopWindows::MainLoopWindows() { m_interrupt_event = WSACreateEvent(); assert(m_interrupt_event != WSA_INVALID_EVENT); @@ -44,14 +163,11 @@ MainLoopWindows::~MainLoopWindows() { } llvm::Expected MainLoopWindows::Poll() { - std::vector events; + std::vector events; events.reserve(m_read_fds.size() + 1); - for (auto &[fd, info] : m_read_fds) { - int result = WSAEventSelect(fd, info.event, FD_READ | FD_ACCEPT | FD_CLOSE); - assert(result == 0); - UNUSED_IF_ASSERT_DISABLED(result); - - events.push_back(info.event); + for (auto &[_, fd_info] : m_read_fds) { + fd_info.event->WillPoll(); + events.push_back((HANDLE)fd_info.event->GetHandle()); } events.push_back(m_interrupt_event); @@ -59,11 +175,8 @@ llvm::Expected MainLoopWindows::Poll() { WSAWaitForMultipleEvents(events.size(), events.data(), FALSE, ToTimeout(GetNextWakeupTime()), FALSE); - for (auto &fd : m_read_fds) { - int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0); - assert(result == 0); - UNUSED_IF_ASSERT_DISABLED(result); - } + for (auto &[_, fd_info] : m_read_fds) + fd_info.event->DidPoll(); if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size()) return result - WSA_WAIT_EVENT_0; @@ -83,28 +196,25 @@ MainLoopWindows::RegisterReadObject(const IOObjectSP &object_sp, error = Status::FromErrorString("IO object is not valid."); return nullptr; } - if (object_sp->GetFdType() != IOObject::eFDTypeSocket) { - error = Status::FromErrorString( - "MainLoopWindows: non-socket types unsupported on Windows"); - return nullptr; - } - WSAEVENT event = WSACreateEvent(); - if (event == WSA_INVALID_EVENT) { - error = - Status::FromErrorStringWithFormat("Cannot create monitoring event."); + IOObject::WaitableHandle waitable_handle = object_sp->GetWaitableHandle(); + assert(waitable_handle != IOObject::kInvalidHandleValue); + + if (m_read_fds.find(waitable_handle) != m_read_fds.end()) { + error = Status::FromErrorStringWithFormat( + "File descriptor %d already monitored.", waitable_handle); return nullptr; } - const bool inserted = - m_read_fds - .try_emplace(object_sp->GetWaitableHandle(), FdInfo{event, callback}) - .second; - if (!inserted) { - WSACloseEvent(event); - error = Status::FromErrorStringWithFormat( - "File descriptor %d already monitored.", - object_sp->GetWaitableHandle()); + if (object_sp->GetFdType() == IOObject::eFDTypeSocket) + m_read_fds[waitable_handle] = { + std::make_unique((SOCKET)waitable_handle), callback}; + else if (GetFileType(waitable_handle) == FILE_TYPE_PIPE) + m_read_fds[waitable_handle] = { + std::make_unique((HANDLE)waitable_handle), callback}; + else { + error = Status::FromErrorStringWithFormat("Unsupported file type %d", + GetFileType(waitable_handle)); return nullptr; } @@ -114,18 +224,9 @@ MainLoopWindows::RegisterReadObject(const IOObjectSP &object_sp, void MainLoopWindows::UnregisterReadObject(IOObject::WaitableHandle handle) { auto it = m_read_fds.find(handle); assert(it != m_read_fds.end()); - BOOL result = WSACloseEvent(it->second.event); - assert(result == TRUE); - UNUSED_IF_ASSERT_DISABLED(result); m_read_fds.erase(it); } -void MainLoopWindows::ProcessReadObject(IOObject::WaitableHandle handle) { - auto it = m_read_fds.find(handle); - if (it != m_read_fds.end()) - it->second.callback(*this); // Do the work -} - Status MainLoopWindows::Run() { m_terminate_request = false; @@ -138,8 +239,8 @@ Status MainLoopWindows::Run() { if (*signaled_event < m_read_fds.size()) { auto &KV = *std::next(m_read_fds.begin(), *signaled_event); - WSAResetEvent(KV.second.event); - ProcessReadObject(KV.first); + KV.second.event->Disarm(); + KV.second.callback(*this); // Do the work. } else { assert(*signaled_event == m_read_fds.size()); WSAResetEvent(m_interrupt_event); diff --git a/lldb/source/Utility/IOObject.cpp b/lldb/source/Utility/IOObject.cpp index 964edce0ce10d..c0c07cc0b68e3 100644 --- a/lldb/source/Utility/IOObject.cpp +++ b/lldb/source/Utility/IOObject.cpp @@ -8,7 +8,16 @@ #include "lldb/Utility/IOObject.h" +#ifdef _WIN32 +#include "lldb/Host/windows/windows.h" +#endif + using namespace lldb_private; +#ifdef _WIN32 +const IOObject::WaitableHandle IOObject::kInvalidHandleValue = + INVALID_HANDLE_VALUE; +#else const IOObject::WaitableHandle IOObject::kInvalidHandleValue = -1; +#endif IOObject::~IOObject() = default; diff --git a/lldb/unittests/Host/FileTest.cpp b/lldb/unittests/Host/FileTest.cpp index 35c87bb200fad..d973d19430596 100644 --- a/lldb/unittests/Host/FileTest.cpp +++ b/lldb/unittests/Host/FileTest.cpp @@ -14,6 +14,10 @@ #include "llvm/Support/Program.h" #include "gtest/gtest.h" +#ifdef _WIN32 +#include "lldb/Host/windows/windows.h" +#endif + using namespace lldb; using namespace lldb_private; @@ -32,7 +36,11 @@ TEST(File, GetWaitableHandleFileno) { ASSERT_TRUE(stream); NativeFile file(stream, true); - EXPECT_EQ(file.GetWaitableHandle(), fd); +#ifdef _WIN32 + EXPECT_EQ(file.GetWaitableHandle(), (HANDLE)_get_osfhandle(fd)); +#else + EXPECT_EQ(file.GetWaitableHandle(), (file_t)fd); +#endif } TEST(File, GetStreamFromDescriptor) { @@ -53,5 +61,9 @@ TEST(File, GetStreamFromDescriptor) { ASSERT_TRUE(stream != NULL); EXPECT_EQ(file.GetDescriptor(), fd); - EXPECT_EQ(file.GetWaitableHandle(), fd); +#ifdef _WIN32 + EXPECT_EQ(file.GetWaitableHandle(), (HANDLE)_get_osfhandle(fd)); +#else + EXPECT_EQ(file.GetWaitableHandle(), (file_t)fd); +#endif } diff --git a/lldb/unittests/Host/MainLoopTest.cpp b/lldb/unittests/Host/MainLoopTest.cpp index e18489978e90c..502028ae1a343 100644 --- a/lldb/unittests/Host/MainLoopTest.cpp +++ b/lldb/unittests/Host/MainLoopTest.cpp @@ -79,7 +79,69 @@ TEST_F(MainLoopTest, ReadObject) { ASSERT_EQ(1u, callback_count); } -TEST_F(MainLoopTest, NoSpuriousReads) { +TEST_F(MainLoopTest, ReadPipeObject) { + Pipe pipe; + + ASSERT_TRUE(pipe.CreateNew().Success()); + + MainLoop loop; + + char X = 'X'; + size_t len = sizeof(X); + ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1)); + + Status error; + auto handle = loop.RegisterReadObject( + std::make_shared(pipe.GetReadFileDescriptor(), + File::eOpenOptionReadOnly, false), + make_callback(), error); + ASSERT_TRUE(error.Success()); + ASSERT_TRUE(handle); + ASSERT_TRUE(loop.Run().Success()); + ASSERT_EQ(1u, callback_count); +} + +TEST_F(MainLoopTest, NoSpuriousPipeReads) { + Pipe pipe; + + ASSERT_TRUE(pipe.CreateNew().Success()); + + char X = 'X'; + size_t len = sizeof(X); + ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::Succeeded()); + + lldb::IOObjectSP r = std::make_shared( + pipe.GetReadFileDescriptor(), File::eOpenOptionReadOnly, false); + + MainLoop loop; + + Status error; + auto handle = loop.RegisterReadObject( + r, + [&](MainLoopBase &) { + if (callback_count == 0) { + // Read the byte back the first time we're called. After that, the + // pipe is empty, and we should not be called anymore. + char X; + size_t len = sizeof(X); + ASSERT_THAT_ERROR(r->Read(&X, len).ToError(), llvm::Succeeded()); + EXPECT_EQ(len, sizeof(X)); + EXPECT_EQ(X, 'X'); + } + ++callback_count; + }, + error); + ASSERT_THAT_ERROR(error.ToError(), llvm::Succeeded()); + // Terminate the loop after one second. + loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); }, + std::chrono::seconds(1)); + ASSERT_THAT_ERROR(loop.Run().ToError(), llvm::Succeeded()); + + // Make sure the callback was called only once. + ASSERT_EQ(1u, callback_count); +} + +TEST_F(MainLoopTest, NoSpuriousSocketReads) { // Write one byte into the socket. char X = 'X'; size_t len = sizeof(X); @@ -99,6 +161,7 @@ TEST_F(MainLoopTest, NoSpuriousReads) { EXPECT_THAT_ERROR(socketpair[1]->Read(&X, len).ToError(), llvm::Succeeded()); EXPECT_EQ(len, sizeof(X)); + EXPECT_EQ(X, 'X'); } ++callback_count; }, @@ -164,9 +227,8 @@ TEST_F(MainLoopTest, PendingCallbackCalledOnlyOnce) { [&](MainLoopBase &loop) { // Add one pending callback on the first iteration. if (callback_count == 0) { - loop.AddPendingCallback([&](MainLoopBase &loop) { - callback_count++; - }); + loop.AddPendingCallback( + [&](MainLoopBase &loop) { callback_count++; }); } // Terminate the loop on second iteration. if (callback_count++ >= 1) @@ -321,7 +383,7 @@ TEST_F(MainLoopTest, UnmonitoredSignal) { MainLoop loop; Status error; struct sigaction sa; - sa.sa_sigaction = [](int, siginfo_t *, void *) { }; + sa.sa_sigaction = [](int, siginfo_t *, void *) {}; sa.sa_flags = SA_SIGINFO; // important: no SA_RESTART sigemptyset(&sa.sa_mask); ASSERT_EQ(0, sigaction(SIGUSR2, &sa, nullptr));