Skip to content

Commit 37242e3

Browse files
authored
Merge pull request #1518 from PietroGhg/pietro/coverity
[NATIVECPU] Address coverity warnings in threadpool implementation
2 parents 48facdd + 56ffd29 commit 37242e3

File tree

1 file changed

+35
-57
lines changed

1 file changed

+35
-57
lines changed

source/adapters/native_cpu/threadpool.hpp

Lines changed: 35 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
#include <atomic>
1212
#include <condition_variable>
1313
#include <cstdlib>
14+
#include <forward_list>
1415
#include <functional>
1516
#include <future>
16-
#include <iostream>
17+
#include <iterator>
1718
#include <mutex>
1819
#include <numeric>
1920
#include <queue>
@@ -30,18 +31,14 @@ namespace detail {
3031
class worker_thread {
3132
public:
3233
// Initializes state, but does not start the worker thread
33-
worker_thread() noexcept : m_isRunning(false), m_numTasks(0) {}
34-
35-
// Creates and launches the worker thread
36-
inline void start(size_t threadId) {
34+
worker_thread(size_t threadId) noexcept
35+
: m_threadId(threadId), m_isRunning(false), m_numTasks(0) {
3736
std::lock_guard<std::mutex> lock(m_workMutex);
3837
if (this->is_running()) {
3938
return;
4039
}
41-
m_threadId = threadId;
4240
m_worker = std::thread([this]() {
4341
while (true) {
44-
// pin the thread to the cpu
4542
std::unique_lock<std::mutex> lock(m_workMutex);
4643
// Wait until there's work available
4744
m_startWorkCondition.wait(
@@ -51,7 +48,7 @@ class worker_thread {
5148
break;
5249
}
5350
// Retrieve a task from the queue
54-
auto task = m_tasks.front();
51+
worker_task_t task = std::move(m_tasks.front());
5552
m_tasks.pop();
5653

5754
// Not modifying internal state anymore, can release the mutex
@@ -63,7 +60,7 @@ class worker_thread {
6360
}
6461
});
6562

66-
m_isRunning = true;
63+
m_isRunning.store(true, std::memory_order_release);
6764
}
6865

6966
inline void schedule(const worker_task_t &task) {
@@ -79,16 +76,12 @@ class worker_thread {
7976
size_t num_pending_tasks() const noexcept {
8077
// m_numTasks is an atomic counter because we don't want to lock the mutex
8178
// here, num_pending_tasks is only used for heuristics
82-
return m_numTasks.load();
79+
return m_numTasks.load(std::memory_order_acquire);
8380
}
8481

8582
// Waits for all tasks to finish and destroys the worker thread
8683
inline void stop() {
87-
{
88-
// Notify the worker thread to stop executing
89-
std::lock_guard<std::mutex> lock(m_workMutex);
90-
m_isRunning = false;
91-
}
84+
m_isRunning.store(false, std::memory_order_release);
9285
m_startWorkCondition.notify_all();
9386
if (m_worker.joinable()) {
9487
// Wait for the worker thread to finish handling the task queue
@@ -97,18 +90,21 @@ class worker_thread {
9790
}
9891

9992
// Checks whether the thread pool is currently running threads
100-
inline bool is_running() const noexcept { return m_isRunning; }
93+
inline bool is_running() const noexcept {
94+
return m_isRunning.load(std::memory_order_acquire);
95+
}
10196

10297
private:
10398
// Unique ID identifying the thread in the threadpool
104-
size_t m_threadId;
99+
const size_t m_threadId;
100+
105101
std::thread m_worker;
106102

107103
std::mutex m_workMutex;
108104

109105
std::condition_variable m_startWorkCondition;
110106

111-
bool m_isRunning;
107+
std::atomic<bool> m_isRunning;
112108

113109
std::queue<worker_task_t> m_tasks;
114110

@@ -121,47 +117,21 @@ class worker_thread {
121117
// parameters and futures.
122118
class simple_thread_pool {
123119
public:
124-
simple_thread_pool(size_t numThreads = 0) noexcept : m_isRunning(false) {
125-
this->resize(numThreads);
126-
this->start();
127-
}
128-
129-
~simple_thread_pool() { this->stop(); }
130-
131-
// Creates and launches the worker threads
132-
inline void start() {
133-
if (this->is_running()) {
134-
return;
135-
}
136-
size_t threadId = 0;
137-
for (auto &t : m_workers) {
138-
t.start(threadId);
139-
threadId++;
120+
simple_thread_pool() noexcept
121+
: m_isRunning(false), m_numThreads(get_num_threads()) {
122+
for (size_t i = 0; i < m_numThreads; i++) {
123+
m_workers.emplace_front(i);
140124
}
141125
m_isRunning.store(true, std::memory_order_release);
142126
}
143127

144-
// Waits for all tasks to finish and destroys the worker threads
145-
inline void stop() {
128+
~simple_thread_pool() {
146129
for (auto &t : m_workers) {
147130
t.stop();
148131
}
149132
m_isRunning.store(false, std::memory_order_release);
150133
}
151134

152-
inline void resize(size_t numThreads) {
153-
char *envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS");
154-
if (envVar) {
155-
numThreads = std::stoul(envVar);
156-
}
157-
if (numThreads == 0) {
158-
numThreads = std::thread::hardware_concurrency();
159-
}
160-
if (!this->is_running() && (numThreads != this->num_threads())) {
161-
m_workers = decltype(m_workers)(numThreads);
162-
}
163-
}
164-
165135
inline void schedule(const worker_task_t &task) {
166136
// Schedule the task on the best available worker thread
167137
this->best_worker().schedule(task);
@@ -171,7 +141,7 @@ class simple_thread_pool {
171141
return m_isRunning.load(std::memory_order_acquire);
172142
}
173143

174-
inline size_t num_threads() const noexcept { return m_workers.size(); }
144+
inline size_t num_threads() const noexcept { return m_numThreads; }
175145

176146
inline size_t num_pending_tasks() const noexcept {
177147
return std::accumulate(std::begin(m_workers), std::end(m_workers),
@@ -201,24 +171,32 @@ class simple_thread_pool {
201171
}
202172

203173
private:
204-
std::vector<worker_thread> m_workers;
174+
static size_t get_num_threads() {
175+
size_t numThreads;
176+
char *envVar = std::getenv("SYCL_NATIVE_CPU_HOST_THREADS");
177+
if (envVar) {
178+
numThreads = std::stoul(envVar);
179+
} else {
180+
numThreads = std::thread::hardware_concurrency();
181+
}
182+
return numThreads;
183+
}
184+
185+
std::forward_list<worker_thread> m_workers;
205186

206187
std::atomic<bool> m_isRunning;
188+
189+
const size_t m_numThreads;
207190
};
208191
} // namespace detail
209192

210193
template <typename ThreadPoolT> class threadpool_interface {
211194
ThreadPoolT threadpool;
212195

213196
public:
214-
void start() { threadpool.start(); }
215-
216-
void stop() { threadpool.stop(); }
217-
218197
size_t num_threads() const noexcept { return threadpool.num_threads(); }
219198

220-
threadpool_interface(size_t numThreads) : threadpool(numThreads) {}
221-
threadpool_interface() : threadpool(0) {}
199+
threadpool_interface() : threadpool() {}
222200

223201
auto schedule_task(worker_task_t &&task) {
224202
auto workerTask = std::make_shared<std::packaged_task<void(size_t)>>(

0 commit comments

Comments
 (0)