Skip to content

Commit c942414

Browse files
committed
feat(eventx,flow): 1.12.11, 添加了eventx::ThreadExecutor接口,实现了flow::ExecuteCmdAction动作
1 parent 94dac0e commit c942414

16 files changed

+471
-110
lines changed

modules/eventx/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ add_definitions(-DMODULE_ID="tbox.eventx")
3030
set(TBOX_LIBRARY_NAME tbox_eventx)
3131

3232
set(TBOX_EVENTX_HEADERS
33+
thread_executor.h
3334
thread_pool.h
3435
timer_pool.h
3536
timeout_monitor.hpp

modules/eventx/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ LIB_VERSION_Y = 0
2525
LIB_VERSION_Z = 2
2626

2727
HEAD_FILES = \
28+
thread_executor.h \
2829
thread_pool.h \
2930
timer_pool.h \
3031
timeout_monitor.hpp \

modules/eventx/thread_executor.h

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* .============.
3+
* // M A K E / \
4+
* // C++ DEV / \
5+
* // E A S Y / \/ \
6+
* ++ ----------. \/\ .
7+
* \\ \ \ /\ /
8+
* \\ \ \ /
9+
* \\ \ \ /
10+
* -============'
11+
*
12+
* Copyright (c) 2018 Hevake and contributors, all rights reserved.
13+
*
14+
* This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox)
15+
* Use of this source code is governed by MIT license that can be found
16+
* in the LICENSE file in the root of the source tree. All contributing
17+
* project authors may be found in the CONTRIBUTORS.md file in the root
18+
* of the source tree.
19+
*/
20+
#ifndef TBOX_EVENTX_THREAD_EXECUTOR_H
21+
#define TBOX_EVENTX_THREAD_EXECUTOR_H
22+
23+
#include <functional>
24+
#include <tbox/base/cabinet_token.h>
25+
26+
namespace tbox {
27+
namespace eventx {
28+
29+
/**
30+
* 线程执行器接口
31+
*/
32+
class ThreadExecutor {
33+
public:
34+
using TaskToken = cabinet::Token;
35+
using NonReturnFunc = std::function<void ()>;
36+
37+
enum class TaskStatus {
38+
kWaiting, //!< 等待中
39+
kExecuting, //!< 执行中
40+
kNotFound, //!< 未找到(可能已完成)
41+
kCleanup, //!< 执行器与被清除
42+
};
43+
44+
enum class CancelResult {
45+
kSuccess, //!< 成功
46+
kExecuting, //!< 该任务正在执行
47+
kNotFound, //!< 没有找到该任务(可能已完成)
48+
kCleanup, //!< 执行器与被清除
49+
};
50+
51+
public:
52+
/**
53+
* 使用worker线程执行某个函数
54+
*
55+
* \param backend_task 让worker线程执行的函数对象
56+
*
57+
* \return TaskToken 任务Token
58+
*/
59+
virtual TaskToken execute(NonReturnFunc &&backend_task) = 0;
60+
virtual TaskToken execute(const NonReturnFunc &backend_task) = 0;
61+
62+
/**
63+
* 使用worker线程执行某个函数,并在完成之后在主线程执行指定的回调函数
64+
*
65+
* \param backend_task 让worker线程执行的函数对象
66+
* \param main_cb 任务完成后,由主线程执行的回调函数对象
67+
* \param prio 任务优先级[-2, -1, 0, 1, 2],越小优先级越高
68+
*
69+
* \return TaskToken 任务Token
70+
*/
71+
virtual TaskToken execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb) = 0;
72+
virtual TaskToken execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb) = 0;
73+
74+
75+
//! 获取任务的状态
76+
virtual TaskStatus getTaskStatus(TaskToken task_token) const = 0;
77+
78+
//! 取消任务
79+
virtual CancelResult cancel(TaskToken task_token) = 0;
80+
};
81+
82+
}
83+
}
84+
85+
#endif //TBOX_EVENTX_THREAD_EXECUTOR_H

modules/eventx/thread_pool.cpp

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -126,17 +126,7 @@ bool ThreadPool::initialize(ssize_t min_thread_num, ssize_t max_thread_num)
126126
return true;
127127
}
128128

129-
ThreadPool::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task, int prio)
130-
{
131-
return execute(std::move(backend_task), nullptr, prio);
132-
}
133-
134-
ThreadPool::TaskToken ThreadPool::execute(const NonReturnFunc &backend_task, int prio)
135-
{
136-
return execute(backend_task, nullptr, prio);
137-
}
138-
139-
ThreadPool::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb, int prio)
129+
ThreadExecutor::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb, int prio)
140130
{
141131
RECORD_SCOPE();
142132
TaskToken token;
@@ -180,14 +170,44 @@ ThreadPool::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task, NonRetur
180170
return token;
181171
}
182172

183-
ThreadPool::TaskToken ThreadPool::execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb, int prio)
173+
ThreadExecutor::TaskToken ThreadPool::execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb, int prio)
184174
{
185175
NonReturnFunc backend_task_copy(backend_task);
186176
NonReturnFunc main_cb_copy(main_cb);
187177
return execute(std::move(backend_task_copy), std::move(main_cb_copy), prio);
188178
}
189179

190-
ThreadPool::TaskStatus ThreadPool::getTaskStatus(TaskToken task_token) const
180+
ThreadExecutor::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task, int prio)
181+
{
182+
return execute(std::move(backend_task), nullptr, prio);
183+
}
184+
185+
ThreadExecutor::TaskToken ThreadPool::execute(const NonReturnFunc &backend_task, int prio)
186+
{
187+
return execute(backend_task, nullptr, prio);
188+
}
189+
190+
ThreadExecutor::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task)
191+
{
192+
return execute(std::move(backend_task), 0);
193+
}
194+
195+
ThreadExecutor::TaskToken ThreadPool::execute(const NonReturnFunc &backend_task)
196+
{
197+
return execute(backend_task, 0);
198+
}
199+
200+
ThreadExecutor::TaskToken ThreadPool::execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb)
201+
{
202+
return execute(std::move(backend_task), std::move(main_cb), 0);
203+
}
204+
205+
ThreadExecutor::TaskToken ThreadPool::execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb)
206+
{
207+
return execute(backend_task, main_cb, 0);
208+
}
209+
210+
ThreadExecutor::TaskStatus ThreadPool::getTaskStatus(TaskToken task_token) const
191211
{
192212
std::lock_guard<std::mutex> lg(d_->lock);
193213

@@ -200,20 +220,14 @@ ThreadPool::TaskStatus ThreadPool::getTaskStatus(TaskToken task_token) const
200220
return TaskStatus::kNotFound;
201221
}
202222

203-
/**
204-
* 返回值如下:
205-
* 0: 取消成功
206-
* 1: 没有找到该任务
207-
* 2: 该任务正在执行
208-
*/
209-
int ThreadPool::cancel(TaskToken token)
223+
ThreadExecutor::CancelResult ThreadPool::cancel(TaskToken token)
210224
{
211225
RECORD_SCOPE();
212226
std::lock_guard<std::mutex> lg(d_->lock);
213227

214228
//! 如果正在执行
215229
if (d_->doing_tasks_token.find(token) != d_->doing_tasks_token.end())
216-
return 2; //! 返回正在执行
230+
return CancelResult::kExecuting; //! 返回正在执行
217231

218232
//! 从高优先级向低优先级遍历,找出优先级最高的任务
219233
for (size_t i = 0; i < d_->undo_tasks_token.size(); ++i) {
@@ -223,12 +237,12 @@ int ThreadPool::cancel(TaskToken token)
223237
if (iter != tasks_token.end()) {
224238
tasks_token.erase(iter);
225239
d_->task_pool.free(d_->undo_tasks_cabinet.free(token));
226-
return 0;
240+
return CancelResult::kSuccess;
227241
}
228242
}
229243
}
230244

231-
return 1; //! 返回没有找到
245+
return CancelResult::kNotFound; //! 返回没有找到
232246
}
233247

234248
void ThreadPool::cleanup()

modules/eventx/thread_pool.h

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
#include <limits>
2424
#include <functional>
2525
#include <array>
26+
2627
#include <tbox/event/forward.h>
27-
#include <tbox/base/cabinet_token.h>
28+
29+
#include "thread_executor.h"
2830

2931
namespace tbox {
3032
namespace eventx {
@@ -42,10 +44,8 @@ namespace eventx {
4244
/**
4345
* 线程池类
4446
*/
45-
class ThreadPool {
47+
class ThreadPool : public ThreadExecutor {
4648
public:
47-
using TaskToken = cabinet::Token;
48-
4949
/**
5050
* 构造函数
5151
*
@@ -67,47 +67,45 @@ class ThreadPool {
6767
using NonReturnFunc = std::function<void ()>;
6868

6969
/**
70-
* 使用worker线程执行某个函数
70+
* 使用worker线程执行某个函数,并在完成之后在主线程执行指定的回调函数
7171
*
7272
* \param backend_task 让worker线程执行的函数对象
73+
* \param main_cb 任务完成后,由主线程执行的回调函数对象
7374
* \param prio 任务优先级[-2, -1, 0, 1, 2],越小优先级越高
7475
*
7576
* \return TaskToken 任务Token
7677
*/
77-
TaskToken execute(NonReturnFunc &&backend_task, int prio = 0);
78-
TaskToken execute(const NonReturnFunc &backend_task, int prio = 0);
78+
TaskToken execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb, int prio);
79+
TaskToken execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb, int prio);
7980

8081
/**
81-
* 使用worker线程执行某个函数,并在完成之后在主线程执行指定的回调函数
82+
* 使用worker线程执行某个函数
8283
*
8384
* \param backend_task 让worker线程执行的函数对象
84-
* \param main_cb 任务完成后,由主线程执行的回调函数对象
8585
* \param prio 任务优先级[-2, -1, 0, 1, 2],越小优先级越高
8686
*
8787
* \return TaskToken 任务Token
8888
*/
89-
TaskToken execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb, int prio = 0);
90-
TaskToken execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb, int prio = 0);
91-
92-
enum class TaskStatus {
93-
kWaiting, //! 等待中
94-
kExecuting, //! 执行中
95-
kNotFound //! 未找到(可能已完成)
96-
};
97-
98-
//! 获取任务的状态
99-
TaskStatus getTaskStatus(TaskToken task_token) const;
89+
TaskToken execute(NonReturnFunc &&backend_task, int prio);
90+
TaskToken execute(const NonReturnFunc &backend_task, int prio);
10091

10192
/**
102-
* 取消任务
93+
* 使用worker线程执行某个函数
10394
*
104-
* \return task_token 任务Token
95+
* \param backend_task 让worker线程执行的函数对象
96+
* \param main_cb 任务完成后,由主线程执行的回调函数对象
10597
*
106-
* \return int 0: 成功
107-
* 1: 没有找到该任务(已执行)
108-
* 2: 该任务正在执行
98+
* \return TaskToken 任务Token
10999
*/
110-
int cancel(TaskToken task_token);
100+
virtual TaskToken execute(NonReturnFunc &&backend_task) override;
101+
virtual TaskToken execute(const NonReturnFunc &backend_task) override;
102+
virtual TaskToken execute(NonReturnFunc &&backend_task, NonReturnFunc &&main_cb) override;
103+
virtual TaskToken execute(const NonReturnFunc &backend_task, const NonReturnFunc &main_cb) override;
104+
105+
//! 获取任务的状态
106+
virtual TaskStatus getTaskStatus(TaskToken task_token) const override;
107+
//! 取消任务
108+
virtual CancelResult cancel(TaskToken task_token) override;
111109

112110
/**
113111
* 清理资源,并等待所有的worker线程结束

modules/eventx/thread_pool_test.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,11 @@ TEST(ThreadPool, cancel_task) {
133133

134134
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
135135

136-
EXPECT_EQ(tp->cancel(task_ids[0]), 1); //! 第一个任务已完成
137-
EXPECT_EQ(tp->cancel(task_ids[1]), 2); //! 第二个任务正在执行
138-
EXPECT_EQ(tp->cancel(task_ids[2]), 0); //! 第三个任务可正常取消
136+
EXPECT_EQ(tp->cancel(task_ids[0]), ThreadPool::CancelResult::kNotFound); //! 第一个任务已完成
137+
EXPECT_EQ(tp->cancel(task_ids[1]), ThreadPool::CancelResult::kExecuting); //! 第二个任务正在执行
138+
EXPECT_EQ(tp->cancel(task_ids[2]), ThreadPool::CancelResult::kSuccess); //! 第三个任务可正常取消
139139
ThreadPool::TaskToken invalid_token(100, 1);
140-
EXPECT_EQ(tp->cancel(invalid_token), 1); //! 任务不存在
140+
EXPECT_EQ(tp->cancel(invalid_token), ThreadPool::CancelResult::kNotFound); //! 任务不存在
141141

142142
loop->exitLoop(std::chrono::seconds(4));
143143
loop->runLoop();

0 commit comments

Comments
 (0)