
Commit a11e690

Merge #186: Fix mptest failures in bitcoin CI

6f340a5 doc: fix DrahtBot LLM Linter error (Ryan Ofsky)
c6f7fdf type-context: revert client disconnect workaround (Ryan Ofsky)
e09143d proxy-types: fix UndefinedBehaviorSanitizer: null-pointer-use (Ryan Ofsky)
84b292f mptest: fix MemorySanitizer: use-of-uninitialized-value (Ryan Ofsky)
fe4a188 proxy-io: fix race conditions in disconnect callback code (Ryan Ofsky)
d8011c8 proxy-io: fix race conditions in ProxyClientBase cleanup handler (Ryan Ofsky)
97e82ce doc: Add note about Waiter::m_mutex and interaction with the EventLoop::m_mutex (Ryan Ofsky)
81d58f5 refactor: Rename ProxyClient cleanup_it variable (Ryan Ofsky)
07230f2 refactor: rename ProxyClient<Thread>::m_cleanup_it (Ryan Ofsky)
0d986ff mptest: fix race condition in TestSetup constructor (Ryan Ofsky)
d2f6aa2 ci: add thread sanitizer job (Ryan Ofsky)

Pull request description:

  Recently merged PR #160 expanded unit tests to cover various unclean disconnection scenarios, but the new unit tests cause failures in bitcoin CI, despite passing in local CI (which doesn't test as many sanitizers and platforms).

  Some of the errors are just test bugs, but others are real library bugs and race conditions. The bugs were reported in two threads starting Sjors/bitcoin#90 (comment) and bitcoin/bitcoin#32345 (comment), and they are described in detail in individual commit messages in this PR.

  The changes here fix all the known bugs and add new CI jobs and tests to detect them and catch regressions.

ACKs for top commit:
  Sjors:
    re-ACK 6f340a5

Tree-SHA512: 20aa1992080a0329739d663edb636f218e88d521b17cd66c328051629c8efea802c0ac52a44d51cd58cfe60cc6beb6cdd4a2afa00a0ce36801724540f9e43d42

2 parents c0efaa5 + 6f340a5 commit a11e690


9 files changed: +89 −68 lines


.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        config: [default, llvm, gnu32]
+        config: [default, llvm, gnu32, sanitize]

     name: build • ${{ matrix.config }}


ci/README.md

Lines changed: 4 additions & 3 deletions
@@ -16,9 +16,10 @@ All CI is just bash and nix.
 To run jobs locally:

 ```bash
-CI_CONFIG=ci/configs/default.bash ci/scripts/run.sh
-CI_CONFIG=ci/configs/llvm.bash ci/scripts/run.sh
-CI_CONFIG=ci/configs/gnu32.bash ci/scripts/run.sh
+CI_CONFIG=ci/configs/default.bash ci/scripts/run.sh
+CI_CONFIG=ci/configs/llvm.bash ci/scripts/run.sh
+CI_CONFIG=ci/configs/gnu32.bash ci/scripts/run.sh
+CI_CONFIG=ci/configs/sanitize.bash ci/scripts/run.sh
 ```

 By default CI jobs will reuse their build directories. `CI_CLEAN=1` can be specified to delete them before running instead.

ci/configs/sanitize.bash

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+CI_DESC="CI job running ThreadSanitizer"
+CI_DIR=build-sanitize
+export CXX=clang++
+export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
+CMAKE_ARGS=()
+BUILD_ARGS=(-k -j4)
+BUILD_TARGETS=(mptest)
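
The `-fsanitize=thread` flag in `CXXFLAGS` is what makes this job catch the bugs fixed in this PR: ThreadSanitizer instruments memory accesses and reports unsynchronized ones at runtime. As a rough illustration (a standalone sketch, not code from this repository), a program like the following produces a `WARNING: ThreadSanitizer: data race` report when built with `clang++ -fsanitize=thread -g`:

```cpp
// Hypothetical standalone example of the kind of race ThreadSanitizer reports.
#include <iostream>
#include <thread>

int counter = 0; // shared, unsynchronized

int main()
{
    // Two threads increment the same variable without a mutex or atomic,
    // so ThreadSanitizer flags a data race on `counter`.
    std::thread a([] { for (int i = 0; i < 100000; ++i) ++counter; });
    std::thread b([] { for (int i = 0; i < 100000; ++i) ++counter; });
    a.join();
    b.join();
    std::cout << counter << "\n";
}
```
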

ci/scripts/ci.sh

Lines changed: 4 additions & 1 deletion
@@ -11,9 +11,12 @@ set -o errexit -o nounset -o pipefail -o xtrace
 [ "${CI_CONFIG+x}" ] && source "$CI_CONFIG"

 : "${CI_DIR:=build}"
+if ! [ -v BUILD_TARGETS ]; then
+    BUILD_TARGETS=(all tests mpexamples)
+fi

 [ -n "${CI_CLEAN-}" ] && rm -rf "${CI_DIR}"

 cmake -B "$CI_DIR" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
-cmake --build "$CI_DIR" -t all tests mpexamples -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
+cmake --build "$CI_DIR" -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
 ctest --test-dir "$CI_DIR" --output-on-failure

include/mp/proxy-io.h

Lines changed: 33 additions & 19 deletions
@@ -66,16 +66,18 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
     ProxyClient(const ProxyClient&) = delete;
     ~ProxyClient();

-    void setCleanup(const std::function<void()>& fn);
-
-    //! Cleanup function to run when the connection is closed. If the Connection
-    //! gets destroyed before this ProxyClient<Thread> object, this cleanup
-    //! callback lets it destroy this object and remove its entry in the
-    //! thread's request_threads or callback_threads map (after resetting
-    //! m_cleanup_it so the destructor does not try to access it). But if this
-    //! object gets destroyed before the Connection, there's no need to run the
-    //! cleanup function and the destructor will unregister it.
-    std::optional<CleanupIt> m_cleanup_it;
+    void setDisconnectCallback(const std::function<void()>& fn);
+
+    //! Reference to callback function that is run if there is a sudden
+    //! disconnect and the Connection object is destroyed before this
+    //! ProxyClient<Thread> object. The callback will destroy this object and
+    //! remove its entry from the thread's request_threads or callback_threads
+    //! map. It will also reset m_disconnect_cb so the destructor does not
+    //! access it. In the normal case where there is no sudden disconnect, the
+    //! destructor will unregister m_disconnect_cb so the callback is never run.
+    //! Since this variable is accessed from multiple threads, accesses should
+    //! be guarded with the associated Waiter::m_mutex.
+    std::optional<CleanupIt> m_disconnect_cb;
 };

 template <>
@@ -298,6 +300,13 @@ struct Waiter
         });
     }

+    //! Mutex mainly used internally by waiter class, but also used externally
+    //! to guard access to related state. Specifically, since the thread_local
+    //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
+    //! access to other parts of the struct to avoid needing to deal with more
+    //! mutexes than necessary. This mutex can be held at the same time as
+    //! EventLoop::m_mutex as long as Waiter::m_mutex is locked first and
+    //! EventLoop::m_mutex is locked second.
     std::mutex m_mutex;
     std::condition_variable m_cv;
     std::optional<kj::Function<void()>> m_fn;
@@ -393,11 +402,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli

 {
     // Handler for the connection getting destroyed before this client object.
-    auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
+    auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
         // Release client capability by move-assigning to temporary.
         {
             typename Interface::Client(std::move(m_client));
         }
+        Lock lock{m_context.loop->m_mutex};
         m_context.connection = nullptr;
     });

@@ -410,14 +420,10 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     // down while external code is still holding client references.
     //
     // The first case is handled here when m_context.connection is not null. The
-    // second case is handled by the cleanup function, which sets m_context.connection to
-    // null so nothing happens here.
-    m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
-        if (m_context.connection) {
-            // Remove cleanup callback so it doesn't run and try to access
-            // this object after it's already destroyed.
-            m_context.connection->removeSyncCleanup(cleanup_it);
-
+    // second case is handled by the disconnect_cb function, which sets
+    // m_context.connection to null so nothing happens here.
+    m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
+        {
             // If the capnp interface defines a destroy method, call it to destroy
             // the remote object, waiting for it to be deleted server side. If the
             // capnp interface does not define a destroy method, this will just call
@@ -426,6 +432,14 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli

             // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
             m_context.loop->sync([&]() {
+                // Remove disconnect callback on cleanup so it doesn't run and try
+                // to access this object after it's destroyed. This call needs to
+                // run inside loop->sync() on the event loop thread because
+                // otherwise, if there were an ill-timed disconnect, the
+                // onDisconnect handler could fire and delete the Connection object
+                // before the removeSyncCleanup call.
+                if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
+
                 // Release client capability by move-assigning to temporary.
                 {
                     typename Interface::Client(std::move(m_client));
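
The new comment on `Waiter::m_mutex` pins down a lock-ordering rule: when both mutexes are needed, `Waiter::m_mutex` must be taken before `EventLoop::m_mutex`. The sketch below uses plain `std::mutex` stand-ins with hypothetical names, not the library's actual types, to show the shape of that rule and why a consistent order matters:

```cpp
// Minimal lock-ordering sketch, assuming waiter_mutex stands in for
// Waiter::m_mutex and loop_mutex for EventLoop::m_mutex.
#include <functional>
#include <mutex>
#include <optional>

std::mutex waiter_mutex; // stands in for Waiter::m_mutex
std::mutex loop_mutex;   // stands in for EventLoop::m_mutex
std::optional<std::function<void()>> disconnect_cb; // guarded by waiter_mutex

void reset_disconnect_callback()
{
    // Agreed order: waiter_mutex first, loop_mutex second. If another code
    // path ever locked loop_mutex first and then waiter_mutex, the two paths
    // could deadlock, each holding the lock the other one needs.
    const std::unique_lock waiter_lock(waiter_mutex);
    const std::unique_lock loop_lock(loop_mutex);
    disconnect_cb.reset();
}

int main()
{
    disconnect_cb = [] {};
    reset_disconnect_callback();
}
```
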

include/mp/proxy-types.h

Lines changed: 16 additions & 14 deletions
@@ -609,59 +609,61 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
             << "{" << g_thread_context.thread_name
             << "} IPC client first request from current thread, constructing waiter";
     }
-    ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
+    ThreadContext& thread_context{g_thread_context};
+    std::optional<ClientInvokeContext> invoke_context; // Must outlive waiter->wait() call below
     std::exception_ptr exception;
     std::string kj_exception;
     bool done = false;
     const char* disconnected = nullptr;
     proxy_client.m_context.loop->sync([&]() {
         if (!proxy_client.m_context.connection) {
-            const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
+            const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
             done = true;
             disconnected = "IPC client method called after disconnect.";
-            invoke_context.thread_context.waiter->m_cv.notify_all();
+            thread_context.waiter->m_cv.notify_all();
             return;
         }

         auto request = (proxy_client.m_client.*get_request)(nullptr);
         using Request = CapRequestTraits<decltype(request)>;
         using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
-        IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
+        invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
+        IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
         proxy_client.m_context.loop->logPlain()
-            << "{" << invoke_context.thread_context.thread_name << "} IPC client send "
+            << "{" << thread_context.thread_name << "} IPC client send "
             << TypeName<typename Request::Params>() << " " << LogEscape(request.toString());

         proxy_client.m_context.loop->m_task_set->add(request.send().then(
             [&](::capnp::Response<typename Request::Results>&& response) {
                 proxy_client.m_context.loop->logPlain()
-                    << "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
+                    << "{" << thread_context.thread_name << "} IPC client recv "
                     << TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
                 try {
                     IterateFields().handleChain(
-                        invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
+                        *invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
                 } catch (...) {
                     exception = std::current_exception();
                 }
-                const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
+                const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
                 done = true;
-                invoke_context.thread_context.waiter->m_cv.notify_all();
+                thread_context.waiter->m_cv.notify_all();
             },
             [&](const ::kj::Exception& e) {
                 if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
                     disconnected = "IPC client method call interrupted by disconnect.";
                 } else {
                     kj_exception = kj::str("kj::Exception: ", e).cStr();
                     proxy_client.m_context.loop->logPlain()
-                        << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
+                        << "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
                 }
-                const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
+                const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
                 done = true;
-                invoke_context.thread_context.waiter->m_cv.notify_all();
+                thread_context.waiter->m_cv.notify_all();
             }));
     });

-    std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
-    invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
+    std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+    thread_context.waiter->wait(lock, [&done]() { return done; });
     if (exception) std::rethrow_exception(exception);
     if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
     if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
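
The key change above is that `invoke_context` is now a `std::optional` declared in the outer scope and constructed with `emplace()` only once the connection is known to exist, so the object stays alive for the callbacks that run before `waiter->wait()` returns. A reduced sketch of that pattern follows, with a hypothetical `InvokeContext` type and a plain vector standing in for the queued capnp callbacks:

```cpp
// Sketch of "declare an optional at the outer scope, emplace it later" so the
// object outlives asynchronous callbacks that reference it. Names are hypothetical.
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

struct InvokeContext { std::string connection_name; };

int main()
{
    std::vector<std::function<void()>> pending; // stands in for queued async callbacks

    // Declared before the callbacks are queued, so it is still alive when they run.
    std::optional<InvokeContext> invoke_context;

    bool connected = true;
    if (connected) {
        // Constructed only when its arguments are known to be valid.
        invoke_context.emplace(InvokeContext{"conn-1"});
        // The callback captures the optional by reference and dereferences it
        // later; this is only safe because invoke_context outlives `pending`.
        pending.push_back([&] { assert(invoke_context->connection_name == "conn-1"); });
    }

    for (auto& fn : pending) fn();
}
```
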

include/mp/type-context.h

Lines changed: 2 additions & 17 deletions
@@ -69,7 +69,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     const auto& params = call_context.getParams();
     Context::Reader context_arg = Accessor::get(params);
     ServerContext server_context{server, call_context, req};
-    bool disconnected{false};
     {
         // Before invoking the function, store a reference to the
         // callbackThread provided by the client in the
@@ -101,7 +100,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
         // recursive call (IPC call calling back to the caller which
         // makes another IPC call), so avoid modifying the map.
         const bool erase_thread{inserted};
-        KJ_DEFER({
+        KJ_DEFER(if (erase_thread) {
            std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
            // Call erase here with a Connection* argument instead
            // of an iterator argument, because the `request_thread`
@@ -112,24 +111,10 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
            // erases the thread from the map, and also because the
            // ProxyServer<Thread> destructor calls
            // request_threads.clear().
-           if (erase_thread) {
-               disconnected = !request_threads.erase(server.m_context.connection);
-           } else {
-               disconnected = !request_threads.count(server.m_context.connection);
-           }
+           request_threads.erase(server.m_context.connection);
        });
        fn.invoke(server_context, args...);
     }
-    if (disconnected) {
-        // If disconnected is true, the Connection object was
-        // destroyed during the method call. Deal with this by
-        // returning without ever fulfilling the promise, which will
-        // cause the ProxyServer object to leak. This is not ideal,
-        // but fixing the leak will require nontrivial code changes
-        // because there is a lot of code assuming ProxyServer
-        // objects are destroyed before Connection objects.
-        return;
-    }
     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
         server.m_context.loop->sync([&] {
             auto fulfiller_dispose = kj::mv(fulfiller);
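
With the workaround reverted, the deferred block now simply erases the map entry, and only when this call was the one that inserted it (`erase_thread`). Below is a plain-C++ sketch of that "erase only if we inserted" pattern, using a hypothetical `Guard` type in place of `KJ_DEFER` and an ordinary map in place of the library's `request_threads`:

```cpp
// Sketch of conditional deferred cleanup: erase the entry at scope exit only
// if this call created it. Guard, request_threads and handle_request are
// hypothetical stand-ins, not types from this repository.
#include <functional>
#include <iostream>
#include <map>
#include <string>

struct Guard
{
    std::function<void()> fn;
    ~Guard() { fn(); } // runs at scope exit, like a deferred block
};

std::map<int, std::string> request_threads;

void handle_request(int connection, const std::string& name)
{
    // try_emplace only inserts if the key is missing; `inserted` tells us
    // whether this (possibly recursive) call owns the entry.
    auto [it, inserted] = request_threads.try_emplace(connection, name);
    Guard cleanup{[&, erase_thread = inserted] {
        // Erase by key rather than iterator, since another code path may have
        // removed the entry already and invalidated the iterator.
        if (erase_thread) request_threads.erase(connection);
    }};
    std::cout << "handling request on " << it->second << "\n";
}

int main() { handle_request(1, "conn-1"); }
```
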

src/mp/proxy.cpp

Lines changed: 13 additions & 11 deletions
@@ -132,9 +132,11 @@ Connection::~Connection()
     // on clean and unclean shutdowns. In unclean shutdown case when the
     // connection is broken, sync and async cleanup lists will filled with
     // callbacks. In the clean shutdown case both lists will be empty.
+    Lock lock{m_loop->m_mutex};
     while (!m_sync_cleanup_fns.empty()) {
-        m_sync_cleanup_fns.front()();
-        m_sync_cleanup_fns.pop_front();
+        CleanupList fn;
+        fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
+        Unlock(lock, fn.front());
     }
 }

@@ -311,18 +313,18 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
     thread = threads.emplace(
         std::piecewise_construct, std::forward_as_tuple(connection),
         std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
-    thread->second.setCleanup([&threads, &mutex, thread] {
+    thread->second.setDisconnectCallback([&threads, &mutex, thread] {
         // Note: it is safe to use the `thread` iterator in this cleanup
         // function, because the iterator would only be invalid if the map entry
         // was removed, and if the map entry is removed the ProxyClient<Thread>
         // destructor unregisters the cleanup.

         // Connection is being destroyed before thread client is, so reset
-        // thread client m_cleanup_it member so thread client destructor does not
-        // try unregister this callback after connection is destroyed.
-        thread->second.m_cleanup_it.reset();
+        // thread client m_disconnect_cb member so thread client destructor does not
+        // try to unregister this callback after connection is destroyed.
         // Remove connection pointer about to be destroyed from the map
         const std::unique_lock<std::mutex> lock(mutex);
+        thread->second.m_disconnect_cb.reset();
         threads.erase(thread);
     });
     return {thread, true};
@@ -333,16 +335,16 @@ ProxyClient<Thread>::~ProxyClient()
     // If thread is being destroyed before connection is destroyed, remove the
     // cleanup callback that was registered to handle the connection being
     // destroyed before the thread being destroyed.
-    if (m_cleanup_it) {
-        m_context.connection->removeSyncCleanup(*m_cleanup_it);
+    if (m_disconnect_cb) {
+        m_context.connection->removeSyncCleanup(*m_disconnect_cb);
     }
 }

-void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
+void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
 {
     assert(fn);
-    assert(!m_cleanup_it);
-    m_cleanup_it = m_context.connection->addSyncCleanup(fn);
+    assert(!m_disconnect_cb);
+    m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
 }

 ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
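
The destructor change is a classic pattern for draining a callback list: detach one element while holding the lock, then run it with the lock released (`Unlock(lock, fn.front())` above) so the callback itself can take other locks or register more cleanups without deadlocking. A standalone sketch using only standard-library types (names are hypothetical, not the library's `Lock`/`Unlock` helpers):

```cpp
// Splice-then-run pattern: pop one callback under the mutex, execute it unlocked.
#include <functional>
#include <iostream>
#include <list>
#include <mutex>

using CleanupList = std::list<std::function<void()>>;

std::mutex list_mutex;
CleanupList cleanup_fns;

void run_cleanups()
{
    std::unique_lock lock(list_mutex);
    while (!cleanup_fns.empty()) {
        CleanupList fn;
        // Move the first callback into a local list; nothing is copied, and the
        // shared list stays consistent for whoever locks list_mutex next.
        fn.splice(fn.begin(), cleanup_fns, cleanup_fns.begin());
        lock.unlock();
        fn.front()(); // run without holding the lock
        lock.lock();
    }
}

int main()
{
    cleanup_fns.push_back([] { std::cout << "cleanup 1\n"; });
    cleanup_fns.push_back([] { std::cout << "cleanup 2\n"; });
    run_cleanups();
}
```
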

test/mp/test/test.cpp

Lines changed: 9 additions & 2 deletions
@@ -49,12 +49,14 @@ namespace test {
 class TestSetup
 {
 public:
-    std::thread thread;
     std::function<void()> server_disconnect;
     std::function<void()> client_disconnect;
     std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
     std::unique_ptr<ProxyClient<messages::FooInterface>> client;
     ProxyServer<messages::FooInterface>* server{nullptr};
+    //! Thread variable should be after other struct members so the thread does
+    //! not start until the other members are initialized.
+    std::thread thread;

     TestSetup(bool client_owns_connection = true)
         : thread{[&] {
@@ -266,12 +268,12 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
     // ProxyServer objects associated with the connection. Having an in-progress
     // RPC call requires keeping the ProxyServer longer.

+    std::promise<void> signal;
     TestSetup setup{/*client_owns_connection=*/false};
     ProxyClient<messages::FooInterface>* foo = setup.client.get();
     KJ_EXPECT(foo->add(1, 2) == 3);

     foo->initThreadMap();
-    std::promise<void> signal;
     setup.server->m_impl->m_fn = [&] {
         EventLoopRef loop{*setup.server->m_context.loop};
         setup.client_disconnect();
@@ -287,6 +289,11 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
     }
     KJ_EXPECT(disconnected);

+    // Now that the disconnect has been detected, set signal allowing the
+    // callFnAsync() IPC call to return. Since signalling may not wake up the
+    // thread right away, it is important for the signal variable to be declared
+    // *before* the TestSetup variable so is not destroyed while
+    // signal.get_future().get() is called.
     signal.set_value();
 }

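
Both test fixes rely on C++ ordering guarantees rather than extra synchronization: non-static data members are constructed in declaration order and destroyed in reverse, and local variables are destroyed in reverse order of declaration. A minimal sketch of both rules, with hypothetical `Member`/`Setup` types rather than the test's real classes:

```cpp
// Construction/destruction ordering sketch: the thread member is declared
// last, so other members exist before the thread starts; the local `signal`
// is declared before Setup, so it outlives Setup and its thread.
#include <iostream>
#include <thread>

struct Member
{
    const char* name;
    explicit Member(const char* n) : name(n) { std::cout << "construct " << name << "\n"; }
    ~Member() { std::cout << "destroy " << name << "\n"; }
};

struct Setup
{
    Member state{"state"}; // initialized first
    std::thread thread;    // declared last, so `state` already exists when the thread runs
    Setup() : thread([this] { std::cout << "thread sees " << state.name << "\n"; }) {}
    ~Setup() { thread.join(); }
};

int main()
{
    Member signal{"signal"}; // declared before Setup, so it is destroyed after Setup
    Setup setup;
}   // destruction order: setup (joins its thread) first, then signal
```
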
