From e0633c453527a998cd0b6e0b133e6abfe4d9da98 Mon Sep 17 00:00:00 2001 From: Jigao Luo Date: Sun, 11 May 2025 22:38:13 +0200 Subject: [PATCH 1/2] fine-granularity locking instead of only one mutex: now each free_list has its own mutex Signed-off-by: Jigao Luo --- .../rmm/mr/device/detail/free_list.hpp | 8 ++ .../detail/stream_ordered_memory_resource.hpp | 83 +++++++++++++++---- .../rmm/mr/device/pool_memory_resource.hpp | 1 + .../mr/device/mr_ref_multithreaded_tests.cpp | 2 +- 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/cpp/include/rmm/mr/device/detail/free_list.hpp b/cpp/include/rmm/mr/device/detail/free_list.hpp index 2e33fc695..ed71a4355 100644 --- a/cpp/include/rmm/mr/device/detail/free_list.hpp +++ b/cpp/include/rmm/mr/device/detail/free_list.hpp @@ -19,6 +19,7 @@ #include #include +#include #ifdef RMM_DEBUG_PRINT #include #endif @@ -138,6 +139,12 @@ class free_list { } #endif + /** + * @brief Returns a reference to the mutex used for synchronizing the free list. 
+ * + */ + [[nodiscard]] std::mutex& get_mutex() { return mtx_; } + protected: /** * @brief Insert a block in the free list before the specified position @@ -182,6 +189,7 @@ class free_list { private: list_type blocks; // The internal container of blocks + std::mutex mtx_; // The mutex for each free list }; } // namespace mr::detail diff --git a/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp b/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp index 5b1476d37..2f4058c96 100644 --- a/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp +++ b/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #ifdef RMM_DEBUG_PRINT #include @@ -87,9 +88,11 @@ class stream_ordered_memory_resource : public crtp, public device_ stream_ordered_memory_resource& operator=(stream_ordered_memory_resource&&) = delete; protected: - using free_list = FreeListType; - using block_type = typename free_list::block_type; - using lock_guard = std::lock_guard; + using free_list = FreeListType; + using block_type = typename free_list::block_type; + using lock_guard = std::lock_guard; + using read_lock_guard = std::shared_lock; + using write_lock_guard = std::unique_lock; // Derived classes must implement these four methods @@ -204,12 +207,11 @@ class stream_ordered_memory_resource : public crtp, public device_ */ void* do_allocate(std::size_t size, cuda_stream_view stream) override { + RMM_FUNC_RANGE(); RMM_LOG_TRACE("[A][stream %s][%zuB]", rmm::detail::format_stream(stream), size); if (size <= 0) { return nullptr; } - lock_guard lock(mtx_); - auto stream_event = get_event(stream); size = rmm::align_up(size, rmm::CUDA_ALLOCATION_ALIGNMENT); @@ -224,7 +226,8 @@ class stream_ordered_memory_resource : public crtp, public device_ size, block.pointer()); - log_summary_trace(); + // TODO(jigao): this logging is not protected by mutex! 
+ // log_summary_trace(); return block.pointer(); } @@ -238,11 +241,11 @@ class stream_ordered_memory_resource : public crtp, public device_ */ void do_deallocate(void* ptr, std::size_t size, cuda_stream_view stream) override { + RMM_FUNC_RANGE(); RMM_LOG_TRACE("[D][stream %s][%zuB][%p]", rmm::detail::format_stream(stream), size, ptr); if (size <= 0 || ptr == nullptr) { return; } - lock_guard lock(mtx_); auto stream_event = get_event(stream); size = rmm::align_up(size, rmm::CUDA_ALLOCATION_ALIGNMENT); @@ -253,9 +256,21 @@ class stream_ordered_memory_resource : public crtp, public device_ // streams allows stealing from deleted streams. RMM_ASSERT_CUDA_SUCCESS(cudaEventRecord(stream_event.event, stream.value())); - stream_free_blocks_[stream_event].insert(block); - - log_summary_trace(); + read_lock_guard rlock(stream_free_blocks_mtx_); + // Try to find a satisfactory block in free list for the same stream (no sync required) + auto iter = stream_free_blocks_.find(stream_event); + if (iter != stream_free_blocks_.end()) { + // Hot path + lock_guard free_list_lock(iter->second.get_mutex()); + iter->second.insert(block); + } else { + rlock.unlock(); + // Cold path + write_lock_guard wlock(stream_free_blocks_mtx_); + stream_free_blocks_[stream_event].insert(block); // TODO(jigao): is it thread-safe? + } + // TODO(jigao): this logging is not protected by mutex! + // log_summary_trace(); } private: @@ -271,7 +286,9 @@ class stream_ordered_memory_resource : public crtp, public device_ */ stream_event_pair get_event(cuda_stream_view stream) { + RMM_FUNC_RANGE(); if (stream.is_per_thread_default()) { + // Hot path // Create a thread-local event for each device. 
These events are // deliberately leaked since the destructor needs to call into // the CUDA runtime and thread_local destructors (can) run below @@ -289,6 +306,8 @@ class stream_ordered_memory_resource : public crtp, public device_ }(); return stream_event_pair{stream.value(), event}; } + write_lock_guard wlock(stream_events_mtx_); + // Cold path // We use cudaStreamLegacy as the event map key for the default stream for consistency between // PTDS and non-PTDS mode. In PTDS mode, the cudaStreamLegacy map key will only exist if the // user explicitly passes it, so it is used as the default location for the free list @@ -319,6 +338,7 @@ class stream_ordered_memory_resource : public crtp, public device_ */ block_type allocate_and_insert_remainder(block_type block, std::size_t size, free_list& blocks) { + RMM_FUNC_RANGE(); auto const [allocated, remainder] = this->underlying().allocate_from_block(block, size); if (remainder.is_valid()) { blocks.insert(remainder); } return allocated; @@ -333,15 +353,30 @@ class stream_ordered_memory_resource : public crtp, public device_ */ block_type get_block(std::size_t size, stream_event_pair stream_event) { - // Try to find a satisfactory block in free list for the same stream (no sync required) - auto iter = stream_free_blocks_.find(stream_event); - if (iter != stream_free_blocks_.end()) { - block_type const block = iter->second.get_block(size); - if (block.is_valid()) { return allocate_and_insert_remainder(block, size, iter->second); } + RMM_FUNC_RANGE(); + { + // The hot path of get_block: + // 1. Read-lock the map for lookup + // 2. then exclusively lock the free_list to get a block locally. 
+ read_lock_guard rlock(stream_free_blocks_mtx_); + // Try to find a satisfactory block in free list for the same stream (no sync required) + auto iter = stream_free_blocks_.find(stream_event); + if (iter != stream_free_blocks_.end()) { + lock_guard free_list_lock(iter->second.get_mutex()); + block_type const block = iter->second.get_block(size); + if (block.is_valid()) { return allocate_and_insert_remainder(block, size, iter->second); } + } } + // The cold path of get_block: + // Write lock the map to safely perform another lookup and possibly modify entries. + // This exclusive lock ensures no other threads can access the map and all free lists in the + // map. + write_lock_guard wlock(stream_free_blocks_mtx_); + auto iter = stream_free_blocks_.find(stream_event); free_list& blocks = (iter != stream_free_blocks_.end()) ? iter->second : stream_free_blocks_[stream_event]; + lock_guard free_list_lock(blocks.get_mutex()); // Try to find an existing block in another stream { @@ -382,6 +417,7 @@ class stream_ordered_memory_resource : public crtp, public device_ free_list& blocks, bool merge_first) { + RMM_FUNC_RANGE(); auto find_block = [&](auto iter) { auto other_event = iter->first.event; auto& other_blocks = iter->second; @@ -415,6 +451,7 @@ class stream_ordered_memory_resource : public crtp, public device_ ++next_iter; // Points to element after `iter` to allow erasing `iter` in the loop body if (iter->first.event != stream_event.event) { + lock_guard free_list_lock(iter->second.get_mutex()); block_type const block = find_block(iter); if (block.is_valid()) { @@ -435,6 +472,7 @@ class stream_ordered_memory_resource : public crtp, public device_ cudaEvent_t other_event, free_list&& other_blocks) { + RMM_FUNC_RANGE(); // Since we found a block associated with a different stream, we have to insert a wait // on the stream's associated event into the allocating stream. 
RMM_CUDA_TRY(cudaStreamWaitEvent(stream_event.stream, other_event, 0)); @@ -450,7 +488,10 @@ class stream_ordered_memory_resource : public crtp, public device_ */ void release() { - lock_guard lock(mtx_); + RMM_FUNC_RANGE(); + // lock_guard lock(mtx_); TODO(jigao): rethink mtx_ + write_lock_guard stream_event_lock(stream_events_mtx_); + write_lock_guard wlock(stream_free_blocks_mtx_); for (auto s_e : stream_events_) { RMM_ASSERT_CUDA_SUCCESS(cudaEventSynchronize(s_e.second.event)); @@ -464,6 +505,7 @@ class stream_ordered_memory_resource : public crtp, public device_ void log_summary_trace() { #if (RMM_LOG_ACTIVE_LEVEL <= RMM_LOG_LEVEL_TRACE) + RMM_FUNC_RANGE(); std::size_t num_blocks{0}; std::size_t max_block{0}; std::size_t free_mem{0}; @@ -491,8 +533,17 @@ class stream_ordered_memory_resource : public crtp, public device_ // bidirectional mapping between non-default streams and events std::unordered_map stream_events_; + // TODO(jigao): think about get_mutex function? std::mutex mtx_; // mutex for thread-safe access + // mutex for thread-safe access to stream_free_blocks_ + // Used in the writing part of get_block, get_block_from_other_stream + std::shared_mutex stream_free_blocks_mtx_; + + // mutex for thread-safe access to stream_events_ + // Used in the NON-PTDS part of get_event + std::shared_mutex stream_events_mtx_; + rmm::cuda_device_id device_id_{rmm::get_current_cuda_device()}; }; // namespace detail diff --git a/cpp/include/rmm/mr/device/pool_memory_resource.hpp b/cpp/include/rmm/mr/device/pool_memory_resource.hpp index 6abf10e2c..a9f5338b9 100644 --- a/cpp/include/rmm/mr/device/pool_memory_resource.hpp +++ b/cpp/include/rmm/mr/device/pool_memory_resource.hpp @@ -400,6 +400,7 @@ class pool_memory_resource final */ block_type free_block(void* ptr, std::size_t size) noexcept { + RMM_FUNC_RANGE(); #ifdef RMM_POOL_TRACK_ALLOCATIONS if (ptr == nullptr) return block_type{}; auto const iter = allocated_blocks_.find(static_cast(ptr)); diff --git 
a/cpp/tests/mr/device/mr_ref_multithreaded_tests.cpp b/cpp/tests/mr/device/mr_ref_multithreaded_tests.cpp index 47961f4fb..bde1acfd3 100644 --- a/cpp/tests/mr/device/mr_ref_multithreaded_tests.cpp +++ b/cpp/tests/mr/device/mr_ref_multithreaded_tests.cpp @@ -57,7 +57,7 @@ void spawn_n(std::size_t num_threads, Task task, Arguments&&... args) template void spawn(Task task, Arguments&&... args) { - spawn_n(4, task, std::forward(args)...); + spawn_n(16, task, std::forward(args)...); } TEST(DefaultTest, UseCurrentDeviceResource_mt) { spawn(test_get_current_device_resource); } From 25ddff24b15ef49bd2dae736aa97f72203308870 Mon Sep 17 00:00:00 2001 From: Jigao Luo Date: Thu, 15 May 2025 14:31:54 +0200 Subject: [PATCH 2/2] Better code style in do_deallocate. More comments on hot and cold path Signed-off-by: Jigao Luo --- .../detail/stream_ordered_memory_resource.hpp | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp b/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp index 2f4058c96..0506ffd6d 100644 --- a/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp +++ b/cpp/include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp @@ -267,9 +267,48 @@ class stream_ordered_memory_resource : public crtp, public device_ rlock.unlock(); // Cold path write_lock_guard wlock(stream_free_blocks_mtx_); - stream_free_blocks_[stream_event].insert(block); // TODO(jigao): is it thread-safe? + // Recheck the map since another thread from the same stream + // might have acquired the write lock first and inserted a new free_list into map. + auto iter = stream_free_blocks_.find(stream_event); + free_list& blocks = + (iter != stream_free_blocks_.end()) ? iter->second : stream_free_blocks_[stream_event]; + lock_guard free_list_lock(blocks.get_mutex()); + blocks.insert(block); + } + + { + // Hot Path of do_deallocate: + // 1. 
Acquire shared read-lock on map for fast lookup + // 2. If entry exists, proceed to hot path + // 3. Acquire exclusive write-lock on free_list for block insertion + read_lock_guard rlock(stream_free_blocks_mtx_); + auto iter = stream_free_blocks_.find(stream_event); + if (iter != stream_free_blocks_.end()) { + lock_guard free_list_lock(iter->second.get_mutex()); + iter->second.insert(block); + return; + } } + + { + // Cold Path of do_deallocate: + // 1. Acquire exclusive write-lock on map to: + // - Recheck map state (another thread might have inserted a new free_list) + // - Insert a new free_list into map if still empty + // 2. Acquire exclusive write-lock on the new free_list for block insertion + // (Locking the newly created free_list is redundant as it is protected by the map's + // write-lock, but retained for consistency and readability.) + write_lock_guard wlock(stream_free_blocks_mtx_); + auto iter = stream_free_blocks_.find(stream_event); + free_list& blocks = + (iter != stream_free_blocks_.end()) ? iter->second : stream_free_blocks_[stream_event]; + lock_guard free_list_lock(blocks.get_mutex()); + blocks.insert(block); + return; + } + // TODO(jigao): this logging is not protected by mutex! + // TODO(jigao): do it before return // log_summary_trace(); } @@ -288,7 +327,10 @@ class stream_ordered_memory_resource : public crtp, public device_ { RMM_FUNC_RANGE(); if (stream.is_per_thread_default()) { - // Hot path + // Hot Path (PTDS optimization): + // Leverage thread-local storage for each stream to eliminate contention + // and avoid locking entirely. + // Create a thread-local event for each device. 
These events are // deliberately leaked since the destructor needs to call into // the CUDA runtime and thread_local destructors (can) run below @@ -307,7 +349,10 @@ class stream_ordered_memory_resource : public crtp, public device_ return stream_event_pair{stream.value(), event}; } write_lock_guard wlock(stream_events_mtx_); - // Cold path + // Cold Path: + // Without PTDS, use pessimistic locking with a broader critical section + // to handle potential future writes to the stream_events_ map. + // We use cudaStreamLegacy as the event map key for the default stream for consistency between // PTDS and non-PTDS mode. In PTDS mode, the cudaStreamLegacy map key will only exist if the // user explicitly passes it, so it is used as the default location for the free list @@ -355,9 +400,10 @@ class stream_ordered_memory_resource : public crtp, public device_ { RMM_FUNC_RANGE(); { - // The hot path of get_block: - // 1. Read-lock the map for lookup - // 2. then exclusively lock the free_list to get a block locally. + // Hot Path of get_block: + // 1. Acquire shared read-lock on map for fast lookup + // 2. Acquire exclusive write-lock on free_list for local block allocation + read_lock_guard rlock(stream_free_blocks_mtx_); // Try to find a satisfactory block in free list for the same stream (no sync required) auto iter = stream_free_blocks_.find(stream_event); @@ -368,10 +414,9 @@ class stream_ordered_memory_resource : public crtp, public device_ } } - // The cold path of get_block: - // Write lock the map to safely perform another lookup and possibly modify entries. - // This exclusive lock ensures no other threads can access the map and all free lists in the - // map. 
+ // Cold Path of get_block: + // Acquire write-lock on map to lookup again and modify map entries if needed + // This exclusive write-lock prevents concurrent access to map and its free_lists write_lock_guard wlock(stream_free_blocks_mtx_); auto iter = stream_free_blocks_.find(stream_event); free_list& blocks =