Use CUDA events instead of CUDA device/stream synchronization #225
Conversation
I am not very sold on this approach, I have to say. Can we make the comms routines stream-ordered by passing streams in, recording an event and then waiting on the event before going into the library?
@@ -208,6 +225,8 @@ class Buffer {
    /// @brief The underlying storage host memory or device memory buffer (where
    /// applicable).
    StorageT storage_;
    /// @brief CUDA event used to track copy operations
    cudaEvent_t cuda_event_;
I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.
Very minimally, we should definitely create the events with cudaEventDisableTiming
so they are as lightweight as possible.
I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.
That would only shift what we need to track, wouldn't it? If we had one event per stream, how would we know if a buffer was created before or after the event? From the check's perspective it could have happened either before or after and we would go back to have potentially invalid memory accesses.
Very minimally, we should definitely create the events with
cudaEventDisableTiming
so they are as lightweight as possible.
Thanks for the suggestion, I'll do that.
cudaEventRecord records the state of a stream (i.e. any outstanding work on the stream is "noted") and then cudaEventWait waits for completion of all that work. So:
do_some_allocation(stream)
cudaEventRecord(event, stream)
....
cudaEventWait(event)
# allocation guaranteed to have completed
no?
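A minimal sketch of this record-then-wait pattern using the CUDA runtime API (assumption: the `cudaEventWait` in the pseudocode above stands for the blocking host-side wait, which in the runtime API is `cudaEventSynchronize`; the buffer size is arbitrary):

```cuda
#include <cuda_runtime.h>

int main() {
    cudaStream_t stream;
    cudaStreamCreate(&stream);

    // Create a lightweight event (no timing), as suggested in this thread.
    cudaEvent_t event;
    cudaEventCreateWithFlags(&event, cudaEventDisableTiming);

    // Stream-ordered allocation: enqueued on the stream, may not be done yet.
    void* buf = nullptr;
    cudaMallocAsync(&buf, 1 << 20, stream);

    // Record the stream state: the event completes once the allocation does.
    cudaEventRecord(event, stream);

    // ... other host-side work ...

    // Block until all work captured by the event has finished.
    cudaEventSynchronize(event);
    // The allocation of buf is guaranteed to have completed here.

    cudaFreeAsync(buf, stream);
    cudaEventDestroy(event);
    cudaStreamDestroy(stream);
    return 0;
}
```

Note this requires a CUDA-capable device and the stream-ordered allocator (CUDA 11.2+) to run.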
Yes, and this is why we have one event per buffer. If we had one event per stream we could end up with:
buf1 = do_some_allocation(stream)
cudaEventRecord(event, stream)
buf2 = do_some_allocation(stream)
cudaEventWait(event)
# buf1 is guaranteed to have completed, but buf2 isn't
That is what I don't think will work, or are you suggesting something different and I misunderstood the suggestion?
Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Thanks for the suggestion, I'll do that.
This is now done in 1180b8e .
buf1 = do_some_allocation(stream)
cudaEventRecord(event, stream)
buf2 = do_some_allocation(stream)
cudaEventRecord(event, stream)
cudaEventWait(event)
# now buf1 and buf2 are guaranteed completed
But in that case aren't we just blocking until all buffers are completed? This would potentially prevent us from progressing buf1 until buf2 has completed too, which could increase memory pressure, so I'm not really sure the benefits outweigh the costs.
An additional complication is that you would also need to keep track of the streams and events globally, meaning we probably wouldn't be able to handle an arbitrary Buffer for which we don't know on which stream it's being allocated/copied, nor where the stream's event is located. IOW, we would need some sort of manager visible everywhere to track streams and events.
/**
 * @brief Check if the last copy operation has completed.
 *
 * @return true if the copy operation has completed or no copy operation
 * was performed, false if it is still in progress.
 */
[[nodiscard]] bool is_copy_complete() const;
nit: "copy" is, I think, the wrong phrasing. I think you mean "has any stream-ordered work to allocate this buffer completed".
Well, this only applies to Buffer::copy at the moment; making this more general would IMO imply that it is also relevant for the constructor that just allocates without copies.
cpp/src/buffer/buffer.cpp
Outdated
} else if (status == cudaErrorNotReady) {
    return false;
} else {
    RAPIDSMPF_CUDA_TRY_ALLOC(status);
Do we not also have a RAPIDSMPF_CUDA_TRY? If not, we should consider introducing one. Or something like RAPIDSMPF_EXPECTS(status == cudaSuccess || status == cudaErrorNotReady, "Unexpected status") and then return status == cudaSuccess.
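The suggested check can be sketched as follows (assumptions: the status enum is a mock standing in for cudaError_t, and EXPECTS is a hypothetical stand-in for RAPIDSMPF_EXPECTS; the real macro may differ):

```cpp
#include <stdexcept>

// Mocked status values standing in for cudaError_t, for illustration only.
enum class Status { Success, NotReady, InvalidValue };

// Hypothetical macro in the spirit of RAPIDSMPF_EXPECTS: throw on violation.
#define EXPECTS(cond, msg) \
    do { if (!(cond)) throw std::logic_error(msg); } while (0)

// The suggested pattern: accept the two expected states, throw on anything else.
bool is_copy_complete(Status status) {
    EXPECTS(
        status == Status::Success || status == Status::NotReady,
        "Unexpected status"
    );
    return status == Status::Success;
}
```

This keeps the unexpected-error path explicit instead of folding it into the allocation-failure macro.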
We do, that was my mistake, will fix this too.
Fixed in 23673ab .
cpp/src/shuffler/shuffler.cpp
Outdated
auto future = shuffler_.comm_->recv(
    src, gpu_data_tag, incoming.buffer_with_event->release()
);
I think it makes more sense (probably) to make the communication routines stream-ordered, and push the sync into there.
This refactoring ensures things are correctly allocated and ready to go by not enqueuing the allocated buffer receive until it is ready (basically we take things out and then put them back in). But this is still a delicate interface and requires everyone to remember to do that.
I think it makes more sense (probably) to make the communication routines stream-ordered, and push the sync into there.
See #225 (comment), this would mean we block the Shuffler progress thread.
This refactoring ensures things are correctly allocated and ready to go by not enqueuing the allocated buffer receive until it is ready (basically we take things out and then put them back in). But this is still a delicate interface and requires everyone to remember to do that.
Yes, this is fortunately only the internal implementation, but is required to make sure we don't block the progress either. We could make a larger refactoring that prevents extract/reinsert, but not really sure if that would make everything much less delicate.
Doing so means that comms become blocking, which will go against the way the
@pentschev I am still reviewing this PR. When reading the description, I felt like we are missing something.
rmm::device_buffer construction is async and stream ordered.
@nirandaperera please read item 4 in the description as well as #227 (also mentioned in the description).
@pentschev Why can't we use the
Maybe we could, that's what #227 is about. It will take more work to get that done and maybe requires some discussion before doing that, so we should leave that for a follow-up PR.
Let's finish #227 discussion then. For me, construction and copying are not fundamentally different. So, I propose that we handle both in the same PR.
The proposal I see more suitable from #227 is item 2 of the description:
This will have to touch a lot of the codebase and potentially the Python layer as well, so I think making them in separate PRs is a better option. Also note this is a critical issue that is hard to "get right" and requires testing on H100 after every iteration to guarantee it didn't break, therefore making changes in incremental steps is much more manageable.
I made some comments. I think it's best if we schedule a call and finalize the design.
cpp/src/communicator/mpi.cpp
Outdated
@@ -116,6 +116,7 @@ std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Communicator::Future> MPI::send(
    std::unique_ptr<Buffer> msg, Rank rank, Tag tag
) {
    RAPIDSMPF_EXPECTS(msg->is_copy_complete(), "buffer copy has not completed yet");
msg is already moved to send. Now, if we throw here, we are losing data, right? 🤔
Unfortunately, you're right. It is unfortunate that you're right because this means there's no good way to push this check onto the communicator and we thus have to force the caller to ensure that. I'll revert those changes and update the docstrings to make the contract to the caller clear: the caller needs to ensure the allocation and data are ready.
Done in eded688 .
cpp/src/shuffler/shuffler.cpp
Outdated
/**
 * @brief Combination of a Buffer with associated CUDA event.
 *
 * Combining a CUDA event with a buffer allows us to track the completion of
 * the asynchronous allocation of device memory. It allows disabling the event
 * if it is not needed, for example, when allocating an empty buffer or if it
 * is known that the allocation has been completed.
 */
class BufferWithEvent {
A CUDA event is already a class member of the Buffer, right? Why do we need a separate class here?
The (valid) event only exists for Buffer::copy; here there's no Buffer::copy being called, only the Buffer constructor, in which case the event does not get created. This again boils down to the #227 discussion.
cpp/src/shuffler/shuffler.cpp
Outdated
[[nodiscard]] MemoryType mem_type() const {
    return buffer_->mem_type();
}

/**
 * @brief Get the size of the buffer.
 *
 * @return The size of the buffer in bytes.
 */
[[nodiscard]] std::size_t size() const {
    return buffer_->size;
}
I think these can be constexpr
I don't think so, Buffer::size and Buffer::mem_type() aren't constexpr.
cpp/src/shuffler/shuffler.cpp
Outdated
@@ -34,21 +138,27 @@ namespace {
 * @param size The size of the buffer in bytes.
 * @param stream CUDA stream to use for device allocations.
 * @param br Buffer resource used for the reservation and allocation.
 * @param log Logger to warn if object is destroyed before event is ready.
 * @param enable_event Whether to track CUDA events for this buffer.
I feel like this is redundant. If this is a device buffer, enable_event will always be true, isn't it?
It isn't, see the docstrings. This is used in practice for empty allocations here, otherwise we immediately get warnings in the destructor, because the event will not have been completed; further discussion of why empty allocations are different is in #226.
This doc comment needs to be updated.
Done in 640c2bf .
cpp/src/shuffler/shuffler.cpp
Outdated
} catch (std::logic_error& e) {
    RAPIDSMPF_EXPECTS(
        outgoing_chunks_
            .insert({ready_for_data_msg.cid, std::move(chunk)})
This chunk doesn't have any gpu_data now, does it?
I personally dislike the blanket exception catch here. This catches non-synchronization exceptions like these as well.
I suggest using a return flag, like pair<bool, unique_ptr<Buffer>>, as the output from comm::send. This will allow us to throw and exit as well as return with failure gracefully (and revisit later).
Good point. However, that return type would be very clunky to manage; in fact the current return type is a Future, which would make doing what you suggested even more clunky. We would need something like pair<bool, unique_ptr<Future>>, and the user would be responsible for checking the value of the bool and then, if the send failed, extracting the Buffer from the Future to continue. At that point it is much simpler for comm::send and the caller to just add a Buffer::is_copy_complete() check before calling comm::send.
As per #225 (comment), I'll revert those changes and update the docstring to clarify the contract.
Co-authored-by: Niranda Perera <niranda.perera@gmail.com>
cpp/src/shuffler/shuffler.cpp
Outdated
@@ -34,21 +138,27 @@ namespace {
 * @param size The size of the buffer in bytes.
 * @param stream CUDA stream to use for device allocations.
 * @param br Buffer resource used for the reservation and allocation.
 * @param log Logger to warn if object is destroyed before event is ready.
 * @param enable_event Whether to track CUDA events for this buffer.
This doc comment needs to be updated.
cpp/src/shuffler/shuffler.cpp
Outdated
struct IncomingChunk {
    detail::Chunk chunk;
    std::unique_ptr<Buffer> buffer;
};
It's better to address this. The idea AFAIU is: chunks will be moved from the incoming_chunks_ map to the in-transit map ONLY when the receive buffers are allocated AND they are ready.
...
if (chunk.gpu_data_size > 0) {
// chunk has data
if (!chunk.gpu_data){
// buffer has not been allocated. allocate one
chunk.gpu_data = ...;
}
// chunk.gpu_data is valid
if (!chunk.gpu_data.is_ready()){
// data buffer is not ready to receive data. Let's move on, and come back later.
it++;
}
// data buffer is allocated and ready. Schedule recv
...
// move the chunk to intransit chunks and future to intransit future
...
} else {
// chunk has no data. Just a control message. Nothing else to receive. create empty buffer with a null event.
// Pass to outbox.
...
}
Nice work @pentschev.
I think we should merge this PR and then work on a follow-up PR that implements and tests multi-stream support using ideas from #242, which might include letting the user create events explicitly.
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
* is already valid before calling, for example, when a CUDA allocation
* and/or copy are done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function.
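The contract in this warning amounts to roughly the following caller-side pattern (FakeBuffer and try_send are mocks for illustration; the real Shuffler would re-poll from its progress loop rather than spin):

```cpp
#include <memory>

// Stand-in for Buffer, for illustration only: becomes ready after two polls.
struct FakeBuffer {
    int polls_left{2};
    bool is_ready() { return polls_left-- <= 0; }
};

// Caller-side contract: only hand the buffer to send() once it is ready.
// Returns false (keeping ownership of the buffer) when it must be retried.
bool try_send(std::unique_ptr<FakeBuffer>& buf, bool& sent) {
    if (!buf->is_ready()) {
        return false;  // not ready yet; keep the buffer and retry later
    }
    sent = true;       // stand-in for comm->send(std::move(buf), ...)
    buf.reset();
    return true;
}
```

The key point is that ownership is only transferred once readiness has been established, so a not-ready buffer is never lost.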
Let's throw an exception if Buffer::is_ready() == false?
No, unfortunately that doesn't work, as @nirandaperera has noted previously in #225 (comment). Doing that means we lose the std::unique_ptr<Buffer>.
Still, I think we should check Buffer::is_ready() == false. It might be unrecoverable but still better than a segfault. But let's make it clear in the doc that the buffer has been moved and freed!
How about a warning instead? Note that the exception will be raised in the shuffler's progress thread, so we'll probably need to handle it in some way. I know a warning is not a solution for the potential of a segfault but I think that will more clearly inform the user about what happened, rather than an exception which may not be handled correctly.
What about https://en.cppreference.com/w/cpp/error/terminate ?
That works too. I would suggest that we do this now and add a new ABORT log-level that both logs an error and immediately terminates. WDYT?
Sounds good!
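A hedged sketch of what such an ABORT level might look like (all names hypothetical; the formatting helper is split out so the terminating path stays trivial and the formatting itself is checkable):

```cpp
#include <exception>
#include <iostream>
#include <string>

// Hypothetical: format an ABORT-level log line (split out for reuse/testing).
std::string format_abort(const std::string& msg) {
    return "[ABORT] " + msg;
}

// Hypothetical ABORT: log the error, then terminate immediately. Unlike an
// exception, this cannot be silently swallowed by a catch block in the
// shuffler's progress thread.
[[noreturn]] void log_abort(const std::string& msg) {
    std::cerr << format_abort(msg) << std::endl;
    std::terminate();
}
```

Calling std::terminate() after logging gives the "better than a segfault" behavior discussed above while still leaving a diagnostic trail.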
Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
cpp/src/shuffler/chunk.cpp
Outdated
@@ -87,6 +86,10 @@ std::unique_ptr<cudf::table> Chunk::unpack(rmm::cuda_stream_view stream) const {
    return unpack_and_concat(std::move(packed_vec), stream, br->device_mr());
}

bool Chunk::is_ready() const {
    return !gpu_data || gpu_data->is_ready();
There is a small nuance here. Chunk serves two purposes:
- Carrying messages
- Flagging end-of-messages (by setting a non-zero value for expected_num_chunks)
I think this should have the following logic, IINM. @madsbk WDYT?
if (expected_num_chunks > 0){
// always ready. Doesn't have any data buffers
return true;
} else { // chunk has data, and its not ready until the data buffer is ready
// now, gpu_data buffer needs to be set at some point
if (gpu_data){
return gpu_data->is_ready();
} else {
return false;
}
}
This could reduce to,
return (expected_num_chunks > 0) || (gpu_data && gpu_data->is_ready());
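The reduced expression can be sanity-checked with a small mock (MockBuffer and MockChunk are stand-ins for illustration, not the real rapidsmpf classes):

```cpp
#include <cstddef>
#include <memory>

// Minimal stand-in for Buffer, for illustration only.
struct MockBuffer {
    bool ready{false};
    bool is_ready() const { return ready; }
};

// Minimal stand-in for Chunk, carrying only the fields the condition uses.
struct MockChunk {
    std::size_t expected_num_chunks{0};  // non-zero => end-of-messages marker
    std::unique_ptr<MockBuffer> gpu_data;

    // The reduced condition proposed above: end-of-messages chunks are always
    // ready; data chunks are ready only once their buffer exists and is ready.
    bool is_ready() const {
        return (expected_num_chunks > 0) || (gpu_data && gpu_data->is_ready());
    }
};
```

Note that, unlike the earlier `!gpu_data || ...` version, a data chunk whose buffer has not been allocated yet reports not-ready here.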
But, I don't want to block the PR on this.
return (expected_num_chunks > 0) || (gpu_data && gpu_data->is_ready());
Agree, this will make the intention more clear.
I see this now, but it wasn't immediately obvious. I think it makes sense to remove expected_num_chunks from the public Chunk constructor that gets gpu_data. I've applied your suggestion and mine in de3db36, let me know if there's any reason not to make the change with constructors.
Note that the private constructor remains there because it's used in Chunk::from_metadata_message.
@nirandaperera pointed out that the condition from the commit mentioned above was wrong; it's now fixed in 294d5b9.
+1 on constructors. I think it's much clearer now. LGTM
Just made a small comment on the Chunk::is_ready impl. Other than that, all good.
@pentschev thanks for entertaining my laundry list of comments 🙂
@pentschev all LGTM. Let's merge this on green CI
It seems A100 queues are currently long, I'll nevertheless trigger automerge. Thanks all for the reviews!
/merge
As per our discussion last Friday we were ok following Mads' preference here, which was to get this PR merged for the near-term.
The changes contained here resolve #216, preventing the need for synchronizing CUDA devices and streams and instead relying on events to determine whether the Buffer operations have completed. At a high level, the changes here do:
- Buffer device operations implement a new is_ready() method that ensures the CUDA allocation and/or copy have completed on the stream.
- Chunk::Event is used to validate that all chunks inserted on the Shuffler have reached the event, ensuring data is ready to be consumed.
- The is_ready() method is used to validate all Buffers before they are consumed for both send() and recv() operations.
- Docstrings of the Communicator's send()/recv() operations inform that the caller is responsible for checking Buffer::is_ready() before making the call.