Skip to content

Commit a6b8f1f

Browse files
committed
Improve thread startup and shutdown handling.
1 parent a05f6f5 commit a6b8f1f

File tree

4 files changed

+76
-20
lines changed

4 files changed

+76
-20
lines changed

README.md

+9-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Create a worker thread with an event loop, message queue and a timer using the C
1919
- [Event Loop (Win32)](#event-loop-win32)
2020
- [Timer](#timer)
2121
- [Usage](#usage)
22-
- [Star History](#star-history)
22+
- [Related Repositories](#related-repositories)
2323
- [Conclusion](#conclusion)
2424

2525
# Preface
@@ -314,11 +314,16 @@ int main(void)
314314
    return 0;
315315
}</pre>
316316

317-
# Star History
317+
# Related Repositories
318318

319-
Find this repository useful? Consider giving it a star!
319+
Real-world projects using `WorkerThread`.
320320

321-
[![Star History Chart](https://api.star-history.com/svg?repos=endurodave/StdWorkerThread&type=Date)](https://star-history.com/#endurodave/StdWorkerThread&Date)
321+
* <a href="https://github.com/endurodave/DelegateMQ">Asynchronous Delegates in C++</a> - Invoke any C++ callable function synchronously, asynchronously, or on a remote endpoint.
322+
* <a href="https://github.com/endurodave/StateMachineWithThreads">C++ State Machine with Threads</a> - A framework combining C++ state machines and multicast asynchronous callbacks.
323+
* <a href="https://github.com/endurodave/StateMachineWithModernDelegates">C++ State Machine with Asynchronous Delegates</a> - A framework combining C++ state machines and asynchronous delegate callbacks.
324+
* <a href="https://github.com/endurodave/AsyncStateMachine">Asynchronous State Machine Design in C++</a> - An asynchronous C++ state machine implemented using an asynchronous delegate library.
325+
* <a href="https://github.com/endurodave/IntegrationTestFramework">Integration Test Framework using Google Test and Delegates</a> - A multi-threaded C++ software integration test framework using Google Test and DelegateMQ libraries.
326+
* <a href="https://github.com/endurodave/Async-SQLite">Asynchronous SQLite API using C++ Delegates</a> - An asynchronous SQLite wrapper implemented using an asynchronous delegate library.
322327

323328
# Conclusion
324329

WorkerThread.cpp

+51-14
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ struct ThreadMsg
2222
//----------------------------------------------------------------------------
2323
// WorkerThread
2424
//----------------------------------------------------------------------------
25-
WorkerThread::WorkerThread(const std::string& threadName) : m_thread(nullptr), m_timerExit(false), THREAD_NAME(threadName)
25+
WorkerThread::WorkerThread(const std::string& threadName) :
26+
m_thread(nullptr),
27+
m_exit(false),
28+
m_timerExit(false),
29+
THREAD_NAME(threadName)
2630
{
2731
}
2832

@@ -41,20 +45,15 @@ bool WorkerThread::CreateThread()
4145
{
4246
if (!m_thread)
4347
{
48+
m_threadStartFuture = m_threadStartPromise.get_future();
49+
4450
m_thread = std::unique_ptr<std::thread>(new thread(&WorkerThread::Process, this));
4551

46-
#ifdef WIN32
47-
// Get the thread's native Windows handle
4852
auto handle = m_thread->native_handle();
53+
SetThreadName(handle, THREAD_NAME);
4954

50-
// Set the thread name so it shows in the Visual Studio Debug Location toolbar
51-
std::wstring wstr(THREAD_NAME.begin(), THREAD_NAME.end());
52-
HRESULT hr = SetThreadDescription(handle, wstr.c_str());
53-
if (FAILED(hr))
54-
{
55-
// Handle error if needed
56-
}
57-
#endif
55+
// Wait for the thread to enter the Process method
56+
m_threadStartFuture.get();
5857
}
5958

6059
return true;
@@ -77,6 +76,31 @@ std::thread::id WorkerThread::GetCurrentThreadId()
7776
return this_thread::get_id();
7877
}
7978

79+
//----------------------------------------------------------------------------
80+
// GetQueueSize
81+
//----------------------------------------------------------------------------
82+
size_t WorkerThread::GetQueueSize()
83+
{
84+
lock_guard<mutex> lock(m_mutex);
85+
return m_queue.size();
86+
}
87+
88+
//----------------------------------------------------------------------------
89+
// SetThreadName
90+
//----------------------------------------------------------------------------
91+
void WorkerThread::SetThreadName(std::thread::native_handle_type handle, const std::string& name)
92+
{
93+
#ifdef WIN32
94+
// Set the thread name so it shows in the Visual Studio Debug Location toolbar
95+
std::wstring wstr(name.begin(), name.end());
96+
HRESULT hr = SetThreadDescription(handle, wstr.c_str());
97+
if (FAILED(hr))
98+
{
99+
// Handle error if needed
100+
}
101+
#endif
102+
}
103+
80104
//----------------------------------------------------------------------------
81105
// ExitThread
82106
//----------------------------------------------------------------------------
@@ -95,15 +119,25 @@ void WorkerThread::ExitThread()
95119
m_cv.notify_one();
96120
}
97121

98-
m_thread->join();
99-
m_thread = nullptr;
122+
m_exit.store(true);
123+
m_thread->join();
124+
125+
// Clear the queue if anything added while waiting for join
126+
{
127+
lock_guard<mutex> lock(m_mutex);
128+
m_thread = nullptr;
129+
while (!m_queue.empty())
130+
m_queue.pop();
131+
}
100132
}
101133

102134
//----------------------------------------------------------------------------
103135
// PostMsg
104136
//----------------------------------------------------------------------------
105137
void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
106138
{
139+
if (m_exit.load())
140+
return;
107141
ASSERT_TRUE(m_thread);
108142

109143
// Create a new ThreadMsg
@@ -123,7 +157,7 @@ void WorkerThread::TimerThread()
123157
while (!m_timerExit)
124158
{
125159
// Sleep for 250mS then put a MSG_TIMER into the message queue
126-
std::this_thread::sleep_for(250ms);
160+
std::this_thread::sleep_for(std::chrono::milliseconds(250));
127161

128162
std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));
129163

@@ -139,6 +173,9 @@ void WorkerThread::TimerThread()
139173
//----------------------------------------------------------------------------
140174
void WorkerThread::Process()
141175
{
176+
// Signal that the thread has started processing to notify CreateThread
177+
m_threadStartPromise.set_value();
178+
142179
m_timerExit = false;
143180
std::thread timerThread(&WorkerThread::TimerThread, this);
144181

WorkerThread.h

+15
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <atomic>
1111
#include <condition_variable>
1212
#include <string>
13+
#include <future>
1314

1415
struct UserData
1516
{
@@ -47,6 +48,12 @@ class WorkerThread
4748
/// @param[in] data - thread specific message information
4849
void PostMsg(std::shared_ptr<UserData> msg);
4950

51+
/// Get size of thread message queue.
52+
size_t GetQueueSize();
53+
54+
/// Get thread name
55+
std::string GetThreadName() { return THREAD_NAME; }
56+
5057
private:
5158
WorkerThread(const WorkerThread&) = delete;
5259
WorkerThread& operator=(const WorkerThread&) = delete;
@@ -57,12 +64,20 @@ class WorkerThread
5764
/// Entry point for timer thread
5865
void TimerThread();
5966

67+
void SetThreadName(std::thread::native_handle_type handle, const std::string& name);
68+
6069
std::unique_ptr<std::thread> m_thread;
6170
std::queue<std::shared_ptr<ThreadMsg>> m_queue;
6271
std::mutex m_mutex;
6372
std::condition_variable m_cv;
6473
std::atomic<bool> m_timerExit;
6574
const std::string THREAD_NAME;
75+
76+
// Promise and future to synchronize thread start
77+
std::promise<void> m_threadStartPromise;
78+
std::future<void> m_threadStartFuture;
79+
80+
std::atomic<bool> m_exit;
6681
};
6782

6883
#endif

main.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ int main(void)
3636
// Post the message to worker thread 2
3737
workerThread2.PostMsg(userData2);
3838

39-
// Give time for messages processing on worker threads
40-
this_thread::sleep_for(1s);
39+
std::this_thread::sleep_for(std::chrono::seconds(1));
4140

4241
workerThread1.ExitThread();
4342
workerThread2.ExitThread();

0 commit comments

Comments
 (0)