Skip to content

Commit c4f326a

Browse files
authored
[SYCL][CUDA][PI] Improve performance of event synchronization (#6224)
Improve performance of event synchronization by reducing the number of calls to cuStreamWaitEvent. This call is now skipped for the stream the event is coming from. Also, when enqueueing a new command with a dependency on a previous one, an attempt is made to use the same stream, so both can be waited on with only one call to cuStreamWaitEvent.
1 parent 89460e8 commit c4f326a

File tree

2 files changed

+159
-35
lines changed

2 files changed

+159
-35
lines changed

sycl/plugins/cuda/pi_cuda.cpp

Lines changed: 80 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,11 @@ pi_result enqueueEventsWait(pi_queue command_queue, CUstream stream,
306306
auto result = forLatestEvents(
307307
event_wait_list, num_events_in_wait_list,
308308
[stream](pi_event event) -> pi_result {
309-
return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0));
309+
if (event->get_stream() == stream) {
310+
return PI_SUCCESS;
311+
} else {
312+
return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0));
313+
}
310314
});
311315

312316
if (result != PI_SUCCESS) {
@@ -373,18 +377,58 @@ pi_result cuda_piEventRetain(pi_event event);
373377

374378
/// \endcond
375379

376-
CUstream _pi_queue::get_next_compute_stream() {
377-
if (num_compute_streams_ < compute_streams_.size()) {
378-
// the check above is for performance - so as not to lock mutex every time
379-
std::lock_guard<std::mutex> guard(compute_stream_mutex_);
380-
// The second check is done after mutex is locked so other threads can not
381-
// change num_compute_streams_ after that
380+
CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
381+
pi_uint32 stream_i;
382+
while (true) {
382383
if (num_compute_streams_ < compute_streams_.size()) {
383-
PI_CHECK_ERROR(
384-
cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
384+
// the check above is for performance - so as not to lock mutex every time
385+
std::lock_guard<std::mutex> guard(compute_stream_mutex_);
386+
// The second check is done after mutex is locked so other threads can not
387+
// change num_compute_streams_ after that
388+
if (num_compute_streams_ < compute_streams_.size()) {
389+
PI_CHECK_ERROR(
390+
cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
391+
}
392+
}
393+
stream_i = compute_stream_idx_++;
394+
// if a stream has been reused before it was next selected in round-robin
395+
// fashion, we want to delay its next use and instead select another one
396+
// that is more likely to have completed all the enqueued work.
397+
if (delay_compute_[stream_i % compute_streams_.size()]) {
398+
delay_compute_[stream_i % compute_streams_.size()] = false;
399+
} else {
400+
break;
385401
}
386402
}
387-
return compute_streams_[compute_stream_idx_++ % compute_streams_.size()];
403+
if (stream_token) {
404+
*stream_token = stream_i;
405+
}
406+
return compute_streams_[stream_i % compute_streams_.size()];
407+
}
408+
409+
CUstream _pi_queue::get_next_compute_stream(pi_uint32 num_events_in_wait_list,
410+
const pi_event *event_wait_list,
411+
_pi_stream_guard &guard,
412+
pi_uint32 *stream_token) {
413+
for (pi_uint32 i = 0; i < num_events_in_wait_list; i++) {
414+
pi_uint32 token = event_wait_list[i]->get_stream_token();
415+
if (event_wait_list[i]->get_queue() == this && can_reuse_stream(token)) {
416+
std::unique_lock<std::mutex> compute_sync_guard(
417+
compute_stream_sync_mutex_);
418+
// redo the check after lock to avoid data races on
419+
// last_sync_compute_streams_
420+
if (can_reuse_stream(token)) {
421+
delay_compute_[token % delay_compute_.size()] = true;
422+
if (stream_token) {
423+
*stream_token = token;
424+
}
425+
guard = _pi_stream_guard{std::move(compute_sync_guard)};
426+
return event_wait_list[i]->get_stream();
427+
}
428+
}
429+
}
430+
guard = {};
431+
return get_next_compute_stream(stream_token);
388432
}
389433

390434
CUstream _pi_queue::get_next_transfer_stream() {
@@ -405,9 +449,10 @@ CUstream _pi_queue::get_next_transfer_stream() {
405449
}
406450

407451
_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue,
408-
CUstream stream)
452+
CUstream stream, pi_uint32 stream_token)
409453
: commandType_{type}, refCount_{1}, hasBeenWaitedOn_{false},
410-
isRecorded_{false}, isStarted_{false}, evEnd_{nullptr}, evStart_{nullptr},
454+
isRecorded_{false}, isStarted_{false},
455+
streamToken_{stream_token}, evEnd_{nullptr}, evStart_{nullptr},
411456
evQueued_{nullptr}, queue_{queue}, stream_{stream}, context_{context} {
412457

413458
bool profilingEnabled = queue_->properties_ & PI_QUEUE_PROFILING_ENABLE;
@@ -2948,7 +2993,10 @@ pi_result cuda_piEnqueueKernelLaunch(
29482993

29492994
std::unique_ptr<_pi_event> retImplEv{nullptr};
29502995

2951-
CUstream cuStream = command_queue->get_next_compute_stream();
2996+
pi_uint32 stream_token;
2997+
_pi_stream_guard guard;
2998+
CUstream cuStream = command_queue->get_next_compute_stream(
2999+
num_events_in_wait_list, event_wait_list, guard, &stream_token);
29523000
CUfunction cuFunc = kernel->get();
29533001

29543002
retError = enqueueEventsWait(command_queue, cuStream,
@@ -2973,8 +3021,9 @@ pi_result cuda_piEnqueueKernelLaunch(
29733021
auto &argIndices = kernel->get_arg_indices();
29743022

29753023
if (event) {
2976-
retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native(
2977-
PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue, cuStream));
3024+
retImplEv = std::unique_ptr<_pi_event>(
3025+
_pi_event::make_native(PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue,
3026+
cuStream, stream_token));
29783027
retImplEv->start();
29793028
}
29803029

@@ -3809,7 +3858,12 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue,
38093858
auto result =
38103859
forLatestEvents(event_wait_list, num_events_in_wait_list,
38113860
[command_queue](pi_event event) -> pi_result {
3812-
return enqueueEventWait(command_queue, event);
3861+
if (event->get_queue()->has_been_synchronized(
3862+
event->get_stream_token())) {
3863+
return PI_SUCCESS;
3864+
} else {
3865+
return enqueueEventWait(command_queue, event);
3866+
}
38133867
});
38143868

38153869
if (result != PI_SUCCESS) {
@@ -3818,8 +3872,12 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue,
38183872
}
38193873

38203874
if (event) {
3875+
pi_uint32 stream_token;
3876+
_pi_stream_guard guard;
3877+
CUstream cuStream = command_queue->get_next_compute_stream(
3878+
num_events_in_wait_list, event_wait_list, guard, &stream_token);
38213879
*event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue,
3822-
command_queue->get_next_compute_stream());
3880+
cuStream, stream_token);
38233881
(*event)->start();
38243882
(*event)->record();
38253883
}
@@ -4879,12 +4937,15 @@ pi_result cuda_piextUSMEnqueueMemset(pi_queue queue, void *ptr, pi_int32 value,
48794937

48804938
try {
48814939
ScopedContext active(queue->get_context());
4882-
CUstream cuStream = queue->get_next_compute_stream();
4940+
pi_uint32 stream_token;
4941+
_pi_stream_guard guard;
4942+
CUstream cuStream = queue->get_next_compute_stream(
4943+
num_events_in_waitlist, events_waitlist, guard, &stream_token);
48834944
result = enqueueEventsWait(queue, cuStream, num_events_in_waitlist,
48844945
events_waitlist);
48854946
if (event) {
48864947
event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native(
4887-
PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream));
4948+
PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream, stream_token));
48884949
event_ptr->start();
48894950
}
48904951
result = PI_CHECK_ERROR(cuMemsetD8Async(

sycl/plugins/cuda/pi_cuda.hpp

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ pi_result cuda_piKernelGetGroupInfo(pi_kernel kernel, pi_device device,
5555
/// \endcond
5656
}
5757

58+
using _pi_stream_guard = std::unique_lock<std::mutex>;
59+
5860
/// A PI platform stores all known PI devices,
5961
/// in the CUDA plugin this is just a vector of
6062
/// available devices since initialization is done
@@ -387,6 +389,11 @@ struct _pi_queue {
387389

388390
std::vector<native_type> compute_streams_;
389391
std::vector<native_type> transfer_streams_;
392+
// delay_compute_ keeps track of which streams have been recently reused and
393+
// their next use should be delayed. If a stream has been recently reused it
394+
// will be skipped the next time it would be selected round-robin style. When
395+
// skipped, its delay flag is cleared.
396+
std::vector<bool> delay_compute_;
390397
_pi_context *context_;
391398
_pi_device *device_;
392399
pi_queue_properties properties_;
@@ -399,6 +406,10 @@ struct _pi_queue {
399406
unsigned int last_sync_compute_streams_;
400407
unsigned int last_sync_transfer_streams_;
401408
unsigned int flags_;
409+
// When compute_stream_sync_mutex_ and compute_stream_mutex_ both need to be
410+
// locked at the same time, compute_stream_sync_mutex_ should be locked first
411+
// to avoid deadlocks
412+
std::mutex compute_stream_sync_mutex_;
402413
std::mutex compute_stream_mutex_;
403414
std::mutex transfer_stream_mutex_;
404415
bool has_ownership_;
@@ -408,7 +419,8 @@ struct _pi_queue {
408419
_pi_device *device, pi_queue_properties properties,
409420
unsigned int flags, bool backend_owns = true)
410421
: compute_streams_{std::move(compute_streams)},
411-
transfer_streams_{std::move(transfer_streams)}, context_{context},
422+
transfer_streams_{std::move(transfer_streams)},
423+
delay_compute_(compute_streams_.size(), false), context_{context},
412424
device_{device}, properties_{properties}, refCount_{1}, eventCount_{0},
413425
compute_stream_idx_{0}, transfer_stream_idx_{0},
414426
num_compute_streams_{0}, num_transfer_streams_{0},
@@ -425,10 +437,47 @@ struct _pi_queue {
425437

426438
// get_next_compute/transfer_stream() functions return streams from
427439
// appropriate pools in round-robin fashion
428-
native_type get_next_compute_stream();
440+
native_type get_next_compute_stream(pi_uint32 *stream_token = nullptr);
441+
// this overload tries to select a stream that was used by one of the dependencies.
442+
// If that is not possible, it returns a new stream. If a stream is reused, it
443+
// returns a lock that needs to remain locked as long as the stream is in use
444+
native_type get_next_compute_stream(pi_uint32 num_events_in_wait_list,
445+
const pi_event *event_wait_list,
446+
_pi_stream_guard &guard,
447+
pi_uint32 *stream_token = nullptr);
429448
native_type get_next_transfer_stream();
430449
native_type get() { return get_next_compute_stream(); };
431450

451+
bool has_been_synchronized(pi_uint32 stream_token) {
452+
// stream token not associated with one of the compute streams
453+
if (stream_token == std::numeric_limits<pi_uint32>::max()) {
454+
return false;
455+
}
456+
return last_sync_compute_streams_ >= stream_token;
457+
}
458+
459+
bool can_reuse_stream(pi_uint32 stream_token) {
460+
// stream token not associated with one of the compute streams
461+
if (stream_token == std::numeric_limits<pi_uint32>::max()) {
462+
return true;
463+
}
464+
// If the command represented by the stream token was not the last command
465+
// enqueued to the stream we can not reuse the stream - we need to allow for
466+
// commands enqueued after it and the one we are about to enqueue to run
467+
// concurrently
468+
bool is_last_command =
469+
(compute_stream_idx_ - stream_token) <= compute_streams_.size();
470+
// If there was a barrier enqueued to the queue after the command
471+
// represented by the stream token we should not reuse the stream, as we can
472+
// not take that stream into account for the bookkeeping for the next
473+
// barrier - such a stream would not be synchronized with. Performance-wise
474+
// it does not matter that we do not reuse the stream, as the work
475+
// represented by the stream token is guaranteed to be complete by the
476+
// barrier before any work we are about to enqueue to the stream will start,
477+
// so the event does not need to be synchronized with.
478+
return is_last_command && !has_been_synchronized(stream_token);
479+
}
480+
432481
template <typename T> void for_each_stream(T &&f) {
433482
{
434483
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
@@ -451,30 +500,39 @@ struct _pi_queue {
451500
}
452501

453502
template <typename T> void sync_streams(T &&f) {
454-
auto sync = [&f](const std::vector<CUstream> &streams, unsigned int start,
455-
unsigned int stop) {
503+
auto sync_compute = [&f, &streams = compute_streams_,
504+
&delay = delay_compute_](unsigned int start,
505+
unsigned int stop) {
506+
for (unsigned int i = start; i < stop; i++) {
507+
f(streams[i]);
508+
delay[i] = false;
509+
}
510+
};
511+
auto sync_transfer = [&f, &streams = transfer_streams_](unsigned int start,
512+
unsigned int stop) {
456513
for (unsigned int i = start; i < stop; i++) {
457514
f(streams[i]);
458515
}
459516
};
460517
{
461518
unsigned int size = static_cast<unsigned int>(compute_streams_.size());
519+
std::lock_guard compute_sync_guard(compute_stream_sync_mutex_);
462520
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
463521
unsigned int start = last_sync_compute_streams_;
464522
unsigned int end = num_compute_streams_ < size
465523
? num_compute_streams_
466524
: compute_stream_idx_.load();
467525
last_sync_compute_streams_ = end;
468526
if (end - start >= size) {
469-
sync(compute_streams_, 0, size);
527+
sync_compute(0, size);
470528
} else {
471529
start %= size;
472530
end %= size;
473531
if (start < end) {
474-
sync(compute_streams_, start, end);
532+
sync_compute(start, end);
475533
} else {
476-
sync(compute_streams_, start, size);
477-
sync(compute_streams_, 0, end);
534+
sync_compute(start, size);
535+
sync_compute(0, end);
478536
}
479537
}
480538
}
@@ -488,15 +546,15 @@ struct _pi_queue {
488546
: transfer_stream_idx_.load();
489547
last_sync_transfer_streams_ = end;
490548
if (end - start >= size) {
491-
sync(transfer_streams_, 0, size);
549+
sync_transfer(0, size);
492550
} else {
493551
start %= size;
494552
end %= size;
495553
if (start < end) {
496-
sync(transfer_streams_, start, end);
554+
sync_transfer(start, end);
497555
} else {
498-
sync(transfer_streams_, start, size);
499-
sync(transfer_streams_, 0, end);
556+
sync_transfer(start, size);
557+
sync_transfer(0, end);
500558
}
501559
}
502560
}
@@ -538,6 +596,8 @@ struct _pi_event {
538596

539597
CUstream get_stream() const noexcept { return stream_; }
540598

599+
pi_uint32 get_stream_token() const noexcept { return streamToken_; }
600+
541601
pi_command_type get_command_type() const noexcept { return commandType_; }
542602

543603
pi_uint32 get_reference_count() const noexcept { return refCount_; }
@@ -581,9 +641,11 @@ struct _pi_event {
581641
pi_uint64 get_end_time() const;
582642

583643
// construct a native CUDA. This maps closely to the underlying CUDA event.
584-
static pi_event make_native(pi_command_type type, pi_queue queue,
585-
CUstream stream) {
586-
return new _pi_event(type, queue->get_context(), queue, stream);
644+
static pi_event
645+
make_native(pi_command_type type, pi_queue queue, CUstream stream,
646+
pi_uint32 stream_token = std::numeric_limits<pi_uint32>::max()) {
647+
return new _pi_event(type, queue->get_context(), queue, stream,
648+
stream_token);
587649
}
588650

589651
pi_result release();
@@ -594,7 +656,7 @@ struct _pi_event {
594656
// This constructor is private to force programmers to use the make_native /
595657
// make_user static members in order to create a pi_event for CUDA.
596658
_pi_event(pi_command_type type, pi_context context, pi_queue queue,
597-
CUstream stream);
659+
CUstream stream, pi_uint32 stream_token);
598660

599661
pi_command_type commandType_; // The type of command associated with event.
600662

@@ -610,6 +672,7 @@ struct _pi_event {
610672
// PI event has started or not
611673
//
612674

675+
pi_uint32 streamToken_;
613676
pi_uint32 eventId_; // Queue identifier of the event.
614677

615678
native_type evEnd_; // CUDA event handle. If this _pi_event represents a user

0 commit comments

Comments
 (0)