Skip to content

Commit b2c4afd

Browse files
Dream-Runner-Yupymumu
authored andcommitted
max_executor_thread_num && flow-stream-feature
1 parent e2a7464 commit b2c4afd

File tree

7 files changed

+55
-9
lines changed

7 files changed

+55
-9
lines changed

src/libmodelbox/base/include/modelbox/base/executor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class Executor {
3333

3434
virtual ~Executor();
3535

36+
void SetThreadCount(int thread_count);
37+
3638
template <typename func, typename... ts>
3739
auto Run(func &&fun, int32_t priority, ts &&...params)
3840
-> std::future<typename std::result_of<func(ts...)>::type> {

src/libmodelbox/engine/flowunit_data_executor.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ Executor::Executor(int thread_count) {
3434

3535
Executor::~Executor() { thread_pool_ = nullptr; }
3636

37+
void Executor::SetThreadCount(int thread_count) {
38+
thread_pool_->SetThreadSize(thread_count);
39+
}
40+
3741
FlowUnitExecContext::FlowUnitExecContext(
3842
std::shared_ptr<FlowUnitDataContext> data_ctx)
3943
: data_ctx_(std::move(data_ctx)) {}

src/libmodelbox/engine/flowunit_group.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ void FlowUnitGroup::InitTrace() {
4646
}
4747
}
4848

49+
uint32_t FlowUnitGroup::GetBatchSize() const { return batch_size_; }
50+
4951
std::shared_ptr<TraceSlice> FlowUnitGroup::StartTrace(
5052
FUExecContextList &exec_ctx_list) {
5153
std::call_once(trace_init_flag_, &FlowUnitGroup::InitTrace, this);
@@ -388,7 +390,7 @@ Status FlowUnitGroup::Open(const CreateExternalDataFunc &create_func) {
388390

389391
return STATUS_OK;
390392
};
391-
393+
392394
ThreadPool pool(std::thread::hardware_concurrency());
393395
pool.SetName(unit_name_ + "-Open");
394396
std::vector<std::future<Status>> result;

src/libmodelbox/engine/flowunit_manager.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ Status FlowUnitManager::Initialize(
5959
SetDeviceManager(std::move(device_mgr));
6060
Status status;
6161
status = InitFlowUnitFactory(driver);
62+
63+
if (config != nullptr) {
64+
max_executor_thread_num_ =
65+
config->GetUint32("graph.max_executor_thread_num", 0);
66+
} else {
67+
max_executor_thread_num_ = 0;
68+
}
69+
6270
if (status != STATUS_SUCCESS) {
6371
return status;
6472
}
@@ -407,6 +415,12 @@ std::shared_ptr<FlowUnit> FlowUnitManager::CreateSingleFlowUnit(
407415
return nullptr;
408416
}
409417

418+
if (max_executor_thread_num_ > 0) {
419+
MBLOG_INFO << "find the parameter max_executor_thread_num in the config: "
420+
<< max_executor_thread_num_;
421+
device->GetDeviceExecutor()->SetThreadCount(max_executor_thread_num_);
422+
}
423+
410424
flowunit->SetBindDevice(device);
411425
std::vector<FlowUnitInput> &in_list = flowunit_desc->GetFlowUnitInput();
412426
for (auto &in_item : in_list) {

src/libmodelbox/engine/node.cc

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -763,23 +763,43 @@ void Node::CleanDataContext() {
763763

764764
Status Node::Run(RunType type) {
765765
std::list<std::shared_ptr<FlowUnitDataContext>> data_ctx_list;
766+
size_t process_count = 0;
766767
auto ret = Recv(type, data_ctx_list);
767-
if (!ret) {
768-
return ret;
769-
}
770768

771-
ret = Process(data_ctx_list);
772769
if (!ret) {
773770
return ret;
774771
}
775772

776-
if (!GetOutputNames().empty()) {
777-
ret = Send(data_ctx_list);
773+
std::list<std::shared_ptr<FlowUnitDataContext>> process_ctx_list;
774+
775+
auto output_names_is_empty = GetOutputNames().empty();
776+
777+
for (auto& ctx : data_ctx_list) {
778+
// process data according to batch size
779+
process_count++;
780+
process_ctx_list.push_back(ctx);
781+
782+
if (process_ctx_list.size() < flowunit_group_->GetBatchSize()) {
783+
if (process_count < data_ctx_list.size()) {
784+
continue;
785+
}
786+
}
787+
788+
ret = Process(process_ctx_list);
778789
if (!ret) {
779790
return ret;
780791
}
781-
} else {
782-
SetLastError(data_ctx_list);
792+
793+
if (!output_names_is_empty) {
794+
ret = Send(process_ctx_list);
795+
if (!ret) {
796+
return ret;
797+
}
798+
} else {
799+
SetLastError(process_ctx_list);
800+
}
801+
802+
process_ctx_list.clear();
783803
}
784804

785805
Clean(data_ctx_list);

src/libmodelbox/include/modelbox/flowunit.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,8 @@ class FlowUnitManager {
612612

613613
std::shared_ptr<DeviceManager> GetDeviceManager();
614614

615+
int max_executor_thread_num_;
616+
615617
private:
616618
Status CheckParams(const std::string &unit_name, const std::string &unit_type,
617619
const std::string &unit_device_id);

src/libmodelbox/include/modelbox/flowunit_group.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ class FlowUnitGroup {
6464

6565
Status Close();
6666

67+
uint32_t GetBatchSize() const;
68+
6769
private:
6870
std::weak_ptr<Node> node_;
6971
uint32_t batch_size_;

0 commit comments

Comments
 (0)