
Ensure data are ready for comms libraries with events #242


Draft · wence- wants to merge 8 commits into branch-25.06 from wence/event-ideas

Conversation

@wence- wence- (Contributor) commented May 2, 2025

Rather than using device synchronisation, use events to track work on streams for buffers and only hand over the buffer data pointer to the comms library when the event is ready.
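
For readers skimming the diff, here is a minimal standalone sketch of the idea (illustrative names and elided error handling; the PR wraps this in rapidsmpf's Event class): record an event after the work that populates a buffer, and only release the raw pointer once the event has fired.

#include <cuda_runtime.h>

class ReadinessEvent {  // illustrative name, not the PR's exact API
  public:
    ReadinessEvent() {
        // Timing is not needed, so disable it for lower overhead.
        cudaEventCreateWithFlags(&event_, cudaEventDisableTiming);
    }
    ~ReadinessEvent() { cudaEventDestroy(event_); }
    ReadinessEvent(ReadinessEvent const&) = delete;
    ReadinessEvent& operator=(ReadinessEvent const&) = delete;

    // Snapshot all work currently queued on `stream`.
    void record(cudaStream_t stream) { cudaEventRecord(event_, stream); }

    // Non-blocking poll: has everything recorded so far completed?
    bool is_ready() const { return cudaEventQuery(event_) == cudaSuccess; }

  private:
    cudaEvent_t event_{};
};

The progress loop can then poll is_ready() and only hand the raw pointer to the comms library once it returns true, instead of issuing a device-wide synchronisation.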

@wence- wence- (Contributor Author) commented May 2, 2025

An alternative approach to #225, borrowing a bunch of the same ideas.

@wence- wence- (Contributor Author) left a comment

Signposts/thinking

}

Event::~Event() {
    cudaEventDestroy(event_);
}
wence- (Contributor Author):

Thread safety of the dtor is handled by the shared_ptr storage of events.
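
A hedged sketch of what that buys us, reusing the ReadinessEvent stand-in from the sketch above: whichever thread drops the last shared_ptr reference runs the destructor, exactly once, with no extra locking in the dtor itself.

#include <memory>
#include <thread>

void progress_thread(std::shared_ptr<ReadinessEvent> ev) {
    while (!ev->is_ready()) { /* poll; do other progress work */ }
}   // reference dropped here; the dtor runs iff this was the last owner

int main() {
    auto ev = std::make_shared<ReadinessEvent>();
    std::thread t{progress_thread, ev};
    ev.reset();  // the spawning thread may release early; still safe
    t.join();
}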

Comment on lines +44 to +49
    if (!done_.load(std::memory_order_relaxed)) {
        auto result = cudaEventQuery(event_);
        if (result == cudaSuccess) {
            done_.store(true, std::memory_order_relaxed);
            return true;
        } else if (result != cudaErrorNotReady) {
            RAPIDSMPF_CUDA_TRY(result);
        }
    }
    return done_.load(std::memory_order_relaxed);
wence- (Contributor Author):

We could add machinery to ensure that cudaEventQuery is really only called once, but I think this is OK.
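
For the record, one way to do that (a sketch only; the querying_ flag is an assumed extra std::atomic<bool> member, not in the PR): elect a single thread to talk to the CUDA runtime and let everyone else read the cached result.

// Sketch: exactly one thread at a time issues cudaEventQuery; the rest
// see the cached `done_` value, so the runtime is not queried redundantly.
// done_, event_, and RAPIDSMPF_CUDA_TRY are as in the snippet above.
bool Event::query() {
    if (done_.load(std::memory_order_relaxed)) { return true; }
    bool expected = false;
    if (querying_.compare_exchange_strong(expected, true)) {
        auto result = cudaEventQuery(event_);
        querying_.store(false, std::memory_order_release);  // release before any throw
        if (result == cudaSuccess) {
            done_.store(true, std::memory_order_relaxed);
        } else if (result != cudaErrorNotReady) {
            RAPIDSMPF_CUDA_TRY(result);
        }
    }
    return done_.load(std::memory_order_relaxed);
}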

Comment on lines 30 to 36
void Event::add_as_dependency_to(rmm::cuda_stream_view stream) const {
    if (!query()) {
        RAPIDSMPF_CUDA_TRY(
            cudaStreamWaitEvent(stream.value(), event_, cudaEventWaitDefault)
        );
    }
}
wence- (Contributor Author):

This is preparing for the case where we start using an internal stream inside the shuffler that is distinct from the user stream. In that case, when we copy a user buffer on the internal stream, we must add a dependency on the work on the user's stream.

@@ -51,6 +51,37 @@ std::vector<Chunk> PostBox<KeyType>::extract_all() {
return ret;
}

template <typename KeyType>
std::vector<Chunk> PostBox<KeyType>::extract_all_ready() {
wence- (Contributor Author):

Taken straight from #225.
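
For context, a hedged sketch of what extract_all_ready does (member names like pigeonhole_ and mutex_, and Chunk::is_ready(), are assumptions here; see #225 for the real implementation): move out only the chunks whose buffers report ready, leaving the rest for a later progress iteration.

template <typename KeyType>
std::vector<Chunk> PostBox<KeyType>::extract_all_ready() {
    std::lock_guard<std::mutex> const lock(mutex_);  // assumed internal lock
    std::vector<Chunk> ret;
    for (auto it = pigeonhole_.begin(); it != pigeonhole_.end();) {
        if (it->second.is_ready()) {  // the chunk's buffer event has fired
            ret.push_back(std::move(it->second));
            it = pigeonhole_.erase(it);
        } else {
            ++it;  // not ready yet; try again on the next progress pass
        }
    }
    return ret;
}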

@@ -175,7 +176,7 @@ class Shuffler::Progress {

// Check for new chunks in the inbox and send off their metadata.
auto const t0_send_metadata = Clock::now();
-        for (auto&& chunk : shuffler_.outgoing_chunks_.extract_all()) {
+        for (auto&& chunk : shuffler_.outgoing_chunks_.extract_all_ready()) {
wence- (Contributor Author):

Only process ready chunks.

Comment on lines 77 to 87
[&](const DeviceStorageT& storage) -> std::unique_ptr<Buffer> {
    add_as_dependency_to(stream);
    auto event = std::make_shared<Event>();
    auto buf = std::make_unique<rmm::device_buffer>(
        storage->data(), storage->size(), stream, br->device_mr()
    );
    event->record(stream);
    return std::unique_ptr<Buffer>(
        new Buffer{std::move(buf), br, std::move(event)}
    );
wence- (Contributor Author):

This buffer may have work queued on a stream other than the stream argument, so add a dependency (call cudaStreamWaitEvent) on that work before doing the copy, and then record a new event for the copy for the return value.

Comment on lines 103 to 111
// Have to ensure that any async work is complete.
synchronize();
auto buf = std::make_unique<rmm::device_buffer>(
    storage->data(), storage->size(), stream, br->device_mr()
);
event->record(stream);

return std::unique_ptr<Buffer>(
    new Buffer{std::move(buf), br, std::move(event)}
);
wence- (Contributor Author):

A host-to-device copy must first synchronize to ensure that the host data are valid, and then record the memcpy for the return value.

wence- (Contributor Author):

Actually, this one can add a stream dependency as well...

Comment on lines 115 to 125
     add_as_dependency_to(stream);
     auto ret = std::make_unique<std::vector<uint8_t>>(storage->size());
     RAPIDSMPF_CUDA_TRY_ALLOC(cudaMemcpyAsync(
         ret->data(),
         storage->data(),
         storage->size(),
         cudaMemcpyDeviceToHost,
-        stream
+        stream.value()
     ));
-    return std::unique_ptr<Buffer>(new Buffer{std::move(ret), br});
+    event->record(stream);
+    return std::unique_ptr<Buffer>(
+        new Buffer{std::move(ret), br, std::move(event)}
+    );
wence- (Contributor Author):

A device-to-host copy needs to add a dependency on the buffer's event on stream, and then record the memcpy for the return value.

Comment on lines +118 to +121
std::unique_ptr<Buffer> BufferResource::move(
    std::unique_ptr<rmm::device_buffer> data, std::shared_ptr<Event> event
) {
    return std::unique_ptr<Buffer>(new Buffer{std::move(data), this, event});
wence- (Contributor Author):

Taking ownership of a device_buffer now needs an event (and the ctor will ensure this is non-null appropriately).
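
A small standalone sketch of that invariant (stand-in types; the real ctor presumably uses RAPIDSMPF_EXPECTS rather than a bare throw):

#include <memory>
#include <stdexcept>

struct Event {};       // stand-in for rapidsmpf's Event
struct DeviceData {};  // stand-in for rmm::device_buffer

struct Buffer {
    Buffer(std::unique_ptr<DeviceData> data, std::shared_ptr<Event> event)
        : data_{std::move(data)}, event_{std::move(event)} {
        // Device-backed storage must always carry an event; host-backed
        // storage (not shown) is allowed a nullptr event.
        if (event_ == nullptr) {
            throw std::invalid_argument{"device Buffer requires an Event"};
        }
    }
    std::unique_ptr<DeviceData> data_;
    std::shared_ptr<Event> event_;
};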

@@ -105,11 +112,13 @@ std::unique_ptr<Buffer> BufferResource::allocate(
}

std::unique_ptr<Buffer> BufferResource::move(std::unique_ptr<std::vector<uint8_t>> data) {
-    return std::make_unique<Buffer>(Buffer{std::move(data), this});
+    return std::unique_ptr<Buffer>(new Buffer{std::move(data), this, nullptr});
wence- (Contributor Author):

Taking ownership of host data never needs an event: the caller promises to synchronise if necessary first.

@pentschev pentschev (Member) left a comment

I don't mind going with either #225 or #242, but to be honest I don't quite see the big difference. The only differences I see from #225 are that you created a new file for Event and added a few more methods, like synchronize() (which I personally feel is a bad idea to even have most of the time), and add_as_dependency_to, which after a high-level look I still fail to understand how it adds any important information that cannot be captured by recording the event.

Comment on lines 114 to 117
/**
 * @brief Ensure all stream-ordered work to populate the buffer is completed.
 */
void synchronize() const;
pentschev (Member):

Ok, but how do we prevent abuse then? In my mind, providing an interface like this is a recipe to "go do the easy thing".

Comment on lines 103 to 104
// Have to ensure that any async work is complete.
synchronize();
pentschev (Member):

Why?

wence- (Contributor Author):

This one was wrong, so I fixed it with a stream dependency.

Comment on lines 71 to 72
// Have to ensure that any async work is complete.
synchronize();
pentschev (Member):

Why do you have to ensure async work is complete? Your source is host memory; anything that has been done with it should have been synchronized before. It should never be up to the consumer to magically figure out how the host buffer has been processed; how do you know it is even the right stream then?

wence- (Contributor Author):

Suppose you spill a device-side buffer to a Buffer as host memory. You did that with cudaMemcpyAsync. Now you have a Buffer with host memory, but it might not be ready. So if you then copy it to a new Buffer with host memory, you can either fail if it's not is_ready or you synchronize.
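
A standalone sketch of that hazard (not repo code; pinned memory is used so the copy below is genuinely asynchronous):

#include <cuda_runtime.h>
#include <cstring>
#include <vector>

int main() {
    constexpr size_t n = 1 << 20;
    cudaStream_t stream;
    cudaStreamCreate(&stream);

    void* dev = nullptr;
    cudaMalloc(&dev, n);

    void* host = nullptr;
    cudaMallocHost(&host, n);  // pinned, so the D2H spill is truly async

    // "Spill": queue the device-to-host copy and record an event after it.
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
    cudaMemcpyAsync(host, dev, n, cudaMemcpyDeviceToHost, stream);
    cudaEventRecord(ev, stream);

    std::vector<unsigned char> copy(n);
    // BUG if done here: `host` may not contain the spilled bytes yet.
    // std::memcpy(copy.data(), host, n);

    // Correct: wait on the event first; this is what the choice between
    // synchronize() and an is_ready() check is protecting against.
    cudaEventSynchronize(ev);
    std::memcpy(copy.data(), host, n);

    cudaEventDestroy(ev);
    cudaFreeHost(host);
    cudaFree(dev);
    cudaStreamDestroy(stream);
}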

pentschev (Member):

Yeah, ok, but when would that happen in practice? I'd argue the consumer needs to check whether the Buffer (or whatever other object delivered the host memory) is_ready(); otherwise, to use your words, we are now putting a "big hammer" (although slightly smaller than a device sync) here. What is the argument for whoever called Buffer::copy() not having ensured that in the first place?

wence- (Contributor Author):

If we don't do the synchronisation, then every time you do a buffer host-to-host copy you must check that that is what you're doing and then sync. Or else check is_ready and, if it's not ready, push it back into a copy queue?

That seems like a very difficult API to manage.

pentschev (Member):

I don't get why we care about host-to-host copies; why wouldn't they be synchronous? Sure, if the source buffer you're consuming has been copied into from the device, then yes, but having to make that assumption for every host buffer just seems wrong. Can you point to a practical scenario where this happens today?

Perhaps I'm missing something, but it doesn't feel like the right implementation to me that we have to add an implicit synchronization before consuming a host buffer.

wence- (Contributor Author):

In almost all cases the source host buffer will have no event and thus the sync is a noop. But sometimes it might have one.
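
Sketch of why the common case is free (the handle() accessor is an assumption, not the PR's API):

void Buffer::synchronize() const {
    if (event_ == nullptr) { return; }  // host-origin data: nothing queued, noop
    RAPIDSMPF_CUDA_TRY(cudaEventSynchronize(event_->handle()));  // assumed accessor
}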

@wence- wence- force-pushed the wence/event-ideas branch from 27113a8 to c8fe69b on May 2, 2025 16:20
@wence- wence- (Contributor Author) commented May 2, 2025

@pentschev I think this turns out very similar to your #225 PR. Some differences:

  • I rely on shared_ptr to make destruction of Events thread-safe. I think this could be done in yours too, which would simplify the Event class.
  • The Buffer ctors directly take Events rather than streams.
  • For copies from host buffers to host buffers, I add synchronisation (because you need it before calling memcpy).

@wence- wence- (Contributor Author) commented May 2, 2025

and add_as_dependency_to, which after a high-level look I still fail to understand how it adds any important information that cannot be captured by recording the event.

Suppose the application stream is streamA and the shuffler is internally using a stream streamB.

When we take ownership of the application buffer we record the state of streamA in an event.

Then suppose we spill that data, but we're using streamB to call the cudaMemcpyAsync. Unless we insert a dependency between streamA and streamB we will have an invalid result. The way to do this is to call cudaStreamWaitEvent(streamB, event) which is what add_as_dependency_to does.
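
A standalone sketch of exactly that scenario (illustrative; the PR wraps the wait in add_as_dependency_to):

#include <cuda_runtime.h>

int main() {
    constexpr size_t n = 1 << 20;
    cudaStream_t streamA, streamB;
    cudaStreamCreate(&streamA);
    cudaStreamCreate(&streamB);

    void* dev = nullptr;
    void* host = nullptr;
    cudaMalloc(&dev, n);
    cudaMallocHost(&host, n);  // pinned, so the spill copy is truly async

    // ... application work writing `dev` is queued on streamA here ...

    // Taking ownership of the buffer: snapshot streamA's queued work.
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
    cudaEventRecord(ev, streamA);

    // Spilling on streamB: without this wait, the memcpy could read `dev`
    // before streamA's work has finished.
    cudaStreamWaitEvent(streamB, ev, cudaEventWaitDefault);
    cudaMemcpyAsync(host, dev, n, cudaMemcpyDeviceToHost, streamB);

    cudaStreamSynchronize(streamB);
    cudaEventDestroy(ev);
    cudaFreeHost(host);
    cudaFree(dev);
    cudaStreamDestroy(streamA);
    cudaStreamDestroy(streamB);
}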

@pentschev pentschev (Member) commented


Fair, but to me this is beyond the scope of this change; since it's already here it's fine, but otherwise I would have argued it should be part of a follow-up PR where the Shuffler uses another stream. We could do everything we can think of that is a good change in one PR, but I feel there's value in keeping things minimal so we don't introduce more complexity than necessary and don't risk introducing another bug. But that's just me.

@wence- wence- (Contributor Author) commented May 2, 2025


I can certainly rip it out, but we still need to think about what to do about host-to-host copies (given that the source buffer in that scenario might have stream-queued work).

wence- added 8 commits May 2, 2025 17:04
This will be used to ensure that when passing to a library that is not stream-aware, all work used to populate the Buffer is complete.
Now that buffers track their state with events, we can extract only
those buffers which are ready to use with a library that is not
stream-aware.
@wence- wence- force-pushed the wence/event-ideas branch from c8fe69b to bcd6162 on May 2, 2025 17:05
@wence- wence- (Contributor Author) commented May 2, 2025

I can certainly rip it out, but we still need to think about what to do about host-to-host copies (given that the source buffer in that scenario might have stream-queued work).

Added a RAPIDSMPF_EXPECTS(is_ready()) instead and removed all the synchronize APIs and add_as_dependency_to calls.
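
A hedged sketch of the final shape of the host-to-host path (the surrounding function and the host() accessor are illustrative, not the PR's exact code):

// Instead of synchronizing, assert that the source host buffer's event has
// already fired; callers are responsible for only copying ready buffers.
std::unique_ptr<Buffer> copy_host_to_host(Buffer const& src, BufferResource* br) {
    RAPIDSMPF_EXPECTS(src.is_ready(), "source host buffer has stream work pending");
    auto data = std::make_unique<std::vector<uint8_t>>(*src.host());  // assumed accessor
    return br->move(std::move(data));
}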
