Use CUDA events instead of CUDA device/stream synchronization #225
Changes from 31 commits
2c0dac4
1202c50
bc40a83
6954a34
c3b04e3
ecb9e2d
fa236cb
e393aa0
c23e13f
33015af
2699326
23673ab
1180b8e
aa2ade6
7714a6f
eded688
8887137
0bac2a6
5f04b7a
01b2717
95cb163
42d2b9f
f019d8d
0226f52
a56070c
aaad38c
02b6c01
1f41e92
4ae9a4e
63f819e
71a8786
3055053
725c3aa
21f04a1
11bac26
dfd6e70
6795eff
dc1d898
b448c7e
007a536
8f6ecaf
10fc871
e2d8c9c
27f856c
640c2bf
40ebb2a
3f65ea3
17c01e2
8627cfc
afd84a1
cb0b159
1f2f966
de3db36
294d5b9
6a1ae03
16c8b17
@@ -9,6 +9,8 @@
#include <variant>
#include <vector>

#include <cuda_runtime.h>

#include <rmm/device_buffer.hpp>

#include <rapidsmpf/error.hpp>
@@ -122,12 +124,27 @@ class Buffer {
);
}

/// @brief Buffer has a move ctor but no copy or assign operator.
Buffer(Buffer&&) = default;
/**
* @brief Check if the device memory operation has completed.
*
* @return true if the device memory operation has completed or no device
* memory operation was performed, false if it is still in progress.
*/
[[nodiscard]] bool is_ready() const;

/// @brief Delete move and copy constructors and assignment operators.
Buffer(Buffer&&) = delete;
Buffer(Buffer const&) = delete;
Buffer& operator=(Buffer& o) = delete;
Buffer& operator=(Buffer&& o) = delete;

/**
* @brief Destructor for Buffer.
*
* Cleans up any allocated resources.
*/
~Buffer();

private:
/**
* @brief Construct a Buffer from host memory.
@@ -143,13 +160,18 @@
* @brief Construct a Buffer from device memory.
*
* @param device_buffer A unique pointer to a device buffer.
* @param stream CUDA stream for the operation.
Review discussion:
- There doesn't appear to be any "operation" here, so can you explain in the docstring what this stream is used for?
- I tried to clarify the use of the stream in 3055053; I'm not sure if that is satisfactory, to be honest. Please have a look and tell me what you think.
* @param br Buffer resource for memory allocation.
*
* @throws std::invalid_argument if `device_buffer` is null.
* @throws std::invalid_argument if `stream` or `br->mr` isn't the same used by
* `device_buffer`.
*/
Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br);
Buffer(
std::unique_ptr<rmm::device_buffer> device_buffer,
rmm::cuda_stream_view stream,
BufferResource* br
);

/**
* @brief Access the underlying host memory buffer.
@@ -208,6 +230,8 @@ class Buffer {
/// @brief The underlying storage host memory or device memory buffer (where
/// applicable).
StorageT storage_;
/// @brief CUDA event used to track copy operations
cudaEvent_t cuda_event_;
Review discussion:
- I'm not sure I really like this design. This produces one event per buffer that we create, whereas really we only need one event per stream that we see. Very minimally, we should definitely create the events with cudaEventDisableTiming.
- That would only shift what we need to track, wouldn't it? If we had one event per stream, how would we know whether a buffer was created before or after the event? From the check's perspective it could have happened either before or after, and we would go back to having potentially invalid memory accesses.
- Thanks for the suggestion, I'll do that.
- no?
- Yes, and this is why we have one event per buffer. If we had one event per stream we could end up with: […] That is what I don't think will work, or are you suggesting something different and I misunderstood the suggestion?
- This is now done in 1180b8e.
- But in that case aren't we just blocking until all buffers are completed? This would potentially prevent us from progressing.
- An additional complication is that you would also need to keep track of the streams and events globally, meaning we probably wouldn't be able to get an arbitrary […]
};

} // namespace rapidsmpf
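The thread above weighs one event per buffer against one event per stream. As a standalone illustration (plain CUDA, not rapidsmpf code; the buffer names and sizes are made up), the sketch below shows why a single per-stream event cannot answer readiness questions for work enqueued after the event was recorded, which is the case the per-buffer design avoids:

#include <cuda_runtime.h>

int main() {
    cudaStream_t stream;
    cudaStreamCreate(&stream);

    // Work for buffer A is enqueued first.
    void* buf_a = nullptr;
    cudaMallocAsync(&buf_a, 1 << 20, stream);

    // A single per-stream event recorded here only covers work enqueued so far,
    // i.e. buffer A.
    cudaEvent_t per_stream_event;
    cudaEventCreateWithFlags(&per_stream_event, cudaEventDisableTiming);
    cudaEventRecord(per_stream_event, stream);

    // Buffer B's work is enqueued after the record, so the event's completion
    // says nothing about B. Recording one event per buffer, immediately after
    // that buffer's work, removes the ambiguity.
    void* buf_b = nullptr;
    cudaMallocAsync(&buf_b, 1 << 20, stream);

    cudaError_t status = cudaEventQuery(per_stream_event);
    (void)status;  // cudaSuccess here does not imply buf_b is usable

    cudaFreeAsync(buf_a, stream);
    cudaFreeAsync(buf_b, stream);
    cudaStreamSynchronize(stream);
    cudaEventDestroy(per_stream_event);
    cudaStreamDestroy(stream);
    return 0;
}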
@@ -256,9 +256,12 @@ class BufferResource {
* @brief Move device buffer data into a Buffer.
*
* @param data A unique pointer to the device buffer.
* @param stream CUDA stream for the operation.
Review discussion:
- What "operation" is being performed such that a stream is required?
- As above, I tried to clarify it in 3055053; let me know if you think we should make it even more clear.
* @return A unique pointer to the resulting Buffer.
*/
std::unique_ptr<Buffer> move(std::unique_ptr<rmm::device_buffer> data);
std::unique_ptr<Buffer> move(
    std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
);

/**
* @brief Move a Buffer to the specified memory type.
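A brief usage sketch for the new overload: the stream passed to move() should be the stream on which the device_buffer's work was enqueued, so that the event recorded inside the resulting Buffer actually covers that work. The helper below is hypothetical and assumes Buffer and BufferResource live in the rapidsmpf namespace; only move() and device_mr() appear in this diff.

#include <cstddef>
#include <memory>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/buffer/resource.hpp>

// Hypothetical helper: allocate a device buffer on `stream` and hand it to the
// BufferResource, passing the same stream so the recorded event is meaningful.
std::unique_ptr<rapidsmpf::Buffer> make_device_buffer_on(
    rapidsmpf::BufferResource* br, rmm::cuda_stream_view stream, std::size_t nbytes
) {
    auto dev = std::make_unique<rmm::device_buffer>(nbytes, stream, br->device_mr());
    // The returned Buffer's is_ready() becomes true once the allocation
    // enqueued on `stream` has completed.
    return br->move(std::move(dev), stream);
}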
@@ -426,6 +426,11 @@ class Communicator {
* @param rank The destination rank.
* @param tag Message tag for identification.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
* and data are already valid before calling, for example, when a CUDA allocation
* and/or copy are done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function.
*/
[[nodiscard]] virtual std::unique_ptr<Future> send(
    std::unique_ptr<Buffer> msg, Rank rank, Tag tag

@@ -438,6 +443,11 @@
* @param tag Message tag for identification.
* @param recv_buffer The receive buffer.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
* is already valid before calling, for example, when a CUDA allocation
* and/or copy are done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function.
Review discussion:
- Let's throw an exception if is_ready() is false.
- No, unfortunately that doesn't work, as @nirandaperera has noted previously in #225 (comment). Doing that means we lose the […]
- Still, I think we should check […]
- How about a warning instead? Note that the exception will be raised in the shuffler's progress thread, so we'll probably need to handle it in some way. I know a warning is not a solution for the potential of a segfault, but I think that will more clearly inform the user about what happened, rather than an exception which may not be handled correctly.
- What about https://en.cppreference.com/w/cpp/error/terminate ?
- That works too; I would suggest that we do this now and add a new […]
- Sounds good!
*/
[[nodiscard]] virtual std::unique_ptr<Future> recv(
    Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
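To make the warning concrete, here is a minimal, hypothetical guard a caller could apply before handing a buffer to send() or recv(). The helper name is invented, and the use of std::terminate() rather than an exception follows the review discussion above; treat it as a sketch, not rapidsmpf code.

#include <cstdio>
#include <exception>
#include <utility>

// BufferT is any type exposing is_ready(), such as the Buffer described above.
template <typename BufferT, typename SendFn>
auto send_when_ready(BufferT& buffer, SendFn&& do_send) {
    if (!buffer.is_ready()) {
        // The device-side allocation/copy backing this buffer has not finished;
        // sending now could transmit invalid memory. Fail hard, since this
        // typically runs on the shuffler's progress thread where an exception
        // is difficult to handle.
        std::fputs("Buffer not ready before send\n", stderr);
        std::terminate();
    }
    return std::forward<SendFn>(do_send)(buffer);
}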
@@ -4,6 +4,7 @@
*/
#pragma once

#include <atomic>
#include <memory>
#include <sstream>
#include <vector>

@@ -12,6 +13,7 @@
#include <cudf/table/table.hpp>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/shuffler/partition.hpp>

namespace rapidsmpf::shuffler::detail {

@@ -26,7 +28,47 @@ using ChunkID = std::uint64_t;
*/
class Chunk {
public:
/**
* @brief CUDA event to provide synchronization among set of chunks.
*
* This event is used to serve as a synchronization point for a set of chunks
* given a user-specified stream.
Review discussion:
- I think we should promote this class to an object that is usable by the […]
- Done as part of 11bac26 as well.
*/
class Event {
public:
/**
* @brief Construct a CUDA event for a given stream.
*
* @param stream CUDA stream used for device memory operations
* @param log Logger to warn if object is destroyed before event is ready.
*/
Event(rmm::cuda_stream_view stream, Communicator::Logger& log);

/**
* @brief Destructor for Event.
*
* Cleans up the CUDA event if one was created. If the event is not done,
* it will log a warning.
*/
~Event();

/**
* @brief Check if the CUDA event has been completed.
*
* @return true if the event has been completed, false otherwise.
*/
[[nodiscard]] bool is_ready();

private:
cudaEvent_t event_;  ///< CUDA event used to track device memory allocation
Communicator::Logger& log_;  ///< Logger to warn if object is destroyed before event is ready
std::atomic<bool> done_{false};  ///< Cache of the event status to avoid unnecessary queries.
};
PartID const pid; ///< Partition ID that this chunk belongs to.

ChunkID const cid; ///< Unique ID of this chunk.

/// If not zero, the number of chunks of the partition expected to get from the

@@ -42,6 +84,9 @@
/// GPU data buffer of the packed `cudf::table` associated with this chunk.
std::unique_ptr<Buffer> gpu_data;

/// CUDA event to provide synchronization among set of chunks.
std::shared_ptr<Event> event;

/**
* @brief Construct a new chunk of a partition.
*

@@ -54,14 +99,16 @@
* chunk.
* @param gpu_data The gpu_data of the packed `cudf::table` that makes up this
* chunk.
* @param event CUDA event to provide synchronization among set of chunks.
*/
Chunk(
    PartID pid,
    ChunkID cid,
    std::size_t expected_num_chunks,
    std::size_t gpu_data_size,
    std::unique_ptr<std::vector<uint8_t>> metadata,
    std::unique_ptr<Buffer> gpu_data
    std::unique_ptr<Buffer> gpu_data,
    std::shared_ptr<Event> event
);

/**

@@ -126,6 +173,18 @@
std::size_t max_nbytes = 512,
rmm::cuda_stream_view stream = cudf::get_default_stream()
) const;

/**
* @brief Returns true if the chunk is ready for consumption.
*
* Checks that the shared CUDA event and the buffer's CUDA event are both ready.
* The shared CUDA event is used to synchronize the chunk's data across a set of
* chunks, while the buffer's CUDA event is used to synchronize the chunk's data
* if any spilling is involved.
*
* @return true if the chunk is ready, false otherwise.
*/
[[nodiscard]] bool is_ready() const;
};

/**
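The Event declaration above leaves the implementation to the source file. The following is a minimal sketch (logger omitted, names invented, so an assumption rather than the PR's actual code) of how such a wrapper could behave, with the cached flag acting only as an optimization over repeated cudaEventQuery calls:

#include <atomic>
#include <stdexcept>

#include <cuda_runtime.h>

class EventSketch {
  public:
    explicit EventSketch(cudaStream_t stream) {
        if (cudaEventCreateWithFlags(&event_, cudaEventDisableTiming) != cudaSuccess) {
            throw std::runtime_error("failed to create CUDA event");
        }
        if (cudaEventRecord(event_, stream) != cudaSuccess) {
            cudaEventDestroy(event_);
            throw std::runtime_error("failed to record CUDA event");
        }
    }
    ~EventSketch() { cudaEventDestroy(event_); }
    EventSketch(EventSketch const&) = delete;
    EventSketch& operator=(EventSketch const&) = delete;

    // Non-blocking readiness check; caches completion so repeated calls avoid
    // querying the driver once the event has fired.
    bool is_ready() {
        if (done_.load(std::memory_order_relaxed)) {
            return true;
        }
        if (cudaEventQuery(event_) == cudaSuccess) {
            done_.store(true, std::memory_order_relaxed);
            return true;
        }
        return false;  // still pending (errors are ignored in this sketch)
    }

  private:
    cudaEvent_t event_{};
    std::atomic<bool> done_{false};
};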
@@ -4,6 +4,8 @@
*/
#include <stdexcept>

#include <cuda_runtime.h>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/buffer/resource.hpp>

@@ -16,28 +18,48 @@ template <typename T>
RAPIDSMPF_EXPECTS(ptr, "unique pointer cannot be null", std::invalid_argument);
return ptr;
}

// Helper to create and record a CUDA event
cudaEvent_t create_and_record_event(rmm::cuda_stream_view stream) {
    cudaEvent_t event;
    RAPIDSMPF_CUDA_TRY(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
    RAPIDSMPF_CUDA_TRY(cudaEventRecord(event, stream));
    return event;
}
Review discussion:
- I still think we can make this simpler by either only storing an event in the […]. I think I am inclined to go with storing it in the […]. That is, a […]. Then we only need to remember to check the event on the buffer. If we start using an internal stream for copies during spilling then we should use this pattern: […] Now we only need to check […]
- I've made those changes now in 11bac26, and I do agree we don't need the event in […]
- No, I think that logging change can't be right. If we want to log things, we should make the event construction take a reference to a logger (optional, I suppose), and we should split the event creation from the allocation; then we don't need to make all these API changes, I hope.
- That would technically work, but I don't see the point in any of that. A logger whose purpose is to warn the user when he did something wrong being optional is completely pointless; we either have it there always or there's really no need for it. Also, transferring the responsibility to properly create an […]
- The user in this scenario is another developer of the library, I think. I suppose with respect to event creation it's a philosophical difference. I prefer the case where, at object creation, the event we're depending on is explicit, and then in methods on the object the event is implicit (so you shouldn't need to provide an event when you do a copy, because the API should manage that for you), but when you create an object that depends on an event explicitly (e.g. the […])
} // namespace
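The thread above also mentions using an internal stream for spill copies. A plain-CUDA sketch of that pattern (all names assumed, not rapidsmpf API): the copy stream first waits on the event guarding the device data, and a new event recorded after the device-to-host copy is what the spilled buffer's readiness check would query.

#include <cstddef>

#include <cuda_runtime.h>

// Hypothetical result type: query `ready` before touching `host_dst`.
struct SpillResult {
    cudaEvent_t ready;
};

SpillResult spill_to_host(
    void* host_dst,
    const void* dev_src,
    std::size_t nbytes,
    cudaEvent_t dev_ready,    // event guarding the device data's own work
    cudaStream_t copy_stream  // internal stream dedicated to spill copies
) {
    // Do not start the copy before the device data is valid.
    cudaStreamWaitEvent(copy_stream, dev_ready, 0);
    cudaMemcpyAsync(host_dst, dev_src, nbytes, cudaMemcpyDeviceToHost, copy_stream);

    // Record the event *after* the copy; it fires once the host data is usable.
    SpillResult result{};
    cudaEventCreateWithFlags(&result.ready, cudaEventDisableTiming);
    cudaEventRecord(result.ready, copy_stream);
    return result;
}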
Buffer::Buffer(std::unique_ptr<std::vector<uint8_t>> host_buffer, BufferResource* br)
    : br{br},
      size{host_buffer ? host_buffer->size() : 0},
      storage_{std::move(host_buffer)} {
      storage_{std::move(host_buffer)},
      cuda_event_{nullptr} {
    RAPIDSMPF_EXPECTS(
        std::get<HostStorageT>(storage_) != nullptr, "the host_buffer cannot be NULL"
    );
    RAPIDSMPF_EXPECTS(br != nullptr, "the BufferResource cannot be NULL");
}

Buffer::Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br)
Buffer::Buffer(
    std::unique_ptr<rmm::device_buffer> device_buffer,
    rmm::cuda_stream_view stream,
    BufferResource* br
)
    : br{br},
      size{device_buffer ? device_buffer->size() : 0},
      storage_{std::move(device_buffer)} {
      storage_{std::move(device_buffer)},
      cuda_event_{create_and_record_event(stream)} {
    RAPIDSMPF_EXPECTS(
        std::get<DeviceStorageT>(storage_) != nullptr, "the device buffer cannot be NULL"
    );
    RAPIDSMPF_EXPECTS(br != nullptr, "the BufferResource cannot be NULL");
}

Buffer::~Buffer() {
    if (cuda_event_ != nullptr) {
        cudaEventDestroy(cuda_event_);
    }
}

void* Buffer::data() {
    return std::visit([](auto&& storage) -> void* { return storage->data(); }, storage_);
}
@@ -55,12 +77,14 @@ std::unique_ptr<Buffer> Buffer::copy(rmm::cuda_stream_view stream) const {
    );
},
[&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
    return std::unique_ptr<Buffer>(new Buffer{
    auto new_buffer = std::unique_ptr<Buffer>(new Buffer{
        std::make_unique<rmm::device_buffer>(
            storage->data(), storage->size(), stream, br->device_mr()
        ),
        stream,
        br
    });
    return new_buffer;
}
},
storage_
@@ -76,12 +100,14 @@ std::unique_ptr<Buffer> Buffer::copy(MemoryType target, rmm::cuda_stream_view st
return std::visit(
    overloaded{
        [&](const HostStorageT& storage) -> std::unique_ptr<Buffer> {
            return std::unique_ptr<Buffer>(new Buffer{
            auto new_buffer = std::unique_ptr<Buffer>(new Buffer{
                std::make_unique<rmm::device_buffer>(
                    storage->data(), storage->size(), stream, br->device_mr()
                ),
                stream,
                br
            });
            return new_buffer;
        },
        [&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
            auto ret = std::make_unique<std::vector<uint8_t>>(storage->size());

@@ -92,11 +118,32 @@ std::unique_ptr<Buffer> Buffer::copy(MemoryType target, rmm::cuda_stream_view st
                cudaMemcpyDeviceToHost,
                stream
            ));
            return std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});
            auto new_buffer = std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});

            // The event is created here instead of the constructor because the
            // memcpy is async, but the buffer is created on the host.
            new_buffer->cuda_event_ = create_and_record_event(stream);

            return new_buffer;
        }
    },
    storage_
);
}

bool Buffer::is_ready() const {
    if (cuda_event_ == nullptr) {
        return true;  // No device memory operation was performed
    }
    cudaError_t status = cudaEventQuery(cuda_event_);
    if (status == cudaSuccess) {
        return true;
    } else if (status == cudaErrorNotReady) {
        return false;
    } else {
        RAPIDSMPF_CUDA_TRY(status);
        return false;  // This line is unreachable due to the throw above
    }
}

} // namespace rapidsmpf
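Finally, a self-contained CUDA example, independent of rapidsmpf, of the overall pattern this PR adopts: record an event immediately after asynchronous work and poll it with cudaEventQuery, instead of blocking on cudaStreamSynchronize or cudaDeviceSynchronize.

#include <cstdio>
#include <vector>

#include <cuda_runtime.h>

int main() {
    cudaStream_t stream;
    cudaStreamCreate(&stream);

    std::vector<unsigned char> host(1 << 20, 0);
    void* dev = nullptr;
    cudaMalloc(&dev, host.size());
    cudaMemsetAsync(dev, 42, host.size(), stream);
    cudaMemcpyAsync(host.data(), dev, host.size(), cudaMemcpyDeviceToHost, stream);

    // Record an event covering the memset and the D2H copy.
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
    cudaEventRecord(ev, stream);

    // Poll instead of synchronizing the whole stream or device; other work
    // (e.g. a communicator progress loop) could run between polls.
    while (cudaEventQuery(ev) == cudaErrorNotReady) {
    }

    std::printf("first byte after copy: %d\n", host[0]);  // safe: copy is done

    cudaEventDestroy(ev);
    cudaFree(dev);
    cudaStreamDestroy(stream);
    return 0;
}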