
Use CUDA events instead of CUDA device/stream synchronization #225


Merged: 56 commits into rapidsai:branch-25.06, May 5, 2025

Conversation

@pentschev (Member) commented Apr 24, 2025:

The changes contained here resolve #216, removing the need to synchronize CUDA devices and streams and instead relying on CUDA events to determine whether Buffer operations have completed. At a high level, the changes:

  1. Add CUDA event-based tracking for Buffer device operations, implementing a new is_ready() method that ensures the CUDA allocation and/or copy has completed on the stream (see the sketch after this list).
  2. Introduce a new shared Chunk::Event used to verify that all chunks inserted into the Shuffler have reached the event, ensuring the data is ready to be consumed.
  3. Use the new is_ready() method to validate all Buffers before they are consumed by both send() and recv() operations.
  4. Update the contract of the Communicator's send()/recv() operations: the caller is now responsible for checking Buffer::is_ready() before making the call.
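As a rough sketch of the mechanism (illustrative only: member names and error handling are simplified relative to the actual Buffer implementation):

#include <cuda_runtime.h>

class Buffer {
  public:
    // Record the completion point of an async allocation/copy on `stream`.
    void record_event(cudaStream_t stream) {
        if (cuda_event_ == nullptr) {
            // cudaEventDisableTiming keeps the event as lightweight as possible.
            cudaEventCreateWithFlags(&cuda_event_, cudaEventDisableTiming);
        }
        cudaEventRecord(cuda_event_, stream);
    }

    // True once all work recorded on the event has completed, or if no
    // stream-ordered work was ever recorded (e.g. host-only buffers).
    [[nodiscard]] bool is_ready() const {
        if (cuda_event_ == nullptr) {
            return true;
        }
        // cudaErrorNotReady means the work is still in flight; a real
        // implementation should surface any other error rather than hide it.
        return cudaEventQuery(cuda_event_) == cudaSuccess;
    }

  private:
    cudaEvent_t cuda_event_{nullptr};
};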

@pentschev added the improvement (Improves an existing functionality) and non-breaking (Introduces a non-breaking change) labels on Apr 24, 2025
@pentschev pentschev requested a review from a team as a code owner April 24, 2025 12:01
@wence- (Contributor) left a comment:

I am not very sold on this approach, I have to say. Can we make the comms routines stream-ordered by passing streams in, recording an event and then waiting on the event before going into the library?

@@ -208,6 +225,8 @@ class Buffer {
/// @brief The underlying storage host memory or device memory buffer (where
/// applicable).
StorageT storage_;
/// @brief CUDA event used to track copy operations
cudaEvent_t cuda_event_;
Contributor:

I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.

Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.
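For reference, that amounts to creating the event as (using the project's RAPIDSMPF_CUDA_TRY error-checking macro):

cudaEvent_t event;
RAPIDSMPF_CUDA_TRY(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));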

@pentschev (Member Author):

I'm not sure I really like this design. This produces one event per buffer that we create whereas really we only need one event per stream that we see.

That would only shift what we need to track, wouldn't it? If we had one event per stream, how would we know if a buffer was created before or after the event? From the check's perspective it could have happened either before or after, and we would go back to having potentially invalid memory accesses.

Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Thanks for the suggestion, I'll do that.

Contributor:

cudaEventRecord records the state of a stream (i.e. any outstanding work on the stream is "noted") and then cudaEventSynchronize waits for completion of all that work. So:

do_some_allocation(stream)

cudaEventRecord(event, stream)

....
cudaEventSynchronize(event)

// allocation guaranteed to have completed

no?

@pentschev (Member Author):

Yes, and this is why we have one event per buffer. If we had one event per stream we could end up with:

buf1 = do_some_allocation(stream)

cudaEventRecord(event, stream)

buf2 = do_some_allocation(stream)

cudaEventSynchronize(event)

// buf1 is guaranteed to have completed, but buf2 isn't

That is what I don't think will work, or are you suggesting something different and I misunderstood the suggestion?

@pentschev (Member Author):

Very minimally, we should definitely create the events with cudaEventDisableTiming so they are as lightweight as possible.

Thanks for the suggestion, I'll do that.

This is now done in 1180b8e.

Contributor:

buf1 = do_some_allocation(stream)

cudaEventRecord(event, stream)

buf2 = do_some_allocation(stream)

cudaEventRecord(event, stream)
cudaEventSynchronize(event)

// now buf1 and buf2 are guaranteed completed

@pentschev (Member Author):

But in that case aren't we just blocking until all buffers are completed? This would potentially prevent us from progressing buf1 until buf2 has completed too, which could increase memory pressure, so I'm not really sure the benefits outweigh the costs.

@pentschev (Member Author):

An additional complication is that you would also need to track the streams and events globally; otherwise, given an arbitrary Buffer, we couldn't know which stream it is being allocated/copied on, nor where that stream's event lives. IOW, we would need some sort of manager, visible everywhere, to track streams and events.
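For illustration, such a manager would look roughly like the following (hypothetical: this is the design being argued against, not anything implemented in this PR):

#include <cuda_runtime.h>
#include <mutex>
#include <unordered_map>

// Hypothetical one-event-per-stream tracker that would need to be globally
// visible to every producer and consumer of Buffers.
class StreamEventTracker {
  public:
    // (Re-)record the tracking event for `stream` after enqueuing new work.
    void record(cudaStream_t stream) {
        std::lock_guard<std::mutex> lock(mutex_);
        cudaEvent_t& event = events_[stream];
        if (event == nullptr) {
            cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
        }
        cudaEventRecord(event, stream);
    }

    // True once everything recorded so far on `stream` has completed.
    bool is_ready(cudaStream_t stream) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = events_.find(stream);
        return it == events_.end() || cudaEventQuery(it->second) == cudaSuccess;
    }

  private:
    std::mutex mutex_;
    std::unordered_map<cudaStream_t, cudaEvent_t> events_;
};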

Comment on lines 127 to 133
/**
* @brief Check if the last copy operation has completed.
*
* @return true if the copy operation has completed or no copy operation
* was performed, false if it is still in progress.
*/
[[nodiscard]] bool is_copy_complete() const;
Contributor:

nit: "copy" is, I think, the wrong phrasing. I think you mean "has any stream-ordered work to allocate this buffer completed".

@pentschev (Member Author):

Well, this only applies to Buffer::copy at the moment; making it more general would, IMO, imply that it is also relevant for the constructor that just allocates without copying.

} else if (status == cudaErrorNotReady) {
return false;
} else {
RAPIDSMPF_CUDA_TRY_ALLOC(status);
Contributor:

Do we not also have a RAPIDSMPF_CUDA_TRY?

If not, we should consider introducing one. Or something like RAPIDSMPF_EXPECTS(status == cudaSuccess || status == cudaErrorNotReady, "Unexpected status") and then return status == cudaSuccess
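Spelled out, that alternative would read roughly:

cudaError_t const status = cudaEventQuery(cuda_event_);
RAPIDSMPF_EXPECTS(
    status == cudaSuccess || status == cudaErrorNotReady, "Unexpected status"
);
return status == cudaSuccess;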

@pentschev (Member Author):

We do, that was my mistake, will fix this too.

@pentschev (Member Author):

Fixed in 23673ab.

Comment on lines 371 to 373
auto future = shuffler_.comm_->recv(
src, gpu_data_tag, incoming.buffer_with_event->release()
);
Contributor:

I think it makes more sense (probably) to make the communication routines stream-ordered, and push the sync into there.

This refactoring ensures things are correctly allocated and ready to go by not enqueuing the allocated buffer receive until it is ready (basically we take things out and then put them back in). But this is still a delicate interface and requires everyone to remember to do that.

@pentschev (Member Author):

I think it makes more sense (probably) to make the communication routines stream-ordered, and push the sync into there.

See #225 (comment): this would mean we block the Shuffler progress thread.

This refactoring ensures things are correctly allocated and ready to go by not enqueuing the allocated buffer receive until it is ready (basically we take things out and then put them back in). But this is still a delicate interface and requires everyone to remember to do that.

Yes, fortunately this is only the internal implementation, but it is required to make sure we don't block the progress thread either. We could do a larger refactoring that avoids the extract/reinsert, but I'm not really sure that would make everything much less delicate.

@pentschev (Member Author):

I am not very sold on this approach, I have to say. Can we make the comms routines stream-ordered by passing streams in, recording an event and then waiting on the event before going into the library?

Doing so means that comms become blocking, which goes against how the Shuffler progress is expected to work: just progressing asynchronous work.

@nirandaperera (Contributor):

@pentschev I am still reviewing this PR. When reading the description, I felt like we are missing something.
Synchronization guarantees are not just related to the copy, right? Even buffer creation is async. So, during receive, we can have a scenario where BufferResource::allocate

std::unique_ptr<Buffer> allocate(

is async, but the data pointer is not ready. This is because rmm::device_buffer construction is async and stream-ordered.
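A sketch of the hazard being described (illustrative; assumes an asynchronous RMM memory resource `mr` and an rmm::cuda_stream_view `stream`):

// The allocation below is merely enqueued on `stream`; the device memory is
// not guaranteed to be usable yet when the constructor returns.
rmm::device_buffer dev_buf{size, stream, mr};

// Tracking its readiness would require recording an event right after it:
cudaEvent_t event;
cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
cudaEventRecord(event, stream.value());
// dev_buf.data() is only safe to hand off once
// cudaEventQuery(event) == cudaSuccess.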

@pentschev (Member Author):

@pentschev I am still reviewing this PR. When reading the description, I felt like we are missing something. Synchronization guarantees are not just related to the copy, right? Even buffer creation is async. So, during receive, we can have a scenario where BufferResource::allocate

std::unique_ptr<Buffer> allocate(

is async, but the data pointer is not ready. This is because rmm::device_buffer construction is async and stream-ordered.

@nirandaperera please read item 4 in the description as well as #227 (also mentioned in the description).

@nirandaperera (Contributor):

@pentschev Why can't we use the Buffer::cuda_event_ event for allocation, rather than a separate BufferWithEvent class?

@pentschev (Member Author):

@pentschev Why can't we use the Buffer::cuda_event_ event for allocation, rather than a separate BufferWithEvent class?

Maybe we could, that's what #227 is about. It will take more work to get that done and maybe requires some discussion before doing that, so we should leave that for a follow-up PR.

@nirandaperera (Contributor):

Let's finish #227 discussion then. For me, construction and copying are not fundamentally different. So, I propose that we handle both in the same PR.

@pentschev (Member Author):

Let's finish #227 discussion then. For me, construction and copying are not fundamentally different. So, I propose that we handle both in the same PR.

The proposal from #227 that I find most suitable is item 2 of its description:

Write a mechanism similar to that added in #225 for Buffer::copy but for allocations as well. The difficulty here is that the user will always be required to specify the same rmm::cuda_stream_view to both the rmm::device_buffer and Buffer constructors; this requires careful analysis of how the rmm::device_buffer is being created, whether using the default CUDA stream, PTDS, or an explicit asynchronous CUDA stream.

This will have to touch a lot of the codebase, and potentially the Python layer as well, so I think making them separate PRs is the better option. Also note this is a critical issue that is hard to "get right" and requires testing on H100 after every iteration to guarantee nothing broke, so making changes in incremental steps is much more manageable.

@nirandaperera (Contributor) left a comment:

I made some comments. I think it's best if we schedule a call and finalize the design.

@@ -116,6 +116,7 @@ std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Communicator::Future> MPI::send(
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
) {
RAPIDSMPF_EXPECTS(msg->is_copy_complete(), "buffer copy has not completed yet");
Contributor:

msg is already moved into send. Now, if we throw here, we are losing data, right? 🤔

@pentschev (Member Author):

Unfortunately, you're right. And it is unfortunate because it means there's no good way to push this check into the communicator; we have to force the caller to ensure it instead. I'll revert those changes and update the docstrings to make the contract clear: the caller needs to ensure the allocation and data are ready.

@pentschev (Member Author):

Done in eded688.

Comment on lines 27 to 35
/**
* @brief Combination of a Buffer with associated CUDA event.
*
* Combining a CUDA event with a buffer allows us to track the completion of
* the asynchronous allocation of device memory. It allows disabling the event
* if it is not needed, for example, when allocating an empty buffer or if it
 * is known that the allocation has already completed.
*/
class BufferWithEvent {
Contributor:

A CUDA event is already a class member of Buffer, right? Why do we need a separate class here?

@pentschev (Member Author):

The (valid) event only exists for Buffer::copy; here there's no Buffer::copy being called, only the Buffer constructor, in which case the event does not get created. This again boils down to the #227 discussion.

Comment on lines 110 to 121
[[nodiscard]] MemoryType mem_type() const {
return buffer_->mem_type();
}

/**
* @brief Get the size of the buffer.
*
* @return The size of the buffer in bytes.
*/
[[nodiscard]] std::size_t size() const {
return buffer_->size;
}
Contributor:

I think these can be constexpr

@pentschev (Member Author):

I don't think so; Buffer::size and Buffer::mem_type() aren't constexpr.

@@ -34,21 +138,27 @@ namespace {
* @param size The size of the buffer in bytes.
* @param stream CUDA stream to use for device allocations.
* @param br Buffer resource used for the reservation and allocation.
* @param log Logger to warn if object is destroyed before event is ready.
* @param enable_event Whether to track CUDA events for this buffer.
Contributor:

I feel like this is redundant. If this is a device buffer, enable_event will always be true, isn't it?

@pentschev (Member Author):

It isn't, see the docstrings. In practice this is used for empty allocations; otherwise we immediately get warnings in the destructor, because the event will never have completed. Further discussion of why empty allocations are different is in #226.
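Roughly, the intended call-site shape is (illustrative; allocate_buffer stands in for the actual helper whose parameters are documented above):

// Empty device allocations enqueue no stream-ordered work, so tracking an
// event for them would only trigger spurious destructor warnings about an
// event that never completes.
bool const enable_event = size > 0;
auto buffer = allocate_buffer(size, stream, br, log, enable_event);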

Contributor:

This doc comment needs to be updated.

@pentschev (Member Author):

Done in 640c2bf.

} catch (std::logic_error& e) {
RAPIDSMPF_EXPECTS(
outgoing_chunks_
.insert({ready_for_data_msg.cid, std::move(chunk)})
Contributor:

this chunk doesn't have any gpu_data now, does it?

I personally dislike the blanket exception catch here; it catches non-synchronization exceptions like these as well.
I suggest using a return flag, like pair<bool, unique_ptr<Buffer>>, as the output of comm::send. That would allow us to throw and exit, as well as return with failure gracefully (and revisit later).

@pentschev (Member Author):

Good point. However, that return type would be very clunky to manage; in fact the current return type is a Future, which would make your suggestion even clunkier. We would need something like pair<bool, unique_ptr<Future>>, and the user would be responsible for checking the bool and, if the send failed, extracting the Buffer from the Future to continue. At that point it is much simpler for both comm::send and the caller to just check Buffer::is_copy_complete() before calling comm::send.
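That keeps the caller-side pattern simple, as a sketch (using the eventual is_ready() naming):

// Hand the buffer to the communicator only once its stream-ordered work has
// completed; otherwise leave it queued and retry on the next progress iteration.
if (buffer->is_ready()) {
    auto future = comm->send(std::move(buffer), rank, tag);
    // ... track `future` until completion ...
}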

@pentschev (Member Author):

As per #225 (comment), I'll revert those changes and update the docstring to clarify the contract.


Comment on lines 375 to 378
struct IncomingChunk {
detail::Chunk chunk;
std::unique_ptr<Buffer> buffer;
};
Contributor:

It's better to address this. The idea, AFAIU, is that chunks should be moved from the incoming_chunks_ map to the in-transit map ONLY when the receive buffers are allocated AND they are ready.

...
if (chunk.gpu_data_size > 0) {
    // chunk has data
    if (!chunk.gpu_data) {
        // buffer has not been allocated; allocate one
        chunk.gpu_data = ...;
    }

    // chunk.gpu_data is valid
    if (!chunk.gpu_data->is_ready()) {
        // data buffer is not ready to receive data; move on and come back later
        it++;
        continue;
    }

    // data buffer is allocated and ready; schedule the recv
    ...
    // move the chunk to in-transit chunks and the future to in-transit futures
    ...
} else {
    // chunk has no data, just a control message; nothing else to receive.
    // Create an empty buffer with a null event and pass it to the outbox.
    ...
}

@madsbk (Member) left a comment:

Nice work @pentschev.

I think we should merge this PR and then work on a follow-up PR that implements and tests multi-stream support using ideas from #242, which might include having the user create events explicitly.

Comment on lines 447 to 450
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
* is already valid before calling, for example, when a CUDA allocation
* and/or copy are done asynchronously. Specifically, the caller should ensure
* `Buffer::is_ready()` returns true before calling this function.
Member:

Let's throw an exception if Buffer::is_ready() == false?

@pentschev (Member Author):

No, unfortunately that doesn't work, as @nirandaperera noted previously in #225 (comment). Doing that means we lose the std::unique_ptr<Buffer>.

@madsbk (Member) commented May 5, 2025:

Still, I think we should check Buffer::is_ready() == false. It might be unrecoverable, but that's still better than a segfault. Let's also make it clear in the docs that the buffer has been moved and freed!

@pentschev (Member Author):

How about a warning instead? Note that the exception would be raised in the shuffler's progress thread, so we'd probably need to handle it in some way. I know a warning is not a solution for the potential segfault, but I think it will more clearly inform the user about what happened than an exception that may not be handled correctly.

Member:

@pentschev (Member Author):

That works too. I would suggest we do this now and add a new ABORT log-level that both logs an error and immediately terminates. WDYT?

Member:

Sounds good!

@pentschev (Member Author):

Done in cb0b159 and opened #246 to track this as well.

Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
@@ -87,6 +86,10 @@ std::unique_ptr<cudf::table> Chunk::unpack(rmm::cuda_stream_view stream) const {
return unpack_and_concat(std::move(packed_vec), stream, br->device_mr());
}

bool Chunk::is_ready() const {
return !gpu_data || gpu_data->is_ready();
@nirandaperera (Contributor) commented May 5, 2025:

There is a small nuance here. Chunk serves two purposes:

  1. Carrying messages.
  2. Flagging end-of-messages (by setting a non-zero value for expected_num_chunks).

I think this should have the following logic, IINM. @madsbk WDYT?

if (expected_num_chunks > 0) {
    // always ready; doesn't have any data buffers
    return true;
} else {
    // chunk has data, and it's not ready until the data buffer is ready;
    // gpu_data needs to be set at some point
    if (gpu_data) {
        return gpu_data->is_ready();
    } else {
        return false;
    }
}

This could reduce to:

return (expected_num_chunks > 0) || (gpu_data && gpu_data->is_ready());

Contributor:

But, I don't want to block the PR on this.

Member:

return (expected_num_chunks > 0) || (gpu_data && gpu_data->is_ready());

Agree, this will make the intention more clear.

@pentschev (Member Author):

I see this now, but it wasn't immediately obvious. I think it makes sense to remove expected_num_chunks from the public Chunk constructor that takes gpu_data. I've applied both your suggestion and mine in de3db36; let me know if there's any reason not to make the constructor change.

@pentschev (Member Author):

Note that the private constructor remains there because it's used in Chunk::from_metadata_message.

@pentschev (Member Author):

@nirandaperera pointed out that the condition from the commit mentioned above was wrong; it's now fixed in 294d5b9.

Contributor:

+1 on the constructors. I think it's much clearer now. LGTM

@nirandaperera (Contributor) left a comment:

Just made a small comment on Chunk::is_ready impl. Other than that, all good.
@pentschev thanks for entertaining my laundry list of comments 🙂

@nirandaperera (Contributor):

@pentschev all LGTM. Let's merge this on green CI

@pentschev (Member Author):

It seems the A100 queues are currently long; I'll nevertheless trigger automerge. Thanks all for the reviews!

@pentschev (Member Author):

/merge

@pentschev pentschev dismissed wence-’s stale review May 5, 2025 21:40

As per our discussion last Friday, we were OK following Mads' preference here, which was to get this PR merged for the near term.

@rapids-bot rapids-bot bot merged commit 0317f38 into rapidsai:branch-25.06 May 5, 2025
21 checks passed
@pentschev pentschev deleted the buffer-cuda-event branch May 6, 2025 08:32