Use CUDA events instead of CUDA device/stream synchronization #225

Merged
merged 56 commits into from May 5, 2025

Changes from 31 commits

Commits (56)
2c0dac4
Use CUDA events to track `Buffer` copy progress
pentschev Apr 23, 2025
1202c50
Test for copy completion
pentschev Apr 23, 2025
bc40a83
Throw exception if buffer copy didn't complete before `send()`
pentschev Apr 23, 2025
6954a34
Only send GPU data if copy is complete
pentschev Apr 23, 2025
c3b04e3
Prevent receiving on incomplete `Buffer` allocation
pentschev Apr 24, 2025
ecb9e2d
Use `BufferWithEvent`
pentschev Apr 24, 2025
fa236cb
Log destruction/release if allocation is not complete
pentschev Apr 24, 2025
e393aa0
Merge branch 'branch-25.06' into buffer-cuda-event
pentschev Apr 24, 2025
c23e13f
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 24, 2025
33015af
Disable event tracking instead of logger
pentschev Apr 24, 2025
2699326
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev Apr 24, 2025
23673ab
Replace incorrect use of `RAPIDSMPF_CUDA_TRY_ALLOC`
pentschev Apr 24, 2025
1180b8e
Create events with `cudaEventDisableTiming`
pentschev Apr 24, 2025
aa2ade6
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 24, 2025
7714a6f
Simplify condition
pentschev Apr 24, 2025
eded688
Make synchronization the user's responsibility before `send()`
pentschev Apr 24, 2025
8887137
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev Apr 24, 2025
0bac2a6
Fix style
pentschev Apr 24, 2025
5f04b7a
Chunk CUDA event
pentschev Apr 28, 2025
01b2717
Ensure ready-for-data messages are not lost
pentschev Apr 29, 2025
95cb163
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 29, 2025
42d2b9f
Prevent reinsertion
pentschev Apr 29, 2025
f019d8d
Ensure thread-safety of `Chunk::Event::is_done()`
pentschev Apr 30, 2025
0226f52
Revert "Ensure ready-for-data messages are not lost"
pentschev Apr 30, 2025
a56070c
Only extract chunks that are ready from inbox
pentschev Apr 30, 2025
aaad38c
Combine all event checks within `Chunk`
pentschev Apr 30, 2025
02b6c01
Remove redundant check while processing `outgoing_chunks_`
pentschev Apr 30, 2025
1f41e92
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev Apr 30, 2025
4ae9a4e
Provide CUDA event for all Buffer relevant cases, remove BufferWithEvent
pentschev Apr 30, 2025
63f819e
Construct `Buffer` pointers without destroying original object
pentschev Apr 30, 2025
71a8786
Improvements
pentschev Apr 30, 2025
3055053
Clarify use of streams
pentschev May 1, 2025
725c3aa
Simplify `is_ready()` condition
pentschev May 1, 2025
21f04a1
Avoid `Chunk::Event` use-after-free
pentschev May 1, 2025
11bac26
Move all CUDA events into `Buffer`
pentschev May 1, 2025
dfd6e70
Move `Event` definition to `buffer.hpp`
pentschev May 1, 2025
6795eff
Update `Chunk::is_ready()` docstring
pentschev May 1, 2025
dc1d898
Allow use of a shared `Buffer::Event`
pentschev May 1, 2025
b448c7e
Fix use-after-free in `Buffer::Event`
pentschev May 1, 2025
007a536
Prevent constness cast
pentschev May 1, 2025
8f6ecaf
Remove IncomingChunk
pentschev May 2, 2025
10fc871
Fix Event destruction (again)
pentschev May 2, 2025
e2d8c9c
Fix Event smart-pointer type
pentschev May 2, 2025
27f856c
Simplify `is_ready()` condition
pentschev May 2, 2025
640c2bf
Fixed docstring
pentschev May 2, 2025
40ebb2a
Fix build errors
pentschev May 2, 2025
3f65ea3
Merge remote-tracking branch 'upstream/branch-25.06' into buffer-cuda…
pentschev May 5, 2025
17c01e2
Simplify `Event` destructor to rely on smart-pointer for thread-safety
pentschev May 5, 2025
8627cfc
Improve docs
pentschev May 5, 2025
afd84a1
Typo fixes
pentschev May 5, 2025
cb0b159
Terminate if buffers are not ready for use
pentschev May 5, 2025
1f2f966
Merge remote-tracking branch 'origin/buffer-cuda-event' into buffer-c…
pentschev May 5, 2025
de3db36
Remove expected_num_chunks from Chunks constructor with gpu_data
pentschev May 5, 2025
294d5b9
Fix Chunk::is_ready condition
pentschev May 5, 2025
6a1ae03
Update condition and is_ready docstring
pentschev May 5, 2025
16c8b17
Check for gpu_data_size
pentschev May 5, 2025
30 changes: 27 additions & 3 deletions cpp/include/rapidsmpf/buffer/buffer.hpp
@@ -9,6 +9,8 @@
#include <variant>
#include <vector>

#include <cuda_runtime.h>

#include <rmm/device_buffer.hpp>

#include <rapidsmpf/error.hpp>
@@ -122,12 +124,27 @@ class Buffer {
);
}

/// @brief Buffer has a move ctor but no copy or assign operator.
Buffer(Buffer&&) = default;
/**
* @brief Check if the device memory operation has completed.
*
* @return true if the device memory operation has completed or no device
* memory operation was performed, false if it is still in progress.
*/
[[nodiscard]] bool is_ready() const;

/// @brief Delete move and copy constructors and assignment operators.
Buffer(Buffer&&) = delete;
Buffer(Buffer const&) = delete;
Buffer& operator=(Buffer& o) = delete;
Buffer& operator=(Buffer&& o) = delete;

/**
* @brief Destructor for Buffer.
*
* Cleans up any allocated resources.
*/
~Buffer();

private:
/**
* @brief Construct a Buffer from host memory.
@@ -143,13 +160,18 @@
* @brief Construct a Buffer from device memory.
*
* @param device_buffer A unique pointer to a device buffer.
* @param stream CUDA stream for the operation.
Contributor:

There doesn't appear to be any "operation" here, so can you explain in the docstring what this stream is used for?

Member Author:

I tried to clarify the use of the stream in 3055053; I'm not sure that is satisfactory, to be honest. Please have a look and tell me what you think.

* @param br Buffer resource for memory allocation.
*
* @throws std::invalid_argument if `device_buffer` is null.
* @throws std::invalid_argument if `stream` or `br->mr` isn't the same as the one
* used by `device_buffer`.
*/
Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br);
Buffer(
std::unique_ptr<rmm::device_buffer> device_buffer,
rmm::cuda_stream_view stream,
BufferResource* br
);

/**
* @brief Access the underlying host memory buffer.
@@ -208,6 +230,8 @@ class Buffer {
/// @brief The underlying storage host memory or device memory buffer (where
/// applicable).
StorageT storage_;
/// @brief CUDA event used to track copy operations
cudaEvent_t cuda_event_;
Contributor:

I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.

Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Member Author:

> I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.

That would only shift what we need to track, wouldn't it? If we had one event per stream, how would we know if a buffer was created before or after the event? From the check's perspective it could have happened either before or after and we would go back to have potentially invalid memory accesses.

> Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Thanks for the suggestion, I'll do that.

Contributor:

cudaEventRecord records the state of a stream (i.e. any outstanding work on the stream is "noted") and then cudaEventSynchronize waits for completion of all that work. So:

do_some_allocation(stream)
cudaEventRecord(event, stream)
...
cudaEventSynchronize(event)
# allocation guaranteed to have completed

no?

Member Author:

Yes, and this is why we have one event per buffer. If we had one event per stream we could end up with:

buf1 = do_some_allocation(stream)
cudaEventRecord(event, stream)
buf2 = do_some_allocation(stream)
cudaEventSynchronize(event)
# buf1 is guaranteed to have completed, but buf2 isn't

That is what I don't think will work, or are you suggesting something different and I misunderstood the suggestion?

Member Author:

> Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

> Thanks for the suggestion, I'll do that.

This is now done in 1180b8e.

Contributor:

buf1 = do_some_allocation(stream)
cudaEventRecord(event, stream)
buf2 = do_some_allocation(stream)
cudaEventRecord(event, stream)
cudaEventSynchronize(event)
# now buf1 and buf2 are guaranteed completed

Member Author:

But in that case aren't we just blocking until all buffers are completed? This would potentially prevent us from progressing buf1 until buf2 has completed too, which could increase memory pressure, so I'm not really sure the benefits outweigh the costs.

Member Author:

An additional complication is that you would also need to keep track of the streams and events globally, meaning we probably wouldn't be able to take an arbitrary Buffer for which we don't know on which stream it's being allocated/copied, nor where that stream's event is located. IOW, we would need some sort of manager visible everywhere to track streams and events.
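To make the pattern concrete, here is a minimal standalone sketch of the per-buffer approach this thread converges on; record_event and buffer_ready are illustrative names, not the PR's API:

#include <cuda_runtime.h>

// Record all work currently queued on `stream` into a fresh, lightweight
// (timing-disabled) event; the PR keeps one such event per Buffer.
cudaEvent_t record_event(cudaStream_t stream) {
    cudaEvent_t event;
    cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
    cudaEventRecord(event, stream);
    return event;
}

// Non-blocking readiness check: cudaEventQuery returns cudaSuccess once all
// work captured by the record has completed. Because each buffer records its
// own event, buf1 can become ready while buf2, recorded later on the same
// stream, is still pending.
bool buffer_ready(cudaEvent_t event) {
    return cudaEventQuery(event) == cudaSuccess;
}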

};

} // namespace rapidsmpf
5 changes: 4 additions & 1 deletion cpp/include/rapidsmpf/buffer/resource.hpp
@@ -256,9 +256,12 @@ class BufferResource {
* @brief Move device buffer data into a Buffer.
*
* @param data A unique pointer to the device buffer.
* @param stream CUDA stream for the operation.
Contributor:

What "operation" is being performed such that a stream is required?

Member Author:

As above, I tried to clarify it in 3055053; let me know if you think we should make it even clearer.

* @return A unique pointer to the resulting Buffer.
*/
std::unique_ptr<Buffer> move(std::unique_ptr<rmm::device_buffer> data);
std::unique_ptr<Buffer> move(
std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
);

/**
* @brief Move a Buffer to the specified memory type.
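A hypothetical caller of the new overload might look like this (a sketch only; make_buffer and the 1 KiB size are illustrative, and a valid BufferResource* is assumed):

#include <memory>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

// Sketch: allocate 1 KiB on `stream` and hand ownership to the BufferResource.
std::unique_ptr<rapidsmpf::Buffer> make_buffer(
    rapidsmpf::BufferResource* br, rmm::cuda_stream_view stream
) {
    auto dev = std::make_unique<rmm::device_buffer>(1024, stream);
    // The returned Buffer records a CUDA event on `stream`; its is_ready()
    // reports true once the asynchronous allocation work completes.
    return br->move(std::move(dev), stream);
}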
10 changes: 10 additions & 0 deletions cpp/include/rapidsmpf/communicator/communicator.hpp
@@ -426,6 +426,11 @@ class Communicator {
* @param rank The destination rank.
* @param tag Message tag for identification.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @warning The caller is responsible for ensuring the underlying `Buffer` allocation
* and data are already valid before calling, for example when a CUDA allocation
* and/or copy is done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function.
*/
[[nodiscard]] virtual std::unique_ptr<Future> send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
@@ -438,6 +443,11 @@
* @param tag Message tag for identification.
* @param recv_buffer The receive buffer.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @warning The caller is responsible for ensuring the underlying `Buffer` allocation
* is already valid before calling, for example when a CUDA allocation
* and/or copy is done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function.
Member:

Let's throw an exception if Buffer::is_ready() == false?

Member Author:

No, unfortunately that doesn't work, as @nirandaperera has noted previously in #225 (comment). Doing that means we lose the std::unique_ptr<Buffer>.

Member (@madsbk, May 5, 2025):

Still, I think we should check Buffer::is_ready() == false. It might be unrecoverable but still better than a segfault. But let's make it clear in the doc that the buffer has been moved and freed!

Member Author:

How about a warning instead? Note that the exception will be raised in the shuffler's progress thread, so we'll probably need to handle it in some way. I know a warning is not a solution to a potential segfault, but I think it will more clearly inform the user about what happened, rather than an exception which may not be handled correctly.


Member Author:

That works too. I would suggest that we do this now and add a new ABORT log-level that both logs an error and immediately terminates. WDYT?

Member:

Sounds good!

Member Author:

Done in cb0b159 and opened #246 to track this as well.

*/
[[nodiscard]] virtual std::unique_ptr<Future> recv(
Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
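The caller-side contract from the warnings above, sketched as a helper (the polling loop and send_when_ready are illustrative, not part of the library; a real caller would check readiness inside its progress loop rather than busy-wait):

#include <memory>
#include <thread>

// Sketch: only hand the buffer to the communicator once its CUDA event
// has fired, satisfying the @warning on send()/recv() above.
std::unique_ptr<rapidsmpf::Communicator::Future> send_when_ready(
    rapidsmpf::Communicator& comm,
    std::unique_ptr<rapidsmpf::Buffer> buf,
    rapidsmpf::Rank rank,
    rapidsmpf::Tag tag
) {
    while (!buf->is_ready()) {      // non-blocking cudaEventQuery underneath
        std::this_thread::yield();  // let the async allocation/copy finish
    }
    return comm.send(std::move(buf), rank, tag);
}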
61 changes: 60 additions & 1 deletion cpp/include/rapidsmpf/shuffler/chunk.hpp
@@ -4,6 +4,7 @@
*/
#pragma once

#include <atomic>
#include <memory>
#include <sstream>
#include <vector>
@@ -12,6 +13,7 @@
#include <cudf/table/table.hpp>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/shuffler/partition.hpp>

namespace rapidsmpf::shuffler::detail {
@@ -26,7 +28,47 @@ using ChunkID = std::uint64_t;
*/
class Chunk {
public:
/**
* @brief CUDA event to provide synchronization among a set of chunks.
*
* This event serves as a synchronization point for a set of chunks
* given a user-specified stream.
Contributor:

I think we should promote this class to an object that is usable by the Buffer as well, rather than having two different ways of managing events.

Member Author:

Done as part of 11bac26 as well.

*/
class Event {
public:
/**
* @brief Construct a CUDA event for a given stream.
*
* @param stream CUDA stream used for device memory operations.
* @param log Logger to warn if object is destroyed before event is ready.
*/
Event(rmm::cuda_stream_view stream, Communicator::Logger& log);

/**
* @brief Destructor for Event.
*
* Cleans up the CUDA event if one was created. If the event is not done,
* it will log a warning.
*/
~Event();

/**
* @brief Check if the CUDA event has been completed.
*
* @return true if the event has been completed, false otherwise.
*/
[[nodiscard]] bool is_ready();

private:
cudaEvent_t event_;  ///< CUDA event used to track device memory allocation.
Communicator::Logger& log_;  ///< Logger to warn if object is destroyed before event is ready.
std::atomic<bool> done_{false};  ///< Cache of the event status to avoid unnecessary queries.
};

PartID const pid; ///< Partition ID that this chunk belongs to.

ChunkID const cid; ///< Unique ID of this chunk.

/// If not zero, the number of chunks of the partition expected to get from the
@@ -42,6 +84,9 @@ class Chunk {
/// GPU data buffer of the packed `cudf::table` associated with this chunk.
std::unique_ptr<Buffer> gpu_data;

/// CUDA event to provide synchronization among a set of chunks.
std::shared_ptr<Event> event;

/**
* @brief Construct a new chunk of a partition.
*
@@ -54,14 +99,16 @@
* chunk.
* @param gpu_data The gpu_data of the packed `cudf::table` that makes up this
* chunk.
* @param event CUDA event to provide synchronization among a set of chunks.
*/
Chunk(
PartID pid,
ChunkID cid,
std::size_t expected_num_chunks,
std::size_t gpu_data_size,
std::unique_ptr<std::vector<uint8_t>> metadata,
std::unique_ptr<Buffer> gpu_data
std::unique_ptr<Buffer> gpu_data,
std::shared_ptr<Event> event
);

/**
@@ -126,6 +173,18 @@ class Chunk {
std::size_t max_nbytes = 512,
rmm::cuda_stream_view stream = cudf::get_default_stream()
) const;

/**
* @brief Returns true if the chunk is ready for consumption.
*
* Checks that the shared CUDA event and the buffer's CUDA event are both ready.
* The shared CUDA event is used to synchronize the chunk's data across a set of
* chunks, while the buffer's CUDA event is used to synchronize the chunk's data
* if any spilling is involved.
*
* @return true if the chunk is ready, false otherwise.
*/
[[nodiscard]] bool is_ready() const;
};

/**
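The combined check described in the docstring might read roughly as follows; this is an assumed sketch of the stated logic, not the PR's exact implementation:

// Assumed sketch: a chunk is consumable only once both the shared event
// (if any) and the buffer's own event (if any) have completed.
bool Chunk::is_ready() const {
    bool const shared_ready = (event == nullptr) || event->is_ready();
    bool const buffer_ready = (gpu_data == nullptr) || gpu_data->is_ready();
    return shared_ready && buffer_ready;
}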
6 changes: 3 additions & 3 deletions cpp/include/rapidsmpf/shuffler/postbox.hpp
@@ -55,11 +55,11 @@ class PostBox {
std::unordered_map<ChunkID, Chunk> extract(PartID pid);

/**
* @brief Extracts all chunks from the PostBox.
* @brief Extracts all ready chunks from the PostBox.
*
* @return A vector of all chunks in the PostBox.
* @return A vector of all ready chunks in the PostBox.
*/
std::vector<Chunk> extract_all();
std::vector<Chunk> extract_all_ready();

/**
* @brief Checks if the PostBox is empty.
9 changes: 7 additions & 2 deletions cpp/include/rapidsmpf/shuffler/shuffler.hpp
@@ -228,19 +228,24 @@ class Shuffler {
* @param pid The partition ID of the new chunk.
* @param metadata The metadata of the new chunk, can be null.
* @param gpu_data The gpu data of the new chunk, can be null.
* @param stream The CUDA stream for BufferResource memory operations.
* @param event The event to use for the new chunk.
*/
[[nodiscard]] detail::Chunk create_chunk(
PartID pid,
std::unique_ptr<std::vector<uint8_t>> metadata,
std::unique_ptr<rmm::device_buffer> gpu_data
std::unique_ptr<rmm::device_buffer> gpu_data,
rmm::cuda_stream_view stream,
std::shared_ptr<detail::Chunk::Event> event
) {
return detail::Chunk{
pid,
get_new_cid(),
0, // expected_num_chunks
gpu_data ? gpu_data->size() : 0, // gpu_data_size
std::move(metadata),
br_->move(std::move(gpu_data))
br_->move(std::move(gpu_data), stream),
std::move(event)
};
}

59 changes: 53 additions & 6 deletions cpp/src/buffer/buffer.cpp
@@ -4,6 +4,8 @@
*/
#include <stdexcept>

#include <cuda_runtime.h>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/buffer/resource.hpp>

@@ -16,28 +18,48 @@ template <typename T>
RAPIDSMPF_EXPECTS(ptr, "unique pointer cannot be null", std::invalid_argument);
return ptr;
}

// Helper to create and record a CUDA event
cudaEvent_t create_and_record_event(rmm::cuda_stream_view stream) {
cudaEvent_t event;
RAPIDSMPF_CUDA_TRY(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
RAPIDSMPF_CUDA_TRY(cudaEventRecord(event, stream));
return event;
}
Contributor:

I still think we can make this simpler by either only storing an event in the Chunk (which we can think of as a Buffer + metadata) or in the Buffer.

I think I am inclined to go with storing it in the Buffer.

That is, a Buffer holds a std::shared_ptr<Event> (that may be null); on insertion of chunks, this event is either the shared event or the copy-specific event.

Then we only need to remember to check the event on the buffer.

If we start using an internal stream for copies during spilling then we should use this pattern:

insert(chunks..., stream) {
   auto event = std::make_shared<Event>();
   event->record(stream); 

   for (chunk in chunks) {
       if (must_spill(chunk)) {
           cudaStreamWaitEvent(internal_stream, event); // this doesn't block, it just sets up dependencies
           spill_chunk_on(internal_stream);
           auto nevent = std::make_shared<Event>();
           nevent->record(internal_stream);
           chunk.buffer.set_event(std::move(nevent));
       } else {
           chunk.buffer.set_event(event);
       }
   }
}

Now we only need to check chunk.buffer.is_ready() (or buffer.is_ready() if we don't have a chunk).

Member Author:

I've made those changes now in 11bac26, and I do agree we don't need the event in Chunk; it is redundant since we now create a Buffer::Event for each chunk in insert(), so relying on the individual ones makes sense. However, as it is we lose the ability to log if the event is not ready upon destruction. I started making changes to pass the logger through in a71055a, but it requires an enormous amount of API changes, and it then raises a lot of warnings in tests because we indeed never check that allocation/copy completes; arguably that should be fixed, but I don't think we should do it all in this PR.

Contributor:

No, I think that logging change can't be right. If we want to log things, we should make the event construction take a reference to a logger (optional, I suppose), and we should split the event creation from the allocation; then we don't need to make all these API changes, I hope.

Member Author:

That would technically work but I don't see the point in any of that. A logger whose purpose is to warn the user when he did something wrong being optional is completely pointless; we either have it there always or there's really no need for it. Also, transferring the responsibility to properly create an Event back to the user just opens another door for errors; we are again exchanging reliability for the ability to warn the user that what he's done is unreliable.

Contributor:

The user in this scenario is another developer of the library, I think.

I suppose with respect to event creation it's a philosophical difference. I prefer that at object creation the event we're depending on is explicit, while in methods on the object the event is implicit (so you shouldn't need to provide an event when you do a copy, because the API should manage that for you); but when you create an object that depends on an event explicitly (e.g. the Buffer ctor, or the BufferResource::move construction from device_buffers), then you should provide the event. I suppose the other thing you can do is provide the stream that captures the work, which is kind of the same thing, but you may get some false dependencies if someone else submitted some work to the stream before you get to record your event.
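For reference, the non-blocking dependency used in the insert() sketch above can be expressed in plain CUDA like this (a sketch; producer, consumer, and chain_streams are illustrative names):

#include <cuda_runtime.h>

// All work submitted to `consumer` after this call waits for the work
// already recorded on `producer`, without blocking the host thread.
void chain_streams(cudaStream_t producer, cudaStream_t consumer) {
    cudaEvent_t event;
    cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
    cudaEventRecord(event, producer);         // capture queued producer work
    cudaStreamWaitEvent(consumer, event, 0);  // dependency only, no host block
    cudaEventDestroy(event);                  // destruction is safely deferred
}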

} // namespace

Buffer::Buffer(std::unique_ptr<std::vector<uint8_t>> host_buffer, BufferResource* br)
: br{br},
size{host_buffer ? host_buffer->size() : 0},
storage_{std::move(host_buffer)} {
storage_{std::move(host_buffer)},
cuda_event_{nullptr} {
RAPIDSMPF_EXPECTS(
std::get<HostStorageT>(storage_) != nullptr, "the host_buffer cannot be NULL"
);
RAPIDSMPF_EXPECTS(br != nullptr, "the BufferResource cannot be NULL");
}

Buffer::Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br)
Buffer::Buffer(
std::unique_ptr<rmm::device_buffer> device_buffer,
rmm::cuda_stream_view stream,
BufferResource* br
)
: br{br},
size{device_buffer ? device_buffer->size() : 0},
storage_{std::move(device_buffer)} {
storage_{std::move(device_buffer)},
cuda_event_{create_and_record_event(stream)} {
RAPIDSMPF_EXPECTS(
std::get<DeviceStorageT>(storage_) != nullptr, "the device buffer cannot be NULL"
);
RAPIDSMPF_EXPECTS(br != nullptr, "the BufferResource cannot be NULL");
}

Buffer::~Buffer() {
if (cuda_event_ != nullptr) {
cudaEventDestroy(cuda_event_);
}
}

void* Buffer::data() {
return std::visit([](auto&& storage) -> void* { return storage->data(); }, storage_);
}
@@ -55,12 +77,14 @@ std::unique_ptr<Buffer> Buffer::copy(rmm::cuda_stream_view stream) const {
);
},
[&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
return std::unique_ptr<Buffer>(new Buffer{
auto new_buffer = std::unique_ptr<Buffer>(new Buffer{
std::make_unique<rmm::device_buffer>(
storage->data(), storage->size(), stream, br->device_mr()
),
stream,
br
});
return new_buffer;
}
},
storage_
@@ -76,12 +100,14 @@ std::unique_ptr<Buffer> Buffer::copy(MemoryType target, rmm::cuda_stream_view st
return std::visit(
overloaded{
[&](const HostStorageT& storage) -> std::unique_ptr<Buffer> {
return std::unique_ptr<Buffer>(new Buffer{
auto new_buffer = std::unique_ptr<Buffer>(new Buffer{
std::make_unique<rmm::device_buffer>(
storage->data(), storage->size(), stream, br->device_mr()
),
stream,
br
});
return new_buffer;
},
[&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
auto ret = std::make_unique<std::vector<uint8_t>>(storage->size());
@@ -92,11 +118,32 @@ std::unique_ptr<Buffer> Buffer::copy(MemoryType target, rmm::cuda_stream_view st
cudaMemcpyDeviceToHost,
stream
));
return std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});
auto new_buffer = std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});

// The event is created here instead of in the constructor because the
// memcpy is async even though the destination buffer is host memory.
new_buffer->cuda_event_ = create_and_record_event(stream);

return new_buffer;
}
},
storage_
);
}

bool Buffer::is_ready() const {
if (cuda_event_ == nullptr) {
return true; // No device memory operation was performed
}
cudaError_t status = cudaEventQuery(cuda_event_);
if (status == cudaSuccess) {
return true;
} else if (status == cudaErrorNotReady) {
return false;
} else {
RAPIDSMPF_CUDA_TRY(status);
return false; // This line is unreachable due to the throw above
}
}

} // namespace rapidsmpf