Simplify passing data between threads with this portable C language callback library.
- Asynchronous Multicast Callbacks in C
- Table of Contents
- Preface
- Introduction
- Callbacks Background
- Using the Code
- SysData Publisher Example
- Examples
- Callback Signature Limitations
- Implementation
- Heap
- Porting
- Asynchronous Library Comparison
- References
Originally published on CodeProject at Asynchronous Multicast Callbacks in C with a perfect 5.0 feedback rating.
CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.
Callbacks are a powerful concept used to reduce the coupling between two pieces of code. On a multithreaded system, callbacks have limitations. What I've always wanted was a callback mechanism that crosses threads and handles all the low-level machinery to get my event data from one thread to another safely. A portable and easy to use framework. No more monster switch statements inside a thread loop that typecast OS message queue void*
values based upon an enumeration. Create a callback. Register a callback. And the framework automagically invokes the callback with data arguments on a user specified target thread is the goal.
On systems that use event loops, a message queue and switch statement are sometimes employed to hande incoming messages. Over time, the event loop function grows larger and larger as more message types are dispatched to the thread. Weird hacks are added in order to solve various system issues. Ultimately what ensues is a fragile, hard to read function that is constantly touched by engineers over the life of the project. This solution simplifies and standardizes the event loop in order to generalize the movement of data between threads.
The C language callback solution presented here provides the following features:
- Asynchronous callbacks – support asynchronous callbacks to and from any thread
- Thread targeting – specify the destination thread for the asynchronous callback
- Callbacks – invoke any C or C++ free/static function with a matching signature
- Type safe – user defined, type safe callback function data argument
- Multicast callbacks – store multiple callbacks within an array for sequential invocation
- Thread-safe – suitable for use on a multi-threaded system
- Compact – small, easy to maintain code base consuming minimal code space
- Portable – portable to an embedded or PC-based platform
- Any compiler – support for any C language compiler
- Any OS - easy porting to any operating system
- Elegant syntax – intuitive and easy to use
The asynchronous callback paradigm significantly eases multithreaded application development by placing the callback function pointer and callback data onto the thread of control that you specify. Exposing a callback interface for a single module or an entire subsystem is extremely easy. The framework is no more difficult to use than a standard C callback but with more features.
The idea of a function callback is very useful. In callback terms, a publisher defines the callback signature and allows anonymous registration of a callback function pointer. A subscriber creates a function implementation conforming to the publisher's callback signature and registers a callback function pointer with the publisher at runtime. The publisher code knows nothing about the subscriber code – the registration and the callback invocation is anonymous.
Now, on a multithreaded system, you need understand synchronous vs. asynchronous callback invocations. If the callback is synchronous, the callback is executed on the caller's thread of control. If you put a break point inside the callback, the stack frame will show the publisher function call and the publisher callback all synchronously invoked. There are no multithreaded issues with this scenario as everything is running on a single thread.
If the publisher code has its own thread, it may invoke the callback function on its thread of control and not the subscriber's thread. A publisher invoked callback can occur at any time completely independent of the subscriber’s thread of control. This cross-threading can cause problems for the subscriber if the callback code is not thread-safe since you now have another thread calling into subscriber code base at some unknown interval.
One solution for making a callback function thread-safe is to post a message to the subscriber's OS queue during the publisher's callback. The subscriber's thread later dequeues the message and calls an appropriate function. Since the callback implementation only posts a message, the callback, even if done asynchronously, is thread-safe. In this case, the asynchrony of a message queue provides the thread safety in lieu of software locks.
A publisher uses the CB_DECLARE
macro to expose a callback interface to potential subscribers, typically within a header file. The first argument is the callback name. The second argument is the callback function argument type. In the example below, int*
is the callback function argument.
CB_DECLARE(TestCb, int*)
The publisher uses the CB_DEFINE
macro within a source file to complete the callback definition. The first argument is the callback name. The second argument is the callback function argument type. The third argument is the size of the data pointed to by the callback function argument. The last argument is the maximum number of subscribers that can register for callback notifications.
CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)
To subscribe to callback, create a function (static
class member or global) as shown. I’ll explain why the function signature argument requires a (int*, void*)
function signature shortly.
void TestCallback1(int* val, void* userData)
{
printf(“TestCallback1 %d”, *val);
}
The subscriber registers to receive callbacks using the CB_Register()
function macro. The first argument is the callback name. The second argument is a pointer to the callback function. The third argument is a pointer to a thread dispatch function or NULL
if a synchronous callback is desired. And the last argument is a pointer to optional user data passed during callback invocation. The framework internally does nothing with user data other than pass it back to the callback function. The user data value can be anything the caller wants or NULL
.
CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);
On C/C++ mixed projects, the userData
callback argument can be used to store a this
class instance pointer. Pass a class static
member function pointer for the callback function and a this
pointer for user data to CB_Register()
. Within the subscriber callback function, typecast userData
back to a class instance pointer. This provides an easy means of accessing class instance functions and data within a static
callback function.
Use CB_Invoke()
when a publisher needs to invoke the callback for all registered subscribers. The function dispatches the callback and data argument onto the destination thread of control. In the example below, TestCallback1()
is called on DispatchCallbackThread1
.
int data = 123;
CB_Invoke(TestCb, &data);
Use CB_Unregister()
to unsubscribe from a callback.
CB_Unregister(TestCb, TestCallback1, DispatchCallbackThread1);
Asynchronous callbacks are easily used to add asynchrony to both incoming and outgoing API interfaces. The following examples show how.
SysData
is a simple module showing how to expose an outgoing asynchronous interface. The module stores system data and provides asynchronous subscriber notifications when the mode changes. The module interface is shown below.
typedef enum
{
STARTING,
NORMAL,
SERVICE,
SYS_INOP
} SystemModeType;
typedef struct
{
SystemModeType PreviousSystemMode;
SystemModeType CurrentSystemMode;
} SystemModeData;
// Declare a SysData callback interface
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)
void SD_Init(void);
void SD_Term(void);
void SD_SetSystemMode(SystemModeType systemMode);
The publisher callback interface is SystemModeChangedCb
. Calling SD_SetSystemMode()
saves the new mode into _systemMode
and notifies all registered subscribers.
void SD_SetSystemMode(SystemModeType systemMode)
{
LK_LOCK(_hLock);
// Create the callback data
SystemModeData callbackData;
callbackData.PreviousSystemMode = _systemMode;
callbackData.CurrentSystemMode = systemMode;
// Update the system mode
_systemMode = systemMode;
// Callback all registered subscribers
CB_Invoke(SystemModeChangedCb, &callbackData);
LK_UNLOCK(_hLock);
}
The subscriber creates a callback function that conforms to the publisher's callback function signature.
void SysDataCallback(const SystemModeData* data, void* userData)
{
cout << "SysDataCallback: " << data->CurrentSystemMode << endl;
}
At runtime, CB_Register()
is used to register for SysData
callbacks on DispatchCallbackThread1
.
CB_Register(SystemModeChangedCb, SysDataCallback, DispatchCallbackThread1, NULL);
When SD_SetSystemMode()
is called, any client interested in the mode changes are notified asynchronously on their desired execution thread.
SD_SetSystemMode(STARTING);
SD_SetSystemMode(NORMAL);
SysDataNoLock
is an alternate implementation that uses a private callback for setting the system mode asynchronously and without locks.
// Declare a public SysData callback interface
CB_DECLARE(SystemModeChangedNoLockCb, const SystemModeData*)
void SDNL_Init(void);
void SDNL_Term(void);
void SDNL_SetSystemMode(SystemModeType systemMode);
The initialize function registers with the private SetSystemModeCb
callback.
// Define a private callback interface
CB_DECLARE(SetSystemModeCb, SystemModeType*)
CB_DEFINE(SetSystemModeCb, SystemModeType*, sizeof(SystemModeType), 1)
void SDNL_Init(void)
{
// Register with private callback
CB_Register(SetSystemModeCb, SDNL_SetSystemModePrivate, DispatchCallbackThread1, NULL);
}
The SSNL_SetSystemMode()
function below is an example of an asynchronous incoming interface. To the caller, it looks like a normal function, but, under the hood, a private function call is invoked asynchronously. In this case, invoking SetSystemModeCb
causes SDNL_SetSystemModePrivate()
to be called on DispatchCallbackThread1
.
void SDNL_SetSystemMode(SystemModeType systemMode)
{
// Invoke the private callback. SDNL_SetSystemModePrivate() will be called
// on DispatchCallbackThread1.
CB_Invoke(SetSystemModeCb, &systemMode);
}
Since this private function is always invoked asynchronously on DispatchCallbackThread1
it doesn't require locks.
static void SDNL_SetSystemModePrivate(SystemModeType* systemMode, void* userData)
{
// Create the callback data
SystemModeData callbackData;
callbackData.PreviousSystemMode = _systemMode;
callbackData.CurrentSystemMode = *systemMode;
// Update the system mode
_systemMode = *systemMode;
// Callback all registered subscribers
CB_Invoke(SystemModeChangedNoLockCb, &callbackData);
}
This design has the following limitations imposed on all callback functions:
- Each callback handles a single user-defined argument type.
- The argument may be a
const
or non-const
pointer (e.g.const MyData*
orMyData*
). - The two callback function arguments are always:
MyData*
andvoid*
. - Each callback has a
void
return type.
For instance, if a callback is declared as:
CB_DECLARE(TestCb, const MyData*)
The callback function signature is:
void MyCallback(const MyData* data, void* userData);
The design can be extended to support more than one argument if necessary. However, the design somewhat mimics what embedded programmers do all the time, which is something like:
- Dynamically create an instance to a struct or class and populate data.
- Post a pointer to the data through an OS message as a
void*
. - Get the data from the OS message queue and typecast the
void*
back to the original type. - Delete the dynamically created data.
In this design, the entire infrastructure happens automatically without any additional effort on the programmer's part. If multiple data parameters are required, they must be packaged into a single class/struct and used as the callback data argument.
The number of lines of code for the callback framework is surprisingly low. Strip out the comments, and maybe a couple hundred lines of code that are (hopefully) easy to understand and maintain.
The implementation uses macros and token pasting to provide a unique type-safe interface for each callback. The token pasting operator (##
) is used to merge two tokens when the preprocessor expands the macro. The CB_DECLARE
macro is shown below.
#define CB_DECLARE(cbName, cbArg) \
typedef void(*cbName##CallbackFuncType)(cbArg cbData, void* cbUserData); \
BOOL cbName##_Register(cbName##CallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData); \
BOOL cbName##_IsRegistered(cbName##CallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc); \
BOOL cbName##_Unregister(cbName##CallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc); \
BOOL cbName##_Invoke(cbArg cbData); \
BOOL cbName##_InvokeArray(cbArg cbData, size_t num, size_t size);
In the SysData
example used above, the compiler preprocessor expands CB_DECLARE
to:
typedef void(*SystemModeChangedCbCallbackFuncType)(const SystemModeData* cbData, void* cbUserData);
BOOL SystemModeChangedCb_Register(SystemModeChangedCbCallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData);
BOOL SystemModeChangedCb_IsRegistered(SystemModeChangedCbCallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc);
BOOL SystemModeChangedCb_Unregister(SystemModeChangedCbCallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc);
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData);
BOOL SystemModeChangedCb_InvokeArray(const SystemModeData* cbData, size_t num, size_t size);
Notice every cbName##
location is replaced by the macro name argument, in this case, being SystemModeChangedCb
from the declaration below.
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)
Similarly, the CB_DEFINE
macro expands to create the callback function implementations. Notice the macro provides a thin, type-safe wrapper around private functions such as _CB_AddCallback()
and _CB_Dispatch()
. If attempting to register the wrong function signature, the compiler generates an error or warning. The macros automate the monotonous, boilerplate code that you’d normally write by hand.
The registered callbacks are stored in a static
array of CB_Info
instances. Calling CB_Invoke(SystemModeChangedCb, &callbackData)
executes SystemModeChangedCb_Invoke()
. Then _CB_Dispatch()
iterates over the CB_Info
array and dispatches one CB_CallbackMsg
message to each target thread. The message data is dynamically created to travel through an OS message queue.
// Macro generated unique invoke function
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData)
{
return _CB_Dispatch(&SystemModeChangedCbMulticast[0], 2, cbData, sizeof(SystemModeData));
}
BOOL _CB_Dispatch(CB_Info* cbInfo, size_t cbInfoLen, const void* cbData,
size_t cbDataSize)
{
BOOL invoked = FALSE;
LK_LOCK(_hLock);
// For each CB_Info instance within the array
for (size_t idx = 0; idx<cbInfoLen; idx++)
{
// Is a client registered?
if (cbInfo[idx].cbFunc)
{
// Dispatch callback onto the OS task
if (CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize))
{
invoked = TRUE;
}
}
}
LK_UNLOCK(_hLock);
return invoked;
}
The target OS task event loop dequeues a CB_CallbackMsg*
and calls CB_TargetInvoke()
. The dynamic data created is freed before the function exits.
void CB_TargetInvoke(const CB_CallbackMsg* cbMsg)
{
ASSERT_TRUE(cbMsg);
ASSERT_TRUE(cbMsg->cbFunc);
// Invoke callback function with the callback data
cbMsg->cbFunc(cbMsg->cbData, cbMsg->cbUserData);
// Free data sent through OS queue
XFREE((void*)cbMsg->cbData);
XFREE((void*)cbMsg);
}
Asynchronous callbacks impose certain limitations because everything the callback destination thread must be created on the heap, packaged into a CB_CallbackMsg
structure, and placed into an OS message queue.
The insertion into an OS queue is platform specific. The CB_DispatchCallbackFuncType
function pointer typedef
provides the OS queue interface to be implemented for each thread event loop on the target platform. See the Porting section below for a more complete discussion.
typedef BOOL (*CB_DispatchCallbackFuncType)(const CB_CallbackMsg* cbMsg);
Once the message is placed into the message queue, platform specific code unpacks the message and calls the CB_TargetInvoke()
function and destroys dynamically allocated data. For this example, a simple WorkerThreadStd
class provides the thread event loop leveraging the C++ thread support library. While this example uses C++ threads, the callback modules are written in plain C. Abstracting the OS details from the callback implementation makes this possible.
void WorkerThread::Process()
{
while (1)
{
ThreadMsg* msg = 0;
{
// Wait for a message to be added to the queue
std::unique_lock<std::mutex> lk(m_mutex);
while (m_queue.empty())
m_cv.wait(lk);
if (m_queue.empty())
continue;
msg = m_queue.front();
m_queue.pop();
}
switch (msg->GetId())
{
case MSG_DISPATCH_DELEGATE:
{
ASSERT_TRUE(msg->GetData() != NULL);
// Convert the ThreadMsg void* data back to a CB_CallbackMsg*
const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());
// Invoke the callback on the target thread
CB_TargetInvoke(callbackMsg);
// Delete dynamic data passed through message queue
delete msg;
break;
}
}
}
}
Notice the thread loop is unlike most systems that have a huge switch statement handling various incoming data messages, type casting void*
data, then calling a specific function. The framework supports all callbacks with a single WM_DISPATCH_DELEGATE
message. Once setup, the same small thread loop handles every callback. New publishers and subscribers come and go as the system is designed, but the code in-between doesn't change.
This is a huge benefit as on many systems getting data between threads takes a lot of manual steps. You constantly have to mess with each thread loop, create during sending, destroy data when receiving, and call various OS services and typecasts. Here you do none of that. All the stuff in-between is neatly handled for users.
The dynamic data is required to send data structures through the message queue. Remember, the data pointed to by your callback argument is bitwise copied during a callback.
On some systems, it is undesirable to use the heap. For those situations, I use a fixed block memory allocator. The x_allocator
implementation solves the dynamic storage issues and is much faster than the global heap. To use, just define USE_CALLBACK_ALLOCATOR
within callback.c. See the References section for more information on x_allocator
.
The code is an easy port to any platform. There are only two OS services required: threads and a software lock. The code is separated into four directories.
- Callback – core library implementation files
- Port – Windows-specific files (thread/lock)
- Examples – sample code showing usage
- Allocator – optional fixed-block memory allocator
Porting to another platform requires implementing a dispatch function that accepts a const CB_CallbackMsg*
for each thread. The functions below show an example.
// C language interface to a callback dispatch function
extern "C" BOOL DispatchCallbackThread1(const CB_CallbackMsg* cbMsg)
{
workerThread1.DispatchCallback(cbMsg);
return TRUE;
}
void WorkerThread::DispatchCallback(const CB_CallbackMsg* msg)
{
ASSERT_TRUE(m_thread);
A
// Create a new ThreadMsg
ThreadMsg* threadMsg = new ThreadMsg(MSG_DISPATCH_DELEGATE, msg);
// Add dispatch delegate msg to queue and notify worker thread
std::unique_lock<std::mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
The thread event loop gets the message and calls the CB_TargetInvoke()
function. The data sent through the queue is deleted once complete.
case MSG_DISPATCH_DELEGATE:
{
ASSERT_TRUE(msg->GetData() != NULL);
// Convert the ThreadMsg void* data back to a CB_CallbackMsg*
const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());
// Invoke the callback on the target thread
CB_TargetInvoke(callbackMsg);
// Delete dynamic data passed through message queue
delete msg;
break;
}
Software locks are handled by the LockGuard
module. This file can be updated with locks of your choice, or you can use a different mechanism. Locks are only used in a few places. Define USE_LOCKS
within callback.c to use LockGuard
module locks.
Asynchronous function invocation allows for easy movement of data between threads. The table below summarizes the various asynchronous function invocation implementations available in C and C++.
Repository | Language | Key Delegate Features | Notes |
---|---|---|---|
DelegateMQ | C++17 | * Function-like template syntax * Any delegate target function type (member, static, free, lambda) * N target function arguments * N delegate subscribers * Variadic templates * Template metaprogramming |
* Modern C++ * Invoke synchronously, asynchronously or remotely * Extensive unit tests |
AsyncCallback | C++ | * Traditional template syntax * Delegate target function type (static, free) * 1 target function argument * N delegate subscribers |
* Low lines of source code * Most compact C++ implementation * Any C++ compiler |
C_AsyncCallback | C | * Macros provide type-safety * Delegate target function type (static, free) * 1 target function argument * Fixed delegate subscribers (set at compile time) * Optional fixed block allocator |
* Low lines of source code * Very compact implementation * Any C compiler |
- A Fixed Block Memory Allocator in C - by David Lafreniere