Skip to content

Conversation

@bechols97
Copy link

@bechols97 bechols97 commented Apr 28, 2025

Summary

This PR adds a feature for device messages (i.e. store function arguments on device to be handled by a host callback at a later time). The original idea for the feature is to have better error handling on the device with handling any output on with a host callback. This moves the code from Camp to RAJA due dependency on atomic operations.

Design review

For the design, there are some open questions. (regarding these open questions please see the Design notes below)

  1. Currently, this design supports a MPSC model with the expectation that the device side will produce messages while a single host thread consumes messages.
  2. Additionally, before consuming messages, the current implementation forces the stream to synchronize.
  3. If there are more messages than the buffer size, then those messages are lost. This is to avoid waits on the device side. However, would it be beneficial to have a configurable option to use a circular buffer?

Design notes

Based on discussion from a meeting:

  • As suggested in the meeting, there should be policies for the message queue to support different use cases and allows the message queue to be extended if there are any future use cases.
  • Based on current use cases, the default policy of the message queue should support a MPSC model, wait all should synchronize the stream, and not storing additional messages when the buffer is full.
  • The message_handler class that stores the callback should be move-only to prevent accidental copies to a lambda with supporting a view-like queue to copy to device kernels (as mentioned below by @MrBurmark).

  - This moves message container to RAJA to avoid dependency
    issues with required atomic operation. Currently, testing and
    waiting for messages will block until the stream is synchronized.
@MrBurmark
Copy link
Member

I think the design would benefit from a view like object that can be used in device kernels.

@artv3
Copy link
Member

artv3 commented May 11, 2025

@bechols97 , can you add an example in the examples folder? I have a use case where this may be handy. In my application if a thread gets a negative value, I want to take note and output it at the end of the kernel by the root rank. Currently I am using printf and every thread that encounters the negative value spews information onto the screen. To double check, would this be a use case?

@bechols97
Copy link
Author

Hi @artv3, yes that could be a use case for this. I will add an example of something similar to the examples folder.

bechols97 added 4 commits May 12, 2025 09:04
  - This allows the message queue to be passed to RAJA kernels
  - This also allows the message queue to be allocated with pinned
    memory when needed
  - Currently, example requires XNACK with HIP. (message queue should be using
    pinned memory so need to look into this)
@rcarson3
Copy link
Member

@bechols97 so one of the libraries I maintain has a failure macro that on the host side throw an error with some useful error messages associated with it which can provide useful context for users / developers for why something failed. Do you think we could emulate something like with this framework if we could provide the absolute max size of the char array / string literal that we'd like and then at the error site have that passed into your message class?

@MrBurmark
Copy link
Member

I think it would be possible to add a fixed capacity string-like object that could be passed through this interface.

@bechols97
Copy link
Author

@rcarson3 Yes, the original intention behind this idea is to be used as a device side error handler (though left to be more generic in case there are other use cases). As @MrBurmark mentioned, it would be possible to use a fixed-string object with the message handler. For string literals, you would want some type of fixed-string object / char array to store the string that is later passed to a host side callback.

In addition to the fixed-string object, any type that is trivially destructible should work as well.

@bechols97 bechols97 requested a review from a team June 3, 2025 20:10
@MrBurmark
Copy link
Member

MrBurmark commented Jun 5, 2025

The current state looks good if you're trying to handle a single kind of error.

RAJA::resources::Host res{};

auto logger = RAJA::message_handler<void(int*, int, int)>(num_messages, res, 
  [](int* ptr, int idx, int value) {
    std::cout << "\n pointer " << ptr << " a[" << idx << "] = " << value << "\n";
  }
);

auto cpu_msg_queue = logger.get_queue<RAJA::mpsc_queue>();
RAJA::forall<RAJA::seq_exec>(host, RAJA::RangeSegment(0, N), [=] (int i) { 
  if (a[i] < 0) { 
    cpu_msg_queue.try_post_message(a, i, a[i]); 
  }
});

logger.wait_all();

@MrBurmark
Copy link
Member

There are use cases where we might want to handle multiple kinds of errors each with different data in the same loop. Does anyone else have such a use case? What do you think of a slightly more general interface that looks more like this?

By moving the signature to the queues we don't know the message sizes upfront. So I'm not sure if it makes sense to do the sizing upfront or later when we know what kinds of messages are possible.

RAJA::resources::Host res{};

auto logger = RAJA::message_handler(res, num_bytes);

auto queue1 = logger.get_queue<RAJA::mpsc_queue, void(int*, int, int)>(num_messages, host, 
  [](int idx, int* a, int val_a) {
    std::cout << "\n Oh no! a{" << a << "}[" << idx << "] = " << val_a<< "\n";
  }
);

auto queue2 = logger.get_queue<RAJA::mpsc_queue, void(int*, int, int)>(num_messages, host, 
  [](int idx, int* a, int val_a, double* b, double val_b) {
    std::cout << "\n Inconceivable! a{" << a << "}[" << idx << "] = " << val_a <<
                             " and  b{" << b << "}[" << idx << "] = " << val_b << "\n";
  }
);

RAJA::forall<RAJA::seq_exec>(host, RAJA::RangeSegment(0, N), [=] (int i) { 
  if (a[i] < 0) { 
    queue1.try_post_message(i, a, a[i]); 
  }
  if (a[i] == 0 && b[i] < 0) { 
    queue2.try_post_message(i, a, a[i], b, b[i]); 
  }
});

logger.wait_all();

@MrBurmark
Copy link
Member

Another use case that I'm considering is having a long lived logger with an allocation. Then I can enqueue multiple types of messages in that while keeping the gpu running and check for messages occasionally to avoid extra synchronizes.
I am not sure this is feasible however as most of our logging use cases involve catching an error and stopping. If we did continue running we would likely encounter a hard error like a seg fault later.

@bechols97
Copy link
Author

bechols97 commented Jun 5, 2025

Being able to support multiple error/logging messages within the same loop is definitely a use case that we would want to support. This is something that the library I help maintain uses.

There are a couple of concerns with moving the callback to be a parameter of the get_queue function:

  • The callback is currently stored as a std::function in message_handler. Moving the callback to the get_queue the callback could still technically be stored in message_handler, but this would likely require additional virtual functions, which could increase the overhead on the host side. This seems reasonable since for the most part since the main use cases are for debugging/error handling where the callbacks aren't expected to be called often.
  • If the callback is stored in msg_queue, then queue1 and queue2 now have ownership of the std::function. This would then be copied to the RAJA kernel and likely run into compilation issues the device execution spaces.

Just to show another option with the current interface: (Please note this example is not entirely the same and requires some additional types to be created; however, one could create a type similar to std::inplace_vactor<.., 5> and std::variant to store up to 5 arguments with better type safety):

union msg_arg {
  int i;
  double d;
};

enum msg_type
{
  MSG_INT,
  MSG_DLB
};

RAJA::resources::Host res{};

auto logger = RAJA::message_handler<void(msg_type, int, void*, msg_arg)>(res, num_msg, 
  [] (msg_type type, int idx, void* ptr, msg_arg val) {
    if (type == msg_type::MSG_INT) {
      std::cout << "\n Oh no! a{" <<ptr << "}[" << idx << "] = " << val.i << "\n";
    } else if (type == msg_type::MSG_DBL) {
      std::cout << "\n Inconceivable!  b{" << ptr << "}[" << idx << "] = " << val.d << "\n";
    }
});
auto queue = logger.get_queue<RAJA::mpsc_queue>();

RAJA::forall<RAJA::seq_exec>(host, RAJA::RangeSegment(0, N), [=] (int i) { 
  if (a[i] < 0) { 
    queue.try_post_message(msg_type::MSG_INT, i, a, a[i]); 
  }
  if (a[i] == 0 && b[i] < 0) { 
    queue.try_post_message(msg_type::MSG_DBL, i, b, b[i]); 
  }
});

logger.wait_all()

Comment on lines +35 to +38
// This is a simplified example fixed string to show how
// custom types can be used with the message queue.
template <std::size_t N>
class my_string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding an example with a "string-like" type in it :) It's helps provide me a road map for how I could replace my current Warning / Error reporting on the GPUs with this framework.

@rhornung67 rhornung67 marked this pull request as ready for review September 30, 2025 16:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants