endurodave/C_AsyncCallback

License MIT conan Ubuntu conan Ubuntu conan Windows

Asynchronous Multicast Callbacks in C

Simplify passing data between threads with this portable C language callback library.

Preface

Originally published on CodeProject at Asynchronous Multicast Callbacks in C with a perfect 5.0 feedback rating.

CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.

Introduction

Callbacks are a powerful concept used to reduce the coupling between two pieces of code. On a multithreaded system, callbacks have limitations. What I've always wanted was a callback mechanism that crosses threads and handles all the low-level machinery to get my event data from one thread to another safely. A portable and easy to use framework. No more monster switch statements inside a thread loop that typecast OS message queue void* values based upon an enumeration. Create a callback. Register a callback. The goal is a framework that automagically invokes the callback with data arguments on a user-specified target thread.

On systems that use event loops, a message queue and switch statement are sometimes employed to handle incoming messages. Over time, the event loop function grows larger and larger as more message types are dispatched to the thread. Weird hacks are added in order to solve various system issues. Ultimately what ensues is a fragile, hard to read function that is constantly touched by engineers over the life of the project. This solution simplifies and standardizes the event loop in order to generalize the movement of data between threads.

The C language callback solution presented here provides the following features:

  • Asynchronous callbacks – support asynchronous callbacks to and from any thread
  • Thread targeting – specify the destination thread for the asynchronous callback
  • Callbacks – invoke any C or C++ free/static function with a matching signature
  • Type safe – user defined, type safe callback function data argument
  • Multicast callbacks – store multiple callbacks within an array for sequential invocation
  • Thread-safe – suitable for use on a multi-threaded system
  • Compact – small, easy to maintain code base consuming minimal code space
  • Portable – portable to an embedded or PC-based platform
  • Any compiler – support for any C language compiler
  • Any OS - easy porting to any operating system
  • Elegant syntax – intuitive and easy to use

The asynchronous callback paradigm significantly eases multithreaded application development by placing the callback function pointer and callback data onto the thread of control that you specify. Exposing a callback interface for a single module or an entire subsystem is extremely easy. The framework is no more difficult to use than a standard C callback but with more features.

Callbacks Background

The idea of a function callback is very useful. In callback terms, a publisher defines the callback signature and allows anonymous registration of a callback function pointer. A subscriber creates a function implementation conforming to the publisher's callback signature and registers a callback function pointer with the publisher at runtime. The publisher code knows nothing about the subscriber code – the registration and the callback invocation is anonymous.
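As a point of reference, a plain synchronous C callback might look like the sketch below. The names (TempChangedFunc, Temp_Register, Temp_Set, OnTempChanged) are hypothetical and are not part of this library; the sketch only illustrates the publisher and subscriber roles described above, with a single subscriber slot.

```c
#include <stddef.h>

/* Publisher side: defines the callback signature. */
typedef void (*TempChangedFunc)(int newTemp);

/* One anonymous subscriber slot; the publisher knows only the pointer. */
static TempChangedFunc s_subscriber = NULL;

void Temp_Register(TempChangedFunc func)
{
    s_subscriber = func;
}

void Temp_Set(int newTemp)
{
    /* Invoked synchronously, on the caller's thread of control. */
    if (s_subscriber)
        s_subscriber(newTemp);
}

/* Subscriber side: an implementation conforming to the signature. */
static int s_lastTemp = 0;

void OnTempChanged(int newTemp)
{
    s_lastTemp = newTemp;
}

int LastTemp(void)
{
    return s_lastTemp;
}
```

Calling Temp_Register(OnTempChanged) followed by Temp_Set(42) runs OnTempChanged() before Temp_Set() returns, which is the synchronous case discussed next.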

Now, on a multithreaded system, you need to understand synchronous vs. asynchronous callback invocations. If the callback is synchronous, the callback is executed on the caller's thread of control. If you put a breakpoint inside the callback, the stack frame will show the publisher function call and the publisher callback all synchronously invoked. There are no multithreaded issues with this scenario as everything is running on a single thread.

If the publisher code has its own thread, it may invoke the callback function on its thread of control and not the subscriber's thread. A publisher invoked callback can occur at any time completely independent of the subscriber’s thread of control. This cross-threading can cause problems for the subscriber if the callback code is not thread-safe, since you now have another thread calling into the subscriber code base at some unknown interval.

One solution for making a callback function thread-safe is to post a message to the subscriber's OS queue during the publisher's callback. The subscriber's thread later dequeues the message and calls an appropriate function. Since the callback implementation only posts a message, the callback, even if done asynchronously, is thread-safe. In this case, the asynchrony of a message queue provides the thread safety in lieu of software locks.
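The message-posting approach can be sketched roughly as follows. This is a minimal illustration, not code from this library: MsgQueue, Queue_Post, Queue_Get, Subscriber_OnEvent and Subscriber_ProcessPending are hypothetical names, and a small mutex-guarded ring buffer (POSIX pthread mutex assumed) stands in for a real OS message queue.

```c
#include <pthread.h>
#include <stddef.h>

#define QUEUE_LEN 8

/* Hypothetical stand-in for an OS message queue: a ring buffer
   guarded by a mutex. */
typedef struct
{
    int items[QUEUE_LEN];
    size_t head, tail, count;
    pthread_mutex_t lock;
} MsgQueue;

static MsgQueue s_queue = { {0}, 0, 0, 0, PTHREAD_MUTEX_INITIALIZER };

/* Returns 1 on success, 0 if the queue is full. */
static int Queue_Post(MsgQueue* q, int value)
{
    int ok = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count < QUEUE_LEN)
    {
        q->items[q->tail] = value;
        q->tail = (q->tail + 1) % QUEUE_LEN;
        q->count++;
        ok = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Returns 1 and stores the oldest item in *value, or 0 if empty. */
static int Queue_Get(MsgQueue* q, int* value)
{
    int ok = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0)
    {
        *value = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_LEN;
        q->count--;
        ok = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Callback executed on the publisher's thread: it only posts a message. */
void Subscriber_OnEvent(int value)
{
    (void)Queue_Post(&s_queue, value);
}

/* Called from the subscriber's own thread loop; the real work happens
   here, so subscriber state is only touched on one thread. */
int Subscriber_ProcessPending(void)
{
    int value, sum = 0;
    while (Queue_Get(&s_queue, &value))
        sum += value;
    return sum;
}
```

The only shared state is the queue itself, so the subscriber's own data needs no additional locks in this arrangement.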

Using the Code

A publisher uses the CB_DECLARE macro to expose a callback interface to potential subscribers, typically within a header file. The first argument is the callback name. The second argument is the callback function argument type. In the example below, int* is the callback function argument.

CB_DECLARE(TestCb, int*)

The publisher uses the CB_DEFINE macro within a source file to complete the callback definition. The first argument is the callback name. The second argument is the callback function argument type. The third argument is the size of the data pointed to by the callback function argument. The last argument is the maximum number of subscribers that can register for callback notifications. 

CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)

To subscribe to a callback, create a function (static class member or global) as shown. I’ll explain shortly why the callback requires an (int*, void*) function signature.

void TestCallback1(int* val, void* userData)
{
    printf("TestCallback1 %d", *val);
}

The subscriber registers to receive callbacks using the CB_Register() function macro. The first argument is the callback name. The second argument is a pointer to the callback function. The third argument is a pointer to a thread dispatch function or NULL if a synchronous callback is desired. And the last argument is a pointer to optional user data passed during callback invocation. The framework internally does nothing with user data other than pass it back to the callback function. The user data value can be anything the caller wants or NULL.

CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);

On C/C++ mixed projects, the userData callback argument can be used to store a this class instance pointer. Pass a class static member function pointer for the callback function and a this pointer for user data to CB_Register(). Within the subscriber callback function, typecast userData back to a class instance pointer. This provides an easy means of accessing class instance functions and data within a static callback function.  
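The same idea works in plain C by passing a pointer to a context struct as user data. The sketch below is illustrative only: Accumulator, Accumulator_OnValue and InvokeWithUserData are hypothetical names, and InvokeWithUserData merely stands in for the framework handing userData back to the callback untouched.

```c
#include <stddef.h>

/* Same two-argument shape used for an int* callback. */
typedef void (*IntCallback)(int* val, void* userData);

/* Hypothetical subscriber state. */
typedef struct
{
    int total;
    int calls;
} Accumulator;

/* The callback recovers its context by casting userData back. */
void Accumulator_OnValue(int* val, void* userData)
{
    Accumulator* self = (Accumulator*)userData;
    self->total += *val;
    self->calls++;
}

/* Stand-in for the framework: passes userData through unchanged. */
void InvokeWithUserData(IntCallback func, int* val, void* userData)
{
    func(val, userData);
}
```

With the real library, the context pointer would be supplied as the last argument to CB_Register() and must stay valid for as long as the callback remains registered.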

Use CB_Invoke() when a publisher needs to invoke the callback for all registered subscribers. The function dispatches the callback and data argument onto the destination thread of control. In the example below, TestCallback1() is called on DispatchCallbackThread1.

int data = 123;
CB_Invoke(TestCb, &data);

Use CB_Unregister() to unsubscribe from a callback.

CB_Unregister(TestCb, TestCallback1, DispatchCallbackThread1);

Asynchronous callbacks are easily used to add asynchrony to both incoming and outgoing API interfaces. The following examples show how.

SysData Publisher Example

SysData is a simple module showing how to expose an outgoing asynchronous interface. The module stores system data and provides asynchronous subscriber notifications when the mode changes. The module interface is shown below.

typedef enum
{
    STARTING,
    NORMAL,
    SERVICE,
    SYS_INOP
} SystemModeType;

typedef struct
{
    SystemModeType PreviousSystemMode;
    SystemModeType CurrentSystemMode;
} SystemModeData;

// Declare a SysData callback interface
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)

void SD_Init(void);
void SD_Term(void);
void SD_SetSystemMode(SystemModeType systemMode);

The publisher callback interface is SystemModeChangedCb. Calling SD_SetSystemMode() saves the new mode into _systemMode and notifies all registered subscribers.

void SD_SetSystemMode(SystemModeType systemMode)
{
    LK_LOCK(_hLock);

    // Create the callback data
    SystemModeData callbackData;
    callbackData.PreviousSystemMode = _systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    _systemMode = systemMode;

    // Callback all registered subscribers
    CB_Invoke(SystemModeChangedCb, &callbackData);

    LK_UNLOCK(_hLock);
}

Examples

SysData Subscriber Example

The subscriber creates a callback function that conforms to the publisher's callback function signature. 

void SysDataCallback(const SystemModeData* data, void* userData)
{
    cout << "SysDataCallback: " << data->CurrentSystemMode << endl;
}

At runtime, CB_Register() is used to register for SysData callbacks on DispatchCallbackThread1.

CB_Register(SystemModeChangedCb, SysDataCallback, DispatchCallbackThread1, NULL);

When SD_SetSystemMode() is called, any clients interested in mode changes are notified asynchronously on their desired execution thread.

SD_SetSystemMode(STARTING);
SD_SetSystemMode(NORMAL);

SysDataNoLock Publisher Example

SysDataNoLock is an alternate implementation that uses a private callback for setting the system mode asynchronously and without locks.

// Declare a public SysData callback interface
CB_DECLARE(SystemModeChangedNoLockCb, const SystemModeData*)

void SDNL_Init(void);
void SDNL_Term(void);
void SDNL_SetSystemMode(SystemModeType systemMode);

The initialize function registers with the private SetSystemModeCb callback. 

// Define a private callback interface
CB_DECLARE(SetSystemModeCb, SystemModeType*)
CB_DEFINE(SetSystemModeCb, SystemModeType*, sizeof(SystemModeType), 1)

void SDNL_Init(void)
{
    // Register with private callback
    CB_Register(SetSystemModeCb, SDNL_SetSystemModePrivate, DispatchCallbackThread1, NULL);
}

The SDNL_SetSystemMode() function below is an example of an asynchronous incoming interface. To the caller, it looks like a normal function, but, under the hood, a private function call is invoked asynchronously. In this case, invoking SetSystemModeCb causes SDNL_SetSystemModePrivate() to be called on DispatchCallbackThread1.

void SDNL_SetSystemMode(SystemModeType systemMode)
{
    // Invoke the private callback. SDNL_SetSystemModePrivate() will be called
    // on DispatchCallbackThread1.
    CB_Invoke(SetSystemModeCb, &systemMode);
}

Since this private function is always invoked asynchronously on DispatchCallbackThread1 it doesn't require locks.

static void SDNL_SetSystemModePrivate(SystemModeType* systemMode, void* userData)
{
    // Create the callback data
    SystemModeData callbackData;
    callbackData.PreviousSystemMode = _systemMode;
    callbackData.CurrentSystemMode = *systemMode;

    // Update the system mode
    _systemMode = *systemMode;

    // Callback all registered subscribers
    CB_Invoke(SystemModeChangedNoLockCb, &callbackData);
}

Callback Signature Limitations

This design has the following limitations imposed on all callback functions:

  1. Each callback handles a single user-defined argument type.
  2. The argument may be a const or non-const pointer (e.g. const MyData* or MyData*).
  3. The two callback function arguments are always: MyData* and void*.
  4. Each callback has a void return type.

For instance, if a callback is declared as:

CB_DECLARE(TestCb, const MyData*)

The callback function signature is:

void MyCallback(const MyData* data, void* userData);

The design can be extended to support more than one argument if necessary. However, the design somewhat mimics what embedded programmers do all the time, which is something like:

  1. Dynamically create an instance to a struct or class and populate data.
  2. Post a pointer to the data through an OS message as a void*.
  3. Get the data from the OS message queue and typecast the void* back to the original type.
  4. Delete the dynamically created data.

In this design, the entire infrastructure happens automatically without any additional effort on the programmer's part. If multiple data parameters are required, they must be packaged into a single class/struct and used as the callback data argument.
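For comparison, the four manual steps listed above might look like this when written by hand. It is a sketch under stated assumptions: SensorData, OsQueue_Post, OsQueue_Get, Sender_Send and Receiver_Receive are hypothetical names, and a one-slot variable stands in for a real OS message queue.

```c
#include <stdlib.h>

typedef struct
{
    int sensorId;
    int reading;
} SensorData;

/* Hypothetical one-slot "queue" standing in for an OS message queue. */
static void* s_slot = NULL;

static void OsQueue_Post(void* msg)
{
    s_slot = msg;
}

static void* OsQueue_Get(void)
{
    void* msg = s_slot;
    s_slot = NULL;
    return msg;
}

/* Sender: steps 1 and 2. Returns 1 on success, 0 if allocation fails. */
int Sender_Send(int sensorId, int reading)
{
    SensorData* data = (SensorData*)malloc(sizeof(SensorData));
    if (!data)
        return 0;
    data->sensorId = sensorId;
    data->reading = reading;
    OsQueue_Post(data);      /* the pointer travels as a void* */
    return 1;
}

/* Receiver: steps 3 and 4. Returns the reading, or -1 if nothing queued. */
int Receiver_Receive(void)
{
    int reading;
    SensorData* data = (SensorData*)OsQueue_Get();  /* cast back */
    if (!data)
        return -1;
    reading = data->reading;
    free(data);              /* receiver releases the data */
    return reading;
}
```

Every message type handled this way needs its own allocate, cast and free code, which is the repetition the callback framework is meant to remove.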

Implementation

The number of lines of code for the callback framework is surprisingly low. Strip out the comments, and what remains is maybe a couple hundred lines of code that are (hopefully) easy to understand and maintain.

The implementation uses macros and token pasting to provide a unique type-safe interface for each callback. The token pasting operator (##) is used to merge two tokens when the preprocessor expands the macro. The CB_DECLARE macro is shown below.

#define CB_DECLARE(cbName, cbArg) \
    typedef void(*cbName##CallbackFuncType)(cbArg cbData, void* cbUserData); \
    BOOL cbName##_Register(cbName##CallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData); \
    BOOL cbName##_IsRegistered(cbName##CallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc); \
    BOOL cbName##_Unregister(cbName##CallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc); \
    BOOL cbName##_Invoke(cbArg cbData); \
    BOOL cbName##_InvokeArray(cbArg cbData, size_t num, size_t size);

In the SysData example used above, the compiler preprocessor expands CB_DECLARE to:

typedef void(*SystemModeChangedCbCallbackFuncType)(const SystemModeData* cbData, void* cbUserData); 

BOOL SystemModeChangedCb_Register(SystemModeChangedCbCallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData);
 
BOOL SystemModeChangedCb_IsRegistered(SystemModeChangedCbCallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc); 

BOOL SystemModeChangedCb_Unregister(SystemModeChangedCbCallbackFuncType cbFunc, CB_DispatchCallbackFuncType cbDispatchFunc); 

BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData); 

BOOL SystemModeChangedCb_InvokeArray(const SystemModeData* cbData, size_t num, size_t size);

Notice every cbName## location is replaced by the macro name argument, in this case, being SystemModeChangedCb from the declaration below.

CB_DECLARE(SystemModeChangedCb, const SystemModeData*)

Similarly, the CB_DEFINE macro expands to create the callback function implementations. Notice the macro provides a thin, type-safe wrapper around private functions such as _CB_AddCallback() and _CB_Dispatch(). If attempting to register the wrong function signature, the compiler generates an error or warning. The macros automate the monotonous, boilerplate code that you’d normally write by hand.
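To make the token-pasting idea concrete, here is a much-reduced sketch of a macro that generates type-safe wrappers around an untyped core. It is not the library's CB_DEFINE: MINI_CB_DEFINE, Slot, AnyFunc and Core_Register are hypothetical names, only one subscriber is stored, and the call is always synchronous.

```c
#include <stddef.h>

/* Untyped storage shared by every generated wrapper. */
typedef void (*AnyFunc)(void);

typedef struct
{
    AnyFunc func;
} Slot;

/* Returns 1 if stored, 0 if the slot is already taken. */
static int Core_Register(Slot* slot, AnyFunc func)
{
    if (slot->func)
        return 0;
    slot->func = func;
    return 1;
}

/* Generates a typed function pointer typedef plus typed Register and
   Invoke wrappers whose names are built with the ## operator. */
#define MINI_CB_DEFINE(name, argType) \
    typedef void (*name##FuncType)(argType data); \
    static Slot name##Slot = { NULL }; \
    int name##_Register(name##FuncType func) \
    { \
        return Core_Register(&name##Slot, (AnyFunc)func); \
    } \
    int name##_Invoke(argType data) \
    { \
        if (!name##Slot.func) \
            return 0; \
        /* Cast back to the original type before calling. */ \
        ((name##FuncType)name##Slot.func)(data); \
        return 1; \
    }

/* Expands to SpeedCbFuncType, SpeedCb_Register() and SpeedCb_Invoke(). */
MINI_CB_DEFINE(SpeedCb, const int*)

static int s_lastSpeed = 0;

void OnSpeed(const int* speed)
{
    s_lastSpeed = *speed;
}
```

Because SpeedCb_Register() takes a SpeedCbFuncType, passing a function with a different argument type draws a compiler diagnostic, even though the shared core only ever sees a generic pointer.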

The registered callbacks are stored in a static array of CB_Info instances. Calling CB_Invoke(SystemModeChangedCb, &callbackData) executes SystemModeChangedCb_Invoke(). Then _CB_Dispatch() iterates over the CB_Info array and dispatches one CB_CallbackMsg message to each target thread. The message data is dynamically created to travel through an OS message queue.

// Macro generated unique invoke function
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData) 
{ 
    return _CB_Dispatch(&SystemModeChangedCbMulticast[0], 2, cbData, sizeof(SystemModeData)); 
}

BOOL _CB_Dispatch(CB_Info* cbInfo, size_t cbInfoLen, const void* cbData, 
    size_t cbDataSize)
{
    BOOL invoked = FALSE;

    LK_LOCK(_hLock);

    // For each CB_Info instance within the array
    for (size_t idx = 0; idx<cbInfoLen; idx++)
    {
        // Is a client registered?
        if (cbInfo[idx].cbFunc)
        {
            // Dispatch callback onto the OS task
            if (CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize))
            {
                invoked = TRUE;
            }
        }
    }

    LK_UNLOCK(_hLock);
    return invoked;
}

The target OS task event loop dequeues a CB_CallbackMsg* and calls CB_TargetInvoke(). The dynamic data created is freed before the function exits. 

void CB_TargetInvoke(const CB_CallbackMsg* cbMsg)
{
    ASSERT_TRUE(cbMsg);
    ASSERT_TRUE(cbMsg->cbFunc);

    // Invoke callback function with the callback data
    cbMsg->cbFunc(cbMsg->cbData, cbMsg->cbUserData);

    // Free data sent through OS queue
    XFREE((void*)cbMsg->cbData);
    XFREE((void*)cbMsg);
} 

Asynchronous callbacks impose certain limitations because everything the callback destination thread needs must be created on the heap, packaged into a CB_CallbackMsg structure, and placed into an OS message queue.

The insertion into an OS queue is platform specific. The CB_DispatchCallbackFuncType function pointer typedef provides the OS queue interface to be implemented for each thread event loop on the target platform. See the Porting section below for a more complete discussion.

typedef BOOL (*CB_DispatchCallbackFuncType)(const CB_CallbackMsg* cbMsg);

Once the message is placed into the message queue, platform specific code unpacks the message and calls the CB_TargetInvoke() function and destroys dynamically allocated data. For this example, a simple WorkerThreadStd class provides the thread event loop leveraging the C++ thread support library. While this example uses C++ threads, the callback modules are written in plain C. Abstracting the OS details from the callback implementation makes this possible. 

void WorkerThread::Process()
{
    while (1)
    {
        ThreadMsg* msg = 0;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->GetId())
        {
            case MSG_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg->GetData() != NULL);

                // Convert the ThreadMsg void* data back to a CB_CallbackMsg* 
                const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());

                // Invoke the callback on the target thread
                CB_TargetInvoke(callbackMsg);

                // Delete dynamic data passed through message queue
                delete msg;
                break;
            }
        }
    }
}

Notice the thread loop is unlike most systems that have a huge switch statement handling various incoming data messages, type casting void* data, then calling a specific function. The framework supports all callbacks with a single MSG_DISPATCH_DELEGATE message. Once set up, the same small thread loop handles every callback. New publishers and subscribers come and go as the system is designed, but the code in-between doesn't change.

This is a huge benefit as on many systems getting data between threads takes a lot of manual steps. You constantly have to mess with each thread loop, create data when sending, destroy data when receiving, and call various OS services and typecasts. Here you do none of that. All the stuff in-between is neatly handled for users.

Heap

The dynamic data is required to send data structures through the message queue. Remember, the data pointed to by your callback argument is bitwise copied during a callback. 
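One practical consequence of a bitwise copy is that pointer members are copied as addresses, not as the data they point to. The sketch below illustrates this with a hypothetical helper, CopyCallbackData(), built on malloc() and memcpy(); it is an assumption about the general shape of such a copy, not the library's actual code.

```c
#include <stdlib.h>
#include <string.h>

typedef struct
{
    int id;
    const char* name;   /* pointer member: only the address is copied */
} Record;

/* Hypothetical helper: duplicates size bytes on the heap so the data
   can outlive the caller's stack frame. Returns NULL on failure. */
void* CopyCallbackData(const void* data, size_t size)
{
    void* copy = malloc(size);
    if (copy)
        memcpy(copy, data, size);
    return copy;
}
```

After copying a Record, the copy has its own id but shares the same name pointer as the original, so anything a callback argument points to must remain valid until the target thread has run the callback. Plain value-only structs avoid the issue.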

On some systems, it is undesirable to use the heap. For those situations, I use a fixed block memory allocator. The x_allocator implementation solves the dynamic storage issues and is much faster than the global heap. To use, just define USE_CALLBACK_ALLOCATOR within callback.c. See the References section for more information on x_allocator.
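For readers unfamiliar with the technique, a fixed-block allocator can be sketched in a few lines. This is a generic illustration and not the x_allocator implementation: Pool_Alloc, Pool_Free, BLOCK_SIZE and BLOCK_COUNT are hypothetical names, there is a single block size, and no locking is included.

```c
#include <stddef.h>

#define BLOCK_SIZE  32
#define BLOCK_COUNT 4

/* Each block is either user storage or a link in the free list. */
typedef union Block
{
    union Block* next;
    unsigned char storage[BLOCK_SIZE];
} Block;

static Block s_pool[BLOCK_COUNT];   /* static storage, no heap used */
static Block* s_freeList = NULL;
static int s_initialized = 0;

/* Threads every block onto the free list. */
static void Pool_Init(void)
{
    size_t i;
    for (i = 0; i < BLOCK_COUNT; i++)
    {
        s_pool[i].next = s_freeList;
        s_freeList = &s_pool[i];
    }
    s_initialized = 1;
}

/* Returns a BLOCK_SIZE-byte block, or NULL if size is too large or
   the pool is exhausted. */
void* Pool_Alloc(size_t size)
{
    Block* block;
    if (!s_initialized)
        Pool_Init();
    if (size > BLOCK_SIZE || !s_freeList)
        return NULL;
    block = s_freeList;
    s_freeList = block->next;
    return block;
}

/* Returns a block obtained from Pool_Alloc() to the free list. */
void Pool_Free(void* ptr)
{
    Block* block = (Block*)ptr;
    if (!block)
        return;
    block->next = s_freeList;
    s_freeList = block;
}
```

Allocation and release are constant-time pointer swaps and the pool cannot fragment, which is the usual motivation for this approach on embedded targets. Blocks here are aligned for pointers only, so types with stricter alignment would need a different block definition.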

Porting

The code is an easy port to any platform. There are only two OS services required: threads and a software lock. The code is separated into four directories.

  1. Callback – core library implementation files
  2. Port – Windows-specific files (thread/lock)
  3. Examples – sample code showing usage
  4. Allocator – optional fixed-block memory allocator

Porting to another platform requires implementing a dispatch function that accepts a const CB_CallbackMsg* for each thread. The functions below show an example.

// C language interface to a callback dispatch function
extern "C" BOOL DispatchCallbackThread1(const CB_CallbackMsg* cbMsg)
{
    workerThread1.DispatchCallback(cbMsg);
    return TRUE;
}

void WorkerThread::DispatchCallback(const CB_CallbackMsg* msg)
{
    ASSERT_TRUE(m_thread);
    // Create a new ThreadMsg
    ThreadMsg* threadMsg = new ThreadMsg(MSG_DISPATCH_DELEGATE, msg);

    // Add dispatch delegate msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

The thread event loop gets the message and calls the CB_TargetInvoke() function. The data sent through the queue is deleted once complete. 

case MSG_DISPATCH_DELEGATE:
{
    ASSERT_TRUE(msg->GetData() != NULL);

    // Convert the ThreadMsg void* data back to a CB_CallbackMsg* 
    const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());

    // Invoke the callback on the target thread
    CB_TargetInvoke(callbackMsg);

    // Delete dynamic data passed through message queue
    delete msg;
    break;
}

Software locks are handled by the LockGuard module. This file can be updated with locks of your choice, or you can use a different mechanism. Locks are only used in a few places. Define USE_LOCKS within callback.c to use LockGuard module locks. 

Asynchronous Library Comparison

Asynchronous function invocation allows for easy movement of data between threads. The table below summarizes the various asynchronous function invocation implementations available in C and C++.

| Repository | Language | Key Delegate Features | Notes |
|---|---|---|---|
| DelegateMQ | C++17 | Function-like template syntax; Any delegate target function type (member, static, free, lambda); N target function arguments; N delegate subscribers; Variadic templates; Template metaprogramming | Modern C++; Invoke synchronously, asynchronously or remotely; Extensive unit tests |
| AsyncCallback | C++ | Traditional template syntax; Delegate target function type (static, free); 1 target function argument; N delegate subscribers | Low lines of source code; Most compact C++ implementation; Any C++ compiler |
| C_AsyncCallback | C | Macros provide type-safety; Delegate target function type (static, free); 1 target function argument; Fixed delegate subscribers (set at compile time); Optional fixed block allocator | Low lines of source code; Very compact implementation; Any C compiler |

References
