Use CUDA events instead of CUDA device/stream synchronization #225

Merged: 56 commits, May 5, 2025
Changes from 40 commits

Commits
2c0dac4
Use CUDA events to track `Buffer` copy progress
pentschev Apr 23, 2025
1202c50
Test for copy completion
pentschev Apr 23, 2025
bc40a83
Throw exception if buffer copy didn't complete before `send()`
pentschev Apr 23, 2025
6954a34
Only send GPU data if copy is complete
pentschev Apr 23, 2025
c3b04e3
Prevent receiving on incomplete `Buffer` allocation
pentschev Apr 24, 2025
ecb9e2d
Use `BufferWithEvent`
pentschev Apr 24, 2025
fa236cb
Log destruction/release if allocation is not complete
pentschev Apr 24, 2025
e393aa0
Merge branch 'branch-25.06' into buffer-cuda-event
pentschev Apr 24, 2025
c23e13f
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 24, 2025
33015af
Disable event tracking instead of logger
pentschev Apr 24, 2025
2699326
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev Apr 24, 2025
23673ab
Replace incorrect use of `RAPIDSMPF_CUDA_TRY_ALLOC`
pentschev Apr 24, 2025
1180b8e
Create events with `cudaEventDisableTiming`
pentschev Apr 24, 2025
aa2ade6
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 24, 2025
7714a6f
Simplify condition
pentschev Apr 24, 2025
eded688
Make synchronization the user's responsibility before `send()`
pentschev Apr 24, 2025
8887137
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev Apr 24, 2025
0bac2a6
Fix style
pentschev Apr 24, 2025
5f04b7a
Chunk CUDA event
pentschev Apr 28, 2025
01b2717
Ensure ready-for-data messages are not lost
pentschev Apr 29, 2025
95cb163
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 29, 2025
42d2b9f
Prevent reinsertion
pentschev Apr 29, 2025
f019d8d
Ensure thread-safety of `Chunk::Event::is_done()`
pentschev Apr 30, 2025
0226f52
Revert "Ensure ready-for-data messages are not lost"
pentschev Apr 30, 2025
a56070c
Only extract chunks that are ready from inbox
pentschev Apr 30, 2025
aaad38c
Combine all event checks within `Chunk`
pentschev Apr 30, 2025
02b6c01
Remove redundant check while processing `outgoing_chunks_`
pentschev Apr 30, 2025
1f41e92
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 30, 2025
4ae9a4e
Provide CUDA event for all Buffer relevant cases, remove BufferWithEvent
pentschev Apr 30, 2025
63f819e
Construct `Buffer` pointers without destroying original object
pentschev Apr 30, 2025
71a8786
Improvements
pentschev Apr 30, 2025
3055053
Clarify use of streams
pentschev May 1, 2025
725c3aa
Simplify `is_ready()` condition
pentschev May 1, 2025
21f04a1
Avoid `Chunk::Event` use-after-free
pentschev May 1, 2025
11bac26
Move all CUDA events into `Buffer`
pentschev May 1, 2025
dfd6e70
Move `Event` definition to `buffer.hpp`
pentschev May 1, 2025
6795eff
Update `Chunk::is_ready()` docstring
pentschev May 1, 2025
dc1d898
Allow use of a shared `Buffer::Event`
pentschev May 1, 2025
b448c7e
Fix use-after-free in `Buffer::Event`
pentschev May 1, 2025
007a536
Prevent constness cast
pentschev May 1, 2025
8f6ecaf
Remove IncomingChunk
pentschev May 2, 2025
10fc871
Fix Event destruction (again)
pentschev May 2, 2025
e2d8c9c
Fix Event smart-pointer type
pentschev May 2, 2025
27f856c
Simplify `is_ready()` condition
pentschev May 2, 2025
640c2bf
Fixed docstring
pentschev May 2, 2025
40ebb2a
Fix build errors
pentschev May 2, 2025
3f65ea3
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev May 5, 2025
17c01e2
Simplify `Event` destructor to rely on smart-pointer for thread-safety
pentschev May 5, 2025
8627cfc
Improve docs
pentschev May 5, 2025
afd84a1
Typo fixes
pentschev May 5, 2025
cb0b159
Terminate if buffers are not ready for use
pentschev May 5, 2025
1f2f966
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev May 5, 2025
de3db36
Remove expected_num_chunks from Chunks constructor with gpu_data
pentschev May 5, 2025
294d5b9
Fix Chunk::is_ready condition
pentschev May 5, 2025
6a1ae03
Update condition and is_ready docstring
pentschev May 5, 2025
16c8b17
Check for gpu_data_size
pentschev May 5, 2025
70 changes: 65 additions & 5 deletions cpp/include/rapidsmpf/buffer/buffer.hpp
@@ -5,17 +5,22 @@
#pragma once

#include <array>
#include <atomic>
#include <memory>
#include <mutex>
#include <variant>
#include <vector>

#include <cuda_runtime.h>

#include <rmm/device_buffer.hpp>

#include <rapidsmpf/error.hpp>

namespace rapidsmpf {

class BufferResource;
class Event;

/// @brief Enum representing the type of memory.
enum class MemoryType : int {
@@ -37,6 +42,44 @@ class Buffer {
friend class BufferResource;

public:
/**
* @brief CUDA event to provide synchronization among a set of chunks.
*
* This event serves as a synchronization point for a set of chunks on a
* user-specified stream.
*/
class Event {
public:
/**
* @brief Construct a CUDA event for a given stream.
*
* @param stream CUDA stream used for device memory operations.
*/
Event(rmm::cuda_stream_view stream);

/**
* @brief Destructor for Event.
*
* Cleans up the CUDA event if one was created.
*/
~Event();

/**
* @brief Check if the CUDA event has been completed.
*
* @return true if the event has been completed, false otherwise.
*/
[[nodiscard]] bool is_ready();

private:
cudaEvent_t event_; ///< CUDA event used to track device memory allocation.
std::atomic<bool> done_{false}; ///< Cache of the event status to avoid unnecessary queries.
mutable std::mutex mutex_; ///< Protects access to event_.
std::atomic<bool> destroying_{false}; ///< Flag to indicate destruction in progress.
};

/// @brief Storage type for the device buffer.
using DeviceStorageT = std::unique_ptr<rmm::device_buffer>;

@@ -122,8 +165,16 @@
);
}

/// @brief Buffer has a move ctor but no copy or assign operator.
Buffer(Buffer&&) = default;
/**
* @brief Check if the device memory operation has completed.
*
* @return true if the device memory operation has completed or no device
* memory operation was performed, false if it is still in progress.
*/
[[nodiscard]] bool is_ready() const;

/// @brief Delete move and copy constructors and assignment operators.
Buffer(Buffer&&) = delete;
Buffer(Buffer const&) = delete;
Buffer& operator=(Buffer& o) = delete;
Buffer& operator=(Buffer&& o) = delete;
@@ -143,13 +194,20 @@
* @brief Construct a Buffer from device memory.
*
* @param device_buffer A unique pointer to a device buffer.
* @param stream CUDA stream used for the device buffer allocation.
* @param br Buffer resource for memory allocation.
* @param event The shared event to use for the buffer.
*
* @throws std::invalid_argument if `device_buffer` is null.
* @throws std::invalid_argument if `stream` or `br->mr` isn't the same used by
* `device_buffer`.
*/
Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br);
Buffer(
std::unique_ptr<rmm::device_buffer> device_buffer,
rmm::cuda_stream_view stream,
BufferResource* br,
std::shared_ptr<Event> event = nullptr
);

/**
* @brief Access the underlying host memory buffer.
@@ -184,7 +242,7 @@
/**
* @brief Create a copy of this buffer using the same memory type.
*
* @param stream CUDA stream used for device memory operations.
* @param stream CUDA stream used for the device buffer allocation and copy.
* @return A unique pointer to a new Buffer containing the copied data.
*/
[[nodiscard]] std::unique_ptr<Buffer> copy(rmm::cuda_stream_view stream) const;
@@ -193,7 +251,7 @@
* @brief Create a copy of this buffer using the specified memory type.
*
* @param target The target memory type.
* @param stream CUDA stream used for device memory operations.
* @param stream CUDA stream used for device buffer allocation and copy.
* @return A unique pointer to a new Buffer containing the copied data.
*/
[[nodiscard]] std::unique_ptr<Buffer> copy(
@@ -208,6 +266,8 @@
/// @brief The underlying storage host memory or device memory buffer (where
/// applicable).
StorageT storage_;
/// @brief CUDA event used to track copy operations
std::shared_ptr<Event> event_;
};

} // namespace rapidsmpf
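The `done_`/`mutex_` members added above suggest a query-and-cache pattern for `Event::is_ready()`. The following is a minimal, CPU-only sketch of that pattern; `query_event_stub` is a hypothetical stand-in for the driver query, which in the real class would presumably be `cudaEventQuery(event_) == cudaSuccess`:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for cudaEventQuery(): reports completion after a
// fixed number of polls, so the example is runnable without a GPU.
static bool query_event_stub(int& countdown) {
    if (countdown > 0) {
        --countdown;
        return false;
    }
    return true;
}

// Sketch of the caching pattern implied by Event's members: once the event is
// observed complete, cache the result in an atomic so later calls skip the
// comparatively expensive driver query.
class EventSketch {
  public:
    explicit EventSketch(int pending_polls) : countdown_(pending_polls) {}

    bool is_ready() {
        if (done_.load(std::memory_order_acquire)) {
            return true;  // fast path: completion already cached
        }
        std::lock_guard<std::mutex> lock(mutex_);  // serialize queries
        if (query_event_stub(countdown_)) {
            done_.store(true, std::memory_order_release);
            return true;
        }
        return false;
    }

  private:
    int countdown_;                  // stand-in for pending GPU work
    std::atomic<bool> done_{false};  // cached completion status
    mutable std::mutex mutex_;       // protects the underlying query
};
```

Note the one-way transition of `done_`: a CUDA event, once complete, stays complete, which is what makes caching the answer safe.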
16 changes: 11 additions & 5 deletions cpp/include/rapidsmpf/buffer/resource.hpp
@@ -256,9 +262,15 @@
* @brief Move device buffer data into a Buffer.
*
* @param data A unique pointer to the device buffer.
* @param stream CUDA stream used for the data allocation, copy, and/or move.
* @param event The event to use for the buffer.
* @return A unique pointer to the resulting Buffer.
*/
std::unique_ptr<Buffer> move(std::unique_ptr<rmm::device_buffer> data);
std::unique_ptr<Buffer> move(
std::unique_ptr<rmm::device_buffer> data,
rmm::cuda_stream_view stream,
std::shared_ptr<Buffer::Event> event = nullptr
);

/**
* @brief Move a Buffer to the specified memory type.
@@ -267,7 +273,7 @@
*
* @param target The target memory type.
* @param buffer The buffer to move.
* @param stream CUDA stream for the operation.
* @param stream CUDA stream used for the buffer allocation, copy, and/or move.
* @param reservation The reservation to use for memory allocations.
* @return A unique pointer to the moved Buffer.
*
@@ -287,7 +293,7 @@
* If and only if moving between different memory types will this perform a copy.
*
* @param buffer The buffer to move.
* @param stream CUDA stream for the operation.
* @param stream CUDA stream used for the buffer allocation, copy, and/or move.
* @param reservation The reservation to use for memory allocations.
* @return A unique pointer to the resulting device buffer.
*
@@ -307,7 +313,7 @@
* If and only if moving between different memory types will this perform a copy.
*
* @param buffer The buffer to move.
* @param stream CUDA stream for the operation.
* @param stream CUDA stream used for the buffer allocation, copy, and/or move.
* @param reservation The reservation to use for memory allocations.
* @return A unique pointer to the resulting host vector.
*
@@ -328,7 +334,7 @@
*
* @param target The target memory type.
* @param buffer The buffer to copy.
* @param stream CUDA stream for the operation.
* @param stream CUDA stream used for the buffer allocation and copy.
* @param reservation The reservation to use for memory allocations.
* @return A unique pointer to the new Buffer.
*
10 changes: 10 additions & 0 deletions cpp/include/rapidsmpf/communicator/communicator.hpp
@@ -426,6 +426,11 @@
* @param rank The destination rank.
* @param tag Message tag for identification.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @warning The caller is responsible for ensuring that the underlying `Buffer`
* allocation and data are already valid before calling, for example, when a CUDA
* allocation and/or copy is done asynchronously. Specifically, the caller should
* ensure `Buffer::is_ready()` returns true before calling this function.
*/
[[nodiscard]] virtual std::unique_ptr<Future> send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
@@ -438,6 +443,11 @@
* @param tag Message tag for identification.
* @param recv_buffer The receive buffer.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @warning The caller is responsible for ensuring that the underlying `Buffer`
* allocation is already valid before calling, for example, when a CUDA
* allocation and/or copy is done asynchronously. Specifically, the caller
* should ensure `Buffer::is_ready()` returns true before calling this function.
[Review comments on this warning:]

Member:
Let's throw an exception if Buffer::is_ready() == false?

pentschev (Author):
No, unfortunately that doesn't work, as @nirandaperera has noted previously in #225 (comment). Doing that means we lose the std::unique_ptr<Buffer>.

madsbk (Member), May 5, 2025:
Still, I think we should check Buffer::is_ready() == false. It might be unrecoverable but still better than a segfault. But let's make it clear in the doc that the buffer has been moved and freed!

pentschev (Author):
How about a warning instead? Note that the exception will be raised in the shuffler's progress thread, so we'll probably need to handle it in some way. I know a warning is not a solution for the potential of a segfault but I think that will more clearly inform the user about what happened, rather than an exception which may not be handled correctly.

pentschev (Author):
That works too, I would suggest that we do this now and add a new ABORT log-level that both logs an error and immediately terminates. WDYT?

madsbk (Member):
Sounds good!

pentschev (Author):
Done in cb0b159 and opened #246 to track this as well.
*/
[[nodiscard]] virtual std::unique_ptr<Future> recv(
Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
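The `@warning` contract above makes readiness the caller's responsibility: hand a buffer to `send()`/`recv()` only once `is_ready()` reports true. A small sketch of that caller-side pattern, with a hypothetical `FakeBuffer` standing in for the real `Buffer` (a real progress loop would requeue the chunk rather than spin):

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for Buffer: is_ready() flips to true once the
// asynchronous allocation/copy tracked by its CUDA event has finished,
// simulated here by a poll countdown.
struct FakeBuffer {
    int polls_until_ready;
    bool is_ready() { return polls_until_ready-- <= 0; }
};

// Caller-side pattern implied by the @warning: defer the communication call
// until the buffer's event has completed.
bool wait_until_ready(FakeBuffer& buf) {
    while (!buf.is_ready()) {
        std::this_thread::yield();  // real code: retry on next progress tick
    }
    return true;  // now safe to call Communicator::send()/recv()
}
```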
16 changes: 16 additions & 0 deletions cpp/include/rapidsmpf/shuffler/chunk.hpp
@@ -4,14 +4,17 @@
*/
#pragma once

#include <atomic>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

#include <cudf/contiguous_split.hpp>
#include <cudf/table/table.hpp>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/shuffler/partition.hpp>

namespace rapidsmpf::shuffler::detail {
@@ -27,6 +30,7 @@
class Chunk {
public:
PartID const pid; ///< Partition ID that this chunk belongs to.

ChunkID const cid; ///< Unique ID of this chunk.

/// If not zero, the number of chunks of the partition expected to get from the
@@ -126,6 +130,18 @@
std::size_t max_nbytes = 512,
rmm::cuda_stream_view stream = cudf::get_default_stream()
) const;

/**
* @brief Returns true if the chunk is ready for consumption.
*
* Checks that the gpu_data's CUDA event is ready, if gpu_data contains a valid
* buffer. The CUDA event is used to synchronize the chunk's data to ensure
* any allocation or copy (e.g., spilling) is complete before the chunk is
* consumed.
*
* @return true if the chunk is ready, false otherwise.
*/
[[nodiscard]] bool is_ready() const;
};

/**
6 changes: 3 additions & 3 deletions cpp/include/rapidsmpf/shuffler/postbox.hpp
@@ -55,11 +55,11 @@
std::unordered_map<ChunkID, Chunk> extract(PartID pid);

/**
* @brief Extracts all chunks from the PostBox.
* @brief Extracts all ready chunks from the PostBox.
*
* @return A vector of all chunks in the PostBox.
* @return A vector of all ready chunks in the PostBox.
*/
std::vector<Chunk> extract_all();
std::vector<Chunk> extract_all_ready();

/**
* @brief Checks if the PostBox is empty.
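The rename from `extract_all()` to `extract_all_ready()` reflects the new behavior: only chunks whose CUDA event has completed leave the postbox; in-flight chunks stay for a later progress iteration. A runnable sketch of that filtering (with a hypothetical `ChunkSketch` standing in for `Chunk`):

```cpp
#include <cassert>
#include <vector>

// Hypothetical chunk: ready once its buffer's CUDA event has completed.
struct ChunkSketch {
    int id;
    bool ready;
    bool is_ready() const { return ready; }
};

// Sketch of extract_all_ready(): move out only the ready chunks, leaving
// in-flight chunks in the inbox for a later progress iteration.
std::vector<ChunkSketch> extract_all_ready(std::vector<ChunkSketch>& inbox) {
    std::vector<ChunkSketch> ready;
    for (auto it = inbox.begin(); it != inbox.end();) {
        if (it->is_ready()) {
            ready.push_back(*it);
            it = inbox.erase(it);  // extracted: remove from the postbox
        } else {
            ++it;  // still in flight: keep for next time
        }
    }
    return ready;
}
```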
8 changes: 6 additions & 2 deletions cpp/include/rapidsmpf/shuffler/shuffler.hpp
@@ -228,19 +228,23 @@
* @param pid The partition ID of the new chunk.
* @param metadata The metadata of the new chunk, can be null.
* @param gpu_data The gpu data of the new chunk, can be null.
* @param stream The CUDA stream for BufferResource memory operations.
* @param event The event to use for the new chunk.
*/
[[nodiscard]] detail::Chunk create_chunk(
PartID pid,
std::unique_ptr<std::vector<uint8_t>> metadata,
std::unique_ptr<rmm::device_buffer> gpu_data
std::unique_ptr<rmm::device_buffer> gpu_data,
rmm::cuda_stream_view stream,
std::shared_ptr<Buffer::Event> event
) {
return detail::Chunk{
pid,
get_new_cid(),
0, // expected_num_chunks
gpu_data ? gpu_data->size() : 0, // gpu_data_size
std::move(metadata),
br_->move(std::move(gpu_data))
br_->move(std::move(gpu_data), stream, event)
};
}

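The optional `std::shared_ptr<Buffer::Event>` threaded through `create_chunk()` and `BufferResource::move()` above lets several buffers produced on the same stream share one event, so a single recording (after the last operation) and a single query cover all of them. A sketch of that sharing, using hypothetical stand-in types:

```cpp
#include <cassert>
#include <memory>

// Hypothetical event: one recording shared by several buffers.
struct SharedEventSketch {
    bool done = false;
    bool is_ready() const { return done; }
};

// Sketch of a buffer holding an optional shared event. A buffer without an
// event (e.g., host memory, or no pending device work) is trivially ready.
struct BufferSketch {
    std::shared_ptr<SharedEventSketch> event;
    bool is_ready() const { return event == nullptr || event->is_ready(); }
};
```

Sharing one event also sidesteps the per-buffer event-creation cost when many chunks are produced from a single stream of work.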