Skip to content

Commit 9be7194

Browse files
ashgtilabath
andauthored
[lldb] Improving synchronization of MainLoopWindows. (#147438)
This should improve synchronizing the MainLoopWindows monitor thread with the main loop state. This uses the `m_ready` and `m_event` event handles to manage when the Monitor thread continues and adds new tests to cover additional use cases. I believe this should fix #147291 but it is hard to ensure a race condition is fixed without running the CI on multiple machines/configurations. --------- Co-authored-by: Pavel Labath <pavel@labath.sk>
1 parent d258457 commit 9be7194

File tree

2 files changed

+165
-9
lines changed

2 files changed

+165
-9
lines changed

lldb/source/Host/windows/MainLoopWindows.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@
1212
#include "lldb/Host/windows/windows.h"
1313
#include "lldb/Utility/Status.h"
1414
#include "llvm/Config/llvm-config.h"
15-
#include "llvm/Support/Casting.h"
1615
#include "llvm/Support/WindowsError.h"
1716
#include <algorithm>
1817
#include <cassert>
19-
#include <cerrno>
20-
#include <csignal>
2118
#include <ctime>
2219
#include <io.h>
20+
#include <synchapi.h>
2321
#include <thread>
2422
#include <vector>
23+
#include <winbase.h>
24+
#include <winerror.h>
2525
#include <winsock2.h>
2626

2727
using namespace lldb;
@@ -42,11 +42,12 @@ namespace {
4242
class PipeEvent : public MainLoopWindows::IOEvent {
4343
public:
4444
explicit PipeEvent(HANDLE handle)
45-
: IOEvent(CreateEventW(NULL, /*bManualReset=*/FALSE,
45+
: IOEvent(CreateEventW(NULL, /*bManualReset=*/TRUE,
4646
/*bInitialState=*/FALSE, NULL)),
47-
m_handle(handle), m_ready(CreateEventW(NULL, /*bManualReset=*/FALSE,
47+
m_handle(handle), m_ready(CreateEventW(NULL, /*bManualReset=*/TRUE,
4848
/*bInitialState=*/FALSE, NULL)) {
4949
assert(m_event && m_ready);
50+
m_monitor_thread = std::thread(&PipeEvent::Monitor, this);
5051
}
5152

5253
~PipeEvent() override {
@@ -65,15 +66,27 @@ class PipeEvent : public MainLoopWindows::IOEvent {
6566
}
6667

6768
void WillPoll() override {
68-
if (!m_monitor_thread.joinable())
69-
m_monitor_thread = std::thread(&PipeEvent::Monitor, this);
69+
if (WaitForSingleObject(m_event, /*dwMilliseconds=*/0) != WAIT_TIMEOUT) {
70+
// The thread has already signalled that the data is available. No need
71+
// for further polling until we consume that event.
72+
return;
73+
}
74+
if (WaitForSingleObject(m_ready, /*dwMilliseconds=*/0) != WAIT_TIMEOUT) {
75+
// The thread is already waiting for data to become available.
76+
return;
77+
}
78+
// Start waiting.
79+
SetEvent(m_ready);
7080
}
7181

72-
void Disarm() override { SetEvent(m_ready); }
82+
void Disarm() override { ResetEvent(m_event); }
7383

7484
/// Monitors the handle performing a zero byte read to determine when data is
7585
/// avaiable.
7686
void Monitor() {
87+
// Wait until the MainLoop tells us to start.
88+
WaitForSingleObject(m_ready, INFINITE);
89+
7790
do {
7891
char buf[1];
7992
DWORD bytes_read = 0;
@@ -110,7 +123,11 @@ class PipeEvent : public MainLoopWindows::IOEvent {
110123
continue;
111124
}
112125

126+
// Notify that data is available on the pipe. It's important to set this
127+
// before clearing m_ready to avoid a race with WillPoll.
113128
SetEvent(m_event);
129+
// Stop polling until we're told to resume.
130+
ResetEvent(m_ready);
114131

115132
// Wait until the current read is consumed before doing the next read.
116133
WaitForSingleObject(m_ready, INFINITE);

lldb/unittests/Host/MainLoopTest.cpp

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "TestingSupport/SubsystemRAII.h"
1111
#include "lldb/Host/ConnectionFileDescriptor.h"
1212
#include "lldb/Host/FileSystem.h"
13+
#include "lldb/Host/MainLoopBase.h"
1314
#include "lldb/Host/PseudoTerminal.h"
1415
#include "lldb/Host/common/TCPSocket.h"
1516
#include "llvm/Config/llvm-config.h" // for LLVM_ON_UNIX
@@ -64,7 +65,7 @@ class MainLoopTest : public testing::Test {
6465
};
6566
} // namespace
6667

67-
TEST_F(MainLoopTest, ReadObject) {
68+
TEST_F(MainLoopTest, ReadSocketObject) {
6869
char X = 'X';
6970
size_t len = sizeof(X);
7071
ASSERT_TRUE(socketpair[0]->Write(&X, len).Success());
@@ -101,6 +102,144 @@ TEST_F(MainLoopTest, ReadPipeObject) {
101102
ASSERT_EQ(1u, callback_count);
102103
}
103104

105+
TEST_F(MainLoopTest, MultipleReadsPipeObject) {
106+
Pipe pipe;
107+
108+
ASSERT_TRUE(pipe.CreateNew().Success());
109+
110+
MainLoop loop;
111+
112+
std::future<void> async_writer = std::async(std::launch::async, [&] {
113+
for (int i = 0; i < 5; ++i) {
114+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
115+
char X = 'X';
116+
size_t len = sizeof(X);
117+
ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1));
118+
}
119+
});
120+
121+
Status error;
122+
lldb::FileSP file = std::make_shared<NativeFile>(
123+
pipe.GetReadFileDescriptor(), File::eOpenOptionReadOnly, false);
124+
auto handle = loop.RegisterReadObject(
125+
file,
126+
[&](MainLoopBase &loop) {
127+
callback_count++;
128+
if (callback_count == 5)
129+
loop.RequestTermination();
130+
131+
// Read some data to ensure the handle is not in a readable state.
132+
char buf[1024] = {0};
133+
size_t len = sizeof(buf);
134+
ASSERT_THAT_ERROR(file->Read(buf, len).ToError(), llvm::Succeeded());
135+
EXPECT_EQ(len, 1);
136+
EXPECT_EQ(buf[0], 'X');
137+
},
138+
error);
139+
ASSERT_TRUE(error.Success());
140+
ASSERT_TRUE(handle);
141+
ASSERT_TRUE(loop.Run().Success());
142+
ASSERT_EQ(5u, callback_count);
143+
async_writer.wait();
144+
}
145+
146+
TEST_F(MainLoopTest, PipeDelayBetweenRegisterAndRun) {
147+
Pipe pipe;
148+
149+
ASSERT_TRUE(pipe.CreateNew().Success());
150+
151+
MainLoop loop;
152+
153+
Status error;
154+
lldb::FileSP file = std::make_shared<NativeFile>(
155+
pipe.GetReadFileDescriptor(), File::eOpenOptionReadOnly, false);
156+
auto handle = loop.RegisterReadObject(
157+
file,
158+
[&](MainLoopBase &loop) {
159+
callback_count++;
160+
161+
// Read some data to ensure the handle is not in a readable state.
162+
char buf[1024] = {0};
163+
size_t len = sizeof(buf);
164+
ASSERT_THAT_ERROR(file->Read(buf, len).ToError(), llvm::Succeeded());
165+
EXPECT_EQ(len, 2);
166+
EXPECT_EQ(buf[0], 'X');
167+
EXPECT_EQ(buf[1], 'X');
168+
},
169+
error);
170+
auto cb = [&](MainLoopBase &) {
171+
callback_count++;
172+
char X = 'X';
173+
size_t len = sizeof(X);
174+
// Write twice and ensure we coalesce into a single read.
175+
ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1));
176+
ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1));
177+
};
178+
// Add a write that triggers a read events.
179+
loop.AddCallback(cb, std::chrono::milliseconds(500));
180+
loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
181+
std::chrono::milliseconds(1000));
182+
ASSERT_TRUE(error.Success());
183+
ASSERT_TRUE(handle);
184+
185+
// Write between RegisterReadObject / Run should NOT invoke the callback.
186+
cb(loop);
187+
ASSERT_EQ(1u, callback_count);
188+
189+
ASSERT_TRUE(loop.Run().Success());
190+
ASSERT_EQ(4u, callback_count);
191+
}
192+
193+
TEST_F(MainLoopTest, NoSelfTriggersDuringPipeHandler) {
194+
Pipe pipe;
195+
196+
ASSERT_TRUE(pipe.CreateNew().Success());
197+
198+
MainLoop loop;
199+
200+
Status error;
201+
lldb::FileSP file = std::make_shared<NativeFile>(
202+
pipe.GetReadFileDescriptor(), File::eOpenOptionReadOnly, false);
203+
auto handle = loop.RegisterReadObject(
204+
file,
205+
[&](MainLoopBase &lop) {
206+
callback_count++;
207+
208+
char X = 'Y';
209+
size_t len = sizeof(X);
210+
// writes / reads during the handler callback should NOT trigger itself.
211+
ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1));
212+
213+
char buf[1024] = {0};
214+
len = sizeof(buf);
215+
ASSERT_THAT_ERROR(file->Read(buf, len).ToError(), llvm::Succeeded());
216+
EXPECT_EQ(len, 2);
217+
EXPECT_EQ(buf[0], 'X');
218+
EXPECT_EQ(buf[1], 'Y');
219+
220+
if (callback_count == 2)
221+
loop.RequestTermination();
222+
},
223+
error);
224+
// Add a write that triggers a read event.
225+
loop.AddPendingCallback([&](MainLoopBase &) {
226+
char X = 'X';
227+
size_t len = sizeof(X);
228+
ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1));
229+
});
230+
loop.AddCallback(
231+
[&](MainLoopBase &) {
232+
char X = 'X';
233+
size_t len = sizeof(X);
234+
ASSERT_THAT_EXPECTED(pipe.Write(&X, len), llvm::HasValue(1));
235+
},
236+
std::chrono::milliseconds(500));
237+
ASSERT_TRUE(error.Success());
238+
ASSERT_TRUE(handle);
239+
ASSERT_TRUE(loop.Run().Success());
240+
ASSERT_EQ(2u, callback_count);
241+
}
242+
104243
TEST_F(MainLoopTest, NoSpuriousPipeReads) {
105244
Pipe pipe;
106245

0 commit comments

Comments
 (0)