Skip to content

Commit 94949c3

Browse files
committed
[线程与线程池优化]: 升级多线程组件至C++20标准,增强功能与跨平台支持
- **线程实现升级**: 将基础线程类从`std::thread`重构为`std::jthread`,利用C++20新特性简化线程管理逻辑,新增线程停止功能支持。 - **停止令牌集成**: 在所有线程相关操作中引入`std::stop_token`机制,实现线程任务的优雅中断,提升程序响应能力。 - **线程池优化**: 重构线程池内部任务队列,采用自定义模板队列替代原生队列,增强任务调度灵活性与性能表现。 - **跨平台适配调整**: 根据构建主机操作系统类型,调整线程组件的编译条件,移除非Windows系统对线程示例的构建支持(因`std::jthread`在Apple Clang下兼容性问题)。 - **单元测试扩展**: 新增线程与线程池相关测试用例共10项,涵盖线程启动/停止、任务添加、等待完成等核心功能验证。 - **资源管理改进**: 在线程池与任务队列中添加资源计数与完成状态通知机制,优化多线程环境下的资源监控与同步操作。 - **文档更新**: 修正README中线程模块描述,明确指出基于`std::jthread`实现,并标注Apple Clang平台的兼容性限制。 - **依赖基线更新**: 调整vcpkg依赖基线至最新版本,确保第三方组件与新线程API的兼容性。
1 parent 447c703 commit 94949c3

File tree

9 files changed

+365
-113
lines changed

9 files changed

+365
-113
lines changed

CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 3.25.1)
22

33
include(cmake/vcpkg.cmake)
44

5-
# 设定工程名
65
project(
76
Cpp-Examples
87
VERSION 0.0.1
@@ -69,12 +68,14 @@ add_subdirectory(MonitorDir_EFSW)
6968
add_subdirectory(OpenSSL)
7069
add_subdirectory(SafeCallback)
7170
add_subdirectory(SpinMutex)
72-
add_subdirectory(Thread)
7371

74-
if(CMAKE_HOST_LINUX)
72+
if(CMAKE_HOST_WIN32)
73+
add_subdirectory(Thread)
74+
elseif(CMAKE_HOST_LINUX)
7575
add_subdirectory(Client)
7676
add_subdirectory(Icmp)
7777
add_subdirectory(Server)
78+
add_subdirectory(Thread)
7879
endif()
7980

8081
include(cmake/build_info.cmake)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@
5151
2. [server_poll](/Server/server_poll.cc)——poll的例子;
5252
3. [server_select](/Server/server_select.cc)——select的例子;
5353
23. [SpinMutex](/SpinMutex)——使用std::atomic_flag实现的简单互斥锁和自旋锁;
54-
24. [Thread](/Thread/)——基于std::thread实现的线程类,包括线程池;
54+
24. [Thread](/Thread/)——基于std::jthread实现的线程类,包括线程池`Apple Clang`不支持)
5555
1. [Thread](/Thread/thread.hpp)——线程类;
5656
2. [ThreadPool](/Thread/threadpool.hpp)——线程池;

Thread/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ target_link_libraries(thread_unittest PRIVATE GTest::gtest GTest::gtest_main
33
GTest::gmock GTest::gmock_main)
44
add_test(NAME thread_unittest COMMAND thread_unittest)
55

6-
add_executable(threadpool_unittest threadpool_unittest.cc threadpool.hpp
7-
thread.hpp)
6+
add_executable(threadpool_unittest queue.hpp threadpool_unittest.cc
7+
threadpool.hpp thread.hpp)
88
target_link_libraries(
99
threadpool_unittest PRIVATE GTest::gtest GTest::gtest_main GTest::gmock
1010
GTest::gmock_main)

Thread/queue.hpp

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#pragma once
2+
3+
#include <utils/object.hpp>
4+
5+
#include <condition_variable>
6+
#include <functional>
7+
#include <mutex>
8+
#include <utility>
9+
#include <queue>
10+
11+
template<typename T>
12+
requires std::is_move_assignable_v<T>
13+
class Queue : noncopyable
14+
{
15+
std::queue<T> m_queue;
16+
bool m_stop = false;
17+
mutable std::mutex m_mutex;
18+
std::condition_variable m_condEmpty;
19+
std::condition_variable m_condFull;
20+
std::size_t m_maxSize = 0;
21+
22+
public:
23+
using PushSuccessCallback = std::function<void(void)>;
24+
25+
Queue() = default;
26+
explicit Queue(std::size_t maxSize)
27+
: m_maxSize(maxSize == 0 ? 1 : maxSize)
28+
{}
29+
~Queue() { stop(); }
30+
31+
[[nodiscard]] auto push(T &&item, PushSuccessCallback callback = nullptr) -> bool
32+
{
33+
{
34+
std::unique_lock lock(m_mutex);
35+
m_condFull.wait(lock, [&]() { return !m_stop && (m_queue.size() < m_maxSize); });
36+
if (m_stop) {
37+
return false;
38+
}
39+
m_queue.push(std::move(item));
40+
if (callback) {
41+
callback();
42+
}
43+
}
44+
m_condEmpty.notify_one();
45+
return true;
46+
}
47+
48+
[[nodiscard]] auto pop(T &item) -> bool
49+
{
50+
std::unique_lock lock(m_mutex);
51+
m_condEmpty.wait(lock, [&]() { return !m_queue.empty() || m_stop; });
52+
if (m_queue.empty()) {
53+
return false;
54+
}
55+
item = std::move(m_queue.front());
56+
m_queue.pop();
57+
m_condFull.notify_one();
58+
return true;
59+
}
60+
61+
void setMaxSize(std::size_t maxSize)
62+
{
63+
std::scoped_lock guard(m_mutex);
64+
m_maxSize = maxSize == 0 ? 1 : maxSize;
65+
m_condFull.notify_all();
66+
}
67+
68+
[[nodiscard]] auto getMaxSize() const -> std::size_t
69+
{
70+
std::scoped_lock guard(m_mutex);
71+
return m_maxSize;
72+
}
73+
74+
[[nodiscard]] auto size() const -> std::size_t
75+
{
76+
std::scoped_lock guard(m_mutex);
77+
return m_queue.size();
78+
}
79+
80+
[[nodiscard]] auto empty() const -> bool
81+
{
82+
std::scoped_lock guard(m_mutex);
83+
return m_queue.empty();
84+
}
85+
86+
void clear()
87+
{
88+
std::scoped_lock guard(m_mutex);
89+
std::queue<T>().swap(m_queue);
90+
m_condFull.notify_all();
91+
m_condEmpty.notify_all();
92+
}
93+
94+
[[nodiscard]] auto isStopped() const -> bool
95+
{
96+
std::scoped_lock guard(m_mutex);
97+
return m_stop;
98+
}
99+
100+
void start()
101+
{
102+
std::scoped_lock guard(m_mutex);
103+
m_stop = false;
104+
}
105+
106+
void stop()
107+
{
108+
{
109+
std::scoped_lock guard(m_mutex);
110+
m_stop = true;
111+
}
112+
m_condEmpty.notify_all();
113+
m_condFull.notify_all();
114+
}
115+
};

Thread/thread.hpp

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,72 +7,123 @@
77
#include <chrono>
88
#include <condition_variable>
99
#include <functional>
10+
#include <iostream>
1011
#include <mutex>
1112
#include <thread>
1213

1314
class Thread : noncopyable
1415
{
1516
public:
16-
using Task = std::function<void()>;
17+
using Task = std::function<void(std::stop_token)>;
1718

1819
Thread() = default;
19-
explicit Thread(Task task) { setTask(std::move(task)); }
20+
21+
explicit Thread(Task task)
22+
: m_task(std::move(task))
23+
{}
24+
2025
~Thread() { stop(); }
2126

22-
void setTask(Task task) { m_task = std::move(task); }
27+
void setTask(Task task)
28+
{
29+
std::lock_guard<std::mutex> lock(m_mutex);
30+
m_task = std::move(task);
31+
}
2332

24-
void start()
33+
bool start()
2534
{
35+
if (m_jthread.joinable()) {
36+
return false;
37+
}
38+
2639
m_running.store(true);
27-
m_thread = std::thread([this]() {
28-
m_condition.notify_one();
29-
if (m_task) {
30-
m_task();
40+
m_jthread = std::jthread([this](std::stop_token token) {
41+
{
42+
std::unique_lock<std::mutex> lock(m_mutex);
43+
if (m_task) {
44+
try {
45+
m_task(token);
46+
} catch (const std::exception &e) {
47+
std::cerr << "Thread exception: " << e.what() << std::endl;
48+
}
49+
}
3150
}
51+
52+
m_running.store(false);
53+
m_running.notify_all();
3254
});
55+
56+
return true;
3357
}
3458

3559
void stop()
3660
{
37-
if (m_thread.joinable()) {
38-
m_thread.join();
61+
if (m_jthread.joinable()) {
62+
m_jthread.request_stop();
63+
m_jthread.join();
64+
}
65+
}
66+
67+
void interrupt()
68+
{
69+
if (m_jthread.joinable()) {
70+
m_jthread.request_stop();
3971
}
40-
m_running.store(false);
4172
}
4273

43-
void waitForStarted()
74+
void waitForFinished()
4475
{
45-
std::unique_lock<std::mutex> lock(m_mutex);
46-
m_condition.wait(lock, [this]() { return m_running.load(); });
76+
while (m_running.load()) {
77+
m_running.wait(true);
78+
}
4779
}
4880

49-
[[nodiscard]] auto isRunning() const -> bool { return m_running; }
81+
[[nodiscard]] auto isRunning() const -> bool { return m_running.load(); }
5082

51-
[[nodiscard]] auto getThreadId() const -> std::thread::id { return m_thread.get_id(); }
83+
[[nodiscard]] auto getThreadId() const -> std::thread::id { return m_jthread.get_id(); }
5284

5385
static void yield() { std::this_thread::yield(); }
5486

87+
static void sleep(unsigned int seconds)
88+
{
89+
std::this_thread::sleep_for(std::chrono::seconds(seconds));
90+
}
91+
92+
static void msleep(unsigned int milliseconds)
93+
{
94+
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
95+
}
96+
97+
static void usleep(unsigned int microseconds)
98+
{
99+
std::this_thread::sleep_for(std::chrono::microseconds(microseconds));
100+
}
101+
102+
static void nsleep(unsigned int nanoseconds)
103+
{
104+
std::this_thread::sleep_for(std::chrono::nanoseconds(nanoseconds));
105+
}
106+
55107
static void sleepFor(std::chrono::milliseconds duration)
56108
{
57109
std::this_thread::sleep_for(duration);
58110
}
59111

60-
static void sleepUntil(std::chrono::system_clock::time_point timePoint)
112+
static void sleepUntil(const std::chrono::system_clock::time_point &wake_time)
61113
{
62-
std::this_thread::sleep_until(timePoint);
114+
std::this_thread::sleep_until(wake_time);
63115
}
64116

65117
static auto hardwareConcurrency() -> unsigned int
66118
{
67-
auto availableCores = std::thread::hardware_concurrency(); // 如果不支持,返回0
119+
auto availableCores = std::thread::hardware_concurrency();
68120
assert(availableCores > 0);
69121
return availableCores;
70122
}
71123

72124
private:
73-
std::thread m_thread;
125+
std::jthread m_jthread;
74126
std::atomic_bool m_running{false};
75127
std::mutex m_mutex;
76-
std::condition_variable m_condition;
77128
Task m_task;
78129
};

Thread/thread_unittest.cc

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,70 @@
22

33
#include <gtest/gtest.h>
44

5-
TEST(Thread, start)
5+
void testTaskFunction(std::stop_token stopToken)
66
{
7-
Thread thread([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
7+
for (int i = 0; i < 5; ++i) {
8+
if (stopToken.stop_requested()) {
9+
break;
10+
}
11+
12+
std::cout << "Test task is running: " << i << std::endl;
13+
Thread::msleep(200);
14+
}
15+
}
16+
17+
TEST(ThreadTest, StartAndStop)
18+
{
19+
Thread thread;
20+
thread.setTask(testTaskFunction);
821
thread.start();
9-
thread.waitForStarted();
1022
EXPECT_TRUE(thread.isRunning());
1123
thread.stop();
1224
EXPECT_FALSE(thread.isRunning());
1325
}
1426

15-
TEST(Thread, setTask)
27+
TEST(ThreadTest, Interrupt)
1628
{
1729
Thread thread;
18-
thread.setTask([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
30+
thread.setTask(testTaskFunction);
1931
thread.start();
20-
thread.waitForStarted();
2132
EXPECT_TRUE(thread.isRunning());
22-
thread.stop();
33+
thread.interrupt();
34+
Thread::sleep(1);
35+
EXPECT_FALSE(thread.isRunning());
36+
}
37+
38+
TEST(ThreadTest, WaitForFinished)
39+
{
40+
Thread thread;
41+
thread.setTask(testTaskFunction);
42+
thread.start();
43+
thread.waitForFinished();
2344
EXPECT_FALSE(thread.isRunning());
2445
}
2546

26-
auto main(int argc, char *argv[]) -> int
47+
TEST(ThreadTest, WaitForFinished2)
48+
{
49+
Thread thread;
50+
thread.waitForFinished();
51+
EXPECT_FALSE(thread.isRunning());
52+
}
53+
54+
TEST(ThreadTest, TaskExecution)
55+
{
56+
Thread thread;
57+
bool taskExecuted = false;
58+
thread.setTask([&taskExecuted](std::stop_token) {
59+
taskExecuted = true;
60+
std::cout << "Task executed." << std::endl;
61+
});
62+
thread.start();
63+
thread.stop();
64+
EXPECT_TRUE(taskExecuted);
65+
}
66+
67+
int main(int argc, char **argv)
2768
{
28-
testing::InitGoogleTest(&argc, argv);
69+
::testing::InitGoogleTest(&argc, argv);
2970
return RUN_ALL_TESTS();
3071
}

0 commit comments

Comments
 (0)