Use CUDA events instead of CUDA device/stream synchronization #225

Merged

56 commits merged on May 5, 2025

Changes from 14 commits

Commits (56)
2c0dac4
Use CUDA events to track `Buffer` copy progress
pentschev Apr 23, 2025
1202c50
Test for copy completion
pentschev Apr 23, 2025
bc40a83
Throw exception if buffer copy didn't complete before `send()`
pentschev Apr 23, 2025
6954a34
Only send GPU data if copy is complete
pentschev Apr 23, 2025
c3b04e3
Prevent receiving on incomplete `Buffer` allocation
pentschev Apr 24, 2025
ecb9e2d
Use `BufferWithEvent`
pentschev Apr 24, 2025
fa236cb
Log destruction/release if allocation is not complete
pentschev Apr 24, 2025
e393aa0
Merge branch 'branch-25.06' into buffer-cuda-event
pentschev Apr 24, 2025
c23e13f
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 24, 2025
33015af
Disable event tracking instead of logger
pentschev Apr 24, 2025
2699326
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev Apr 24, 2025
23673ab
Replace incorrect use of `RAPIDSMPF_CUDA_TRY_ALLOC`
pentschev Apr 24, 2025
1180b8e
Create events with `cudaEventDisableTiming`
pentschev Apr 24, 2025
aa2ade6
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 24, 2025
7714a6f
Simplify condition
pentschev Apr 24, 2025
eded688
Make synchronization the user's responsibility before `send()`
pentschev Apr 24, 2025
8887137
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev Apr 24, 2025
0bac2a6
Fix style
pentschev Apr 24, 2025
5f04b7a
Chunk CUDA event
pentschev Apr 28, 2025
01b2717
Ensure ready-for-data messages are not lost
pentschev Apr 29, 2025
95cb163
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 29, 2025
42d2b9f
Prevent reinsertion
pentschev Apr 29, 2025
f019d8d
Ensure thread-safety of `Chunk::Event::is_done()`
pentschev Apr 30, 2025
0226f52
Revert "Ensure ready-for-data messages are not lost"
pentschev Apr 30, 2025
a56070c
Only extract chunks that are ready from inbox
pentschev Apr 30, 2025
aaad38c
Combine all event checks within `Chunk`
pentschev Apr 30, 2025
02b6c01
Remove redundant check while processing `outgoing_chunks_`
pentschev Apr 30, 2025
1f41e92
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 30, 2025
4ae9a4e
Provide CUDA event for all Buffer relevant cases, remove BufferWithEvent
pentschev Apr 30, 2025
63f819e
Construct `Buffer` pointers without destroying original object
pentschev Apr 30, 2025
71a8786
Improvements
pentschev Apr 30, 2025
3055053
Clarify use of streams
pentschev May 1, 2025
725c3aa
Simplify `is_ready()` condition
pentschev May 1, 2025
21f04a1
Avoid `Chunk::Event` use-after-free
pentschev May 1, 2025
11bac26
Move all CUDA events into `Buffer`
pentschev May 1, 2025
dfd6e70
Move `Event` definition to `buffer.hpp`
pentschev May 1, 2025
6795eff
Update `Chunk::is_ready()` docstring
pentschev May 1, 2025
dc1d898
Allow use of a shared `Buffer::Event`
pentschev May 1, 2025
b448c7e
Fix use-after-free in `Buffer::Event`
pentschev May 1, 2025
007a536
Prevent constness cast
pentschev May 1, 2025
8f6ecaf
Remove IncomingChunk
pentschev May 2, 2025
10fc871
Fix Event destruction (again)
pentschev May 2, 2025
e2d8c9c
Fix Event smart-pointer type
pentschev May 2, 2025
27f856c
Simplify `is_ready()` condition
pentschev May 2, 2025
640c2bf
Fixed docstring
pentschev May 2, 2025
40ebb2a
Fix build errors
pentschev May 2, 2025
3f65ea3
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev May 5, 2025
17c01e2
Simplify `Event` destructor to rely on smart-pointer for thread-safety
pentschev May 5, 2025
8627cfc
Improve docs
pentschev May 5, 2025
afd84a1
Typo fixes
pentschev May 5, 2025
cb0b159
Terminate if buffers are not ready for use
pentschev May 5, 2025
1f2f966
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev May 5, 2025
de3db36
Remove expected_num_chunks from Chunks constructor with gpu_data
pentschev May 5, 2025
294d5b9
Fix Chunk::is_ready condition
pentschev May 5, 2025
6a1ae03
Update condition and is_ready docstring
pentschev May 5, 2025
16c8b17
Check for gpu_data_size
pentschev May 5, 2025
19 changes: 19 additions & 0 deletions cpp/include/rapidsmpf/buffer/buffer.hpp
@@ -9,6 +9,8 @@
#include <variant>
#include <vector>

#include <cuda_runtime.h>

#include <rmm/device_buffer.hpp>

#include <rapidsmpf/error.hpp>
@@ -122,12 +124,27 @@ class Buffer {
);
}

/**
* @brief Check if the last copy operation has completed.
*
* @return true if the copy operation has completed or no copy operation
* was performed, false if it is still in progress.
*/
[[nodiscard]] bool is_copy_complete() const;
Contributor:

nit: "copy" is, I think, the wrong phrasing. I think you mean "has any stream-ordered work to allocate this buffer completed".

Member Author:

Well, this only applies to Buffer::copy at the moment; making this more general would IMO imply that it is also relevant for the constructor that just allocates without copies.
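
For reference, a minimal standalone sketch of the kind of non-blocking check is_copy_complete() describes, using only the CUDA runtime API; the helper name event_is_done is illustrative and not part of this PR.

#include <stdexcept>

#include <cuda_runtime.h>

// Illustrative helper (not the PR's code): report whether all work recorded
// into `event` has finished, without blocking the calling thread.
bool event_is_done(cudaEvent_t event) {
    if (event == nullptr) {
        return true;  // no stream-ordered work was ever recorded
    }
    cudaError_t const status = cudaEventQuery(event);
    if (status == cudaSuccess) {
        return true;  // the recorded copy has completed
    }
    if (status == cudaErrorNotReady) {
        return false;  // work is still in flight on the stream
    }
    throw std::runtime_error(cudaGetErrorString(status));  // genuine CUDA error
}

A buffer that never recorded an event (e.g. one constructed directly from host memory) reports true, matching the docstring above.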


/// @brief Buffer has a move ctor but no copy or assign operator.
Buffer(Buffer&&) = default;
Buffer(Buffer const&) = delete;
Buffer& operator=(Buffer& o) = delete;
Buffer& operator=(Buffer&& o) = delete;

/**
* @brief Destructor for Buffer.
*
* Cleans up any allocated resources.
*/
~Buffer();

private:
/**
* @brief Construct a Buffer from host memory.
@@ -208,6 +225,8 @@ class Buffer {
/// @brief The underlying storage host memory or device memory buffer (where
/// applicable).
StorageT storage_;
/// @brief CUDA event used to track copy operations
cudaEvent_t cuda_event_;
Contributor:

I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.

Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Member Author:

> I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.

That would only shift what we need to track, wouldn't it? If we had one event per stream, how would we know if a buffer was created before or after the event? From the check's perspective it could have happened either before or after and we would go back to have potentially invalid memory accesses.

> Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Thanks for the suggestion, I'll do that.

Contributor:

cudaEventRecord records the state of a stream (i.e. any outstanding work on the stream is "noted") and then cudaEventWait waits for completion of all that work. So:

do_some_allocation(stream)

cudaEventRecord(event, stream)

....
cudaEventWait(event)

# allocation guaranteed to have completed

no?

Member Author:

Yes, and this is why we have one event per buffer. If we had one event per stream we could end up with:

buf1 = do_some_allocation(stream)

cudaEventRecord(event, stream)

buf2 = do_some_allocation(stream)

cudaEventWait(event)

# buf1 is guaranteed to have completed, but buf2 isn't

That is what I don't think will work, or are you suggesting something different and I misunderstood the suggestion?

Member Author:

> Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.
>
> Thanks for the suggestion, I'll do that.

This is now done in 1180b8e.

Contributor:

buf1 = do_some_allocation(stream)

cudaEventRecord(event, stream)

buf2 = do_some_allocation(stream)

cudaEventRecord(event, stream)
cudaEventWait(event)

# now buf1 and buf2 are guaranteed completed

Member Author:

But in that case aren't we just blocking until all buffers are completed? This would potentially prevent us from progressing buf1 until buf2 has completed too, which could increase memory pressure, so I'm not really sure the benefits outweigh the costs.

Member Author:

An additional complication is that you would also need to keep track of the streams and events globally, meaning we probably couldn't handle an arbitrary Buffer when we don't know which stream it is being allocated/copied on, nor where that stream's event is located. IOW, we would need some sort of manager visible everywhere to track streams and events.
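
To make the per-buffer design discussed above concrete, a minimal sketch follows. The TrackedBuffer type and its method names are illustrative, not the PR's Buffer, and error checking of the CUDA calls is omitted; each buffer records its own timing-disabled event immediately after its copy, so buffers on the same stream can be checked independently.

#include <cuda_runtime.h>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

// Illustrative type (not the PR's Buffer): one lightweight event per buffer.
struct TrackedBuffer {
    rmm::device_buffer data;
    cudaEvent_t event{nullptr};

    // Record the stream state right after the copy that fills `data`.
    void record(rmm::cuda_stream_view stream) {
        cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
        cudaEventRecord(event, stream.value());
    }

    // Non-blocking per-buffer check: buf1 can be sent as soon as its own
    // copy finishes, even if buf2's copy on the same stream is still running.
    bool is_ready() const {
        return event == nullptr || cudaEventQuery(event) == cudaSuccess;
    }

    ~TrackedBuffer() {
        if (event != nullptr) {
            cudaEventDestroy(event);
        }
    }
};

With a single shared per-stream event, the check for buf1 would instead have to wait until everything recorded later on the stream (including buf2's copy) had finished — the memory-pressure concern raised above.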

};

} // namespace rapidsmpf
2 changes: 2 additions & 0 deletions cpp/include/rapidsmpf/communicator/communicator.hpp
@@ -426,6 +426,8 @@ class Communicator {
* @param rank The destination rank.
* @param tag Message tag for identification.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @throw std::logic_error if the buffer copy is not complete yet.
*/
[[nodiscard]] virtual std::unique_ptr<Future> send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
47 changes: 42 additions & 5 deletions cpp/src/buffer/buffer.cpp
@@ -4,6 +4,8 @@
*/
#include <stdexcept>

#include <cuda_runtime.h>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/buffer/resource.hpp>

@@ -16,12 +18,19 @@ template <typename T>
RAPIDSMPF_EXPECTS(ptr, "unique pointer cannot be null", std::invalid_argument);
return ptr;
}

// Helper to create and record a CUDA event
void create_and_record_event(cudaEvent_t& event, rmm::cuda_stream_view stream) {
RAPIDSMPF_CUDA_TRY(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
RAPIDSMPF_CUDA_TRY(cudaEventRecord(event, stream));
}
} // namespace

Buffer::Buffer(std::unique_ptr<std::vector<uint8_t>> host_buffer, BufferResource* br)
: br{br},
size{host_buffer ? host_buffer->size() : 0},
storage_{std::move(host_buffer)} {
storage_{std::move(host_buffer)},
cuda_event_{nullptr} {
RAPIDSMPF_EXPECTS(
std::get<HostStorageT>(storage_) != nullptr, "the host_buffer cannot be NULL"
);
@@ -31,13 +40,20 @@ Buffer::Buffer(std::unique_ptr<std::vector<uint8_t>> host_buffer, BufferResource
Buffer::Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br)
: br{br},
size{device_buffer ? device_buffer->size() : 0},
storage_{std::move(device_buffer)} {
storage_{std::move(device_buffer)},
cuda_event_{nullptr} {
RAPIDSMPF_EXPECTS(
std::get<DeviceStorageT>(storage_) != nullptr, "the device buffer cannot be NULL"
);
RAPIDSMPF_EXPECTS(br != nullptr, "the BufferResource cannot be NULL");
}

Buffer::~Buffer() {
if (cuda_event_ != nullptr) {
cudaEventDestroy(cuda_event_);
}
}

void* Buffer::data() {
return std::visit([](auto&& storage) -> void* { return storage->data(); }, storage_);
}
@@ -55,12 +71,14 @@ std::unique_ptr<Buffer> Buffer::copy(rmm::cuda_stream_view stream) const {
);
},
[&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
return std::unique_ptr<Buffer>(new Buffer{
auto new_buffer = std::unique_ptr<Buffer>(new Buffer{
std::make_unique<rmm::device_buffer>(
storage->data(), storage->size(), stream, br->device_mr()
),
br
});
create_and_record_event(new_buffer->cuda_event_, stream);
return new_buffer;
}
},
storage_
@@ -76,12 +94,14 @@ std::unique_ptr<Buffer> Buffer::copy(MemoryType target, rmm::cuda_stream_view st
return std::visit(
overloaded{
[&](const HostStorageT& storage) -> std::unique_ptr<Buffer> {
return std::unique_ptr<Buffer>(new Buffer{
auto new_buffer = std::unique_ptr<Buffer>(new Buffer{
std::make_unique<rmm::device_buffer>(
storage->data(), storage->size(), stream, br->device_mr()
),
br
});
create_and_record_event(new_buffer->cuda_event_, stream);
return new_buffer;
},
[&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
auto ret = std::make_unique<std::vector<uint8_t>>(storage->size());
@@ -92,11 +112,28 @@ std::unique_ptr<Buffer> Buffer::copy(MemoryType target, rmm::cuda_stream_view st
cudaMemcpyDeviceToHost,
stream
));
return std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});
auto new_buffer = std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});
create_and_record_event(new_buffer->cuda_event_, stream);
return new_buffer;
}
},
storage_
);
}

bool Buffer::is_copy_complete() const {
if (cuda_event_ == nullptr) {
return true; // No copy operation was performed
}
cudaError_t status = cudaEventQuery(cuda_event_);
if (status == cudaSuccess) {
return true;
} else if (status == cudaErrorNotReady) {
return false;
} else {
RAPIDSMPF_CUDA_TRY(status);
return false; // This line is unreachable due to the throw above
}
}

} // namespace rapidsmpf
1 change: 1 addition & 0 deletions cpp/src/communicator/mpi.cpp
@@ -116,6 +116,7 @@ std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
) {
RAPIDSMPF_EXPECTS(msg->is_copy_complete(), "buffer copy has not completed yet");
Contributor:

msg is already moved into send. Now, if we throw here, we are losing data, right? 🤔

Member Author:

Unfortunately, you're right. It is unfortunate that you're right because this means there's no good way to push this check onto the communicator and we thus have to force the caller to ensure that. I'll revert those changes and update the docstrings to make the contract to the caller clear: the caller needs to ensure the allocation and data are ready.

Member Author:

Done in eded688.
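
To illustrate the resulting caller-side contract, here is a sketch only: send_when_ready and the rapidsmpf:: qualification of Rank and Tag are assumptions, not code from this PR. The caller confirms the stream-ordered copy has finished, e.g. via is_copy_complete(), before handing the buffer to the communicator.

#include <memory>
#include <utility>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/communicator/communicator.hpp>

// Illustrative helper: poll until the buffer's copy has finished, then send.
// Real code would interleave other progress work instead of busy-waiting, or
// simply synchronize the stream that produced the copy.
std::unique_ptr<rapidsmpf::Communicator::Future> send_when_ready(
    rapidsmpf::Communicator& comm,
    std::unique_ptr<rapidsmpf::Buffer> msg,
    rapidsmpf::Rank rank,
    rapidsmpf::Tag tag
) {
    while (!msg->is_copy_complete()) {
        // The buffer's CUDA event has not been signalled yet; keep polling.
    }
    return comm.send(std::move(msg), rank, tag);
}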

RAPIDSMPF_EXPECTS(
msg->size <= std::numeric_limits<int>::max(),
"send buffer size exceeds MPI max count"
9 changes: 1 addition & 8 deletions cpp/src/communicator/ucxx.cpp
@@ -1074,7 +1074,6 @@ std::shared_ptr<::ucxx::Endpoint> UCXX::get_endpoint(Rank rank) {
std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<std::vector<uint8_t>> msg, Rank rank, Tag tag, BufferResource* br
) {
RAPIDSMPF_CUDA_TRY(cudaDeviceSynchronize());
auto req = get_endpoint(rank)->tagSend(
msg->data(),
msg->size(),
@@ -1086,7 +1085,7 @@ std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<Communicator::Future> UCXX::send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
) {
RAPIDSMPF_CUDA_TRY(cudaDeviceSynchronize());
RAPIDSMPF_EXPECTS(msg->is_copy_complete(), "buffer copy has not completed yet");
auto req = get_endpoint(rank)->tagSend(
msg->data(), msg->size, tag_with_rank(shared_resources_->rank(), tag)
);
@@ -1148,9 +1147,6 @@ std::vector<std::size_t> UCXX::test_some(
completed.push_back(i);
}
}
if (completed.size() > 0) {
RAPIDSMPF_CUDA_TRY(cudaDeviceSynchronize());
}
return completed;
}

@@ -1167,9 +1163,6 @@ std::vector<std::size_t> UCXX::test_some(
completed.push_back(key);
}
}
if (completed.size() > 0) {
RAPIDSMPF_CUDA_TRY(cudaDeviceSynchronize());
}
return completed;
}

Expand Down
Loading