Skip to content

Commit 89bbe73

Browse files
add: flow-stream-feature
1 parent 39b32d5 commit 89bbe73

File tree

3 files changed

+34
-9
lines changed

3 files changed

+34
-9
lines changed

src/libmodelbox/engine/flowunit_group.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ void FlowUnitGroup::InitTrace() {
4646
}
4747
}
4848

49+
uint32_t FlowUnitGroup::GetBatchSize() const
50+
{
51+
return batch_size_;
52+
}
53+
4954
std::shared_ptr<TraceSlice> FlowUnitGroup::StartTrace(
5055
FUExecContextList &exec_ctx_list) {
5156
std::call_once(trace_init_flag_, &FlowUnitGroup::InitTrace, this);

src/libmodelbox/engine/node.cc

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -762,30 +762,48 @@ void Node::CleanDataContext() {
762762
}
763763

764764
Status Node::Run(RunType type) {
765+
765766
std::list<std::shared_ptr<FlowUnitDataContext>> data_ctx_list;
767+
size_t process_count = 0;
766768
auto ret = Recv(type, data_ctx_list);
767-
if (!ret) {
768-
return ret;
769-
}
770769

771-
ret = Process(data_ctx_list);
772770
if (!ret) {
773771
return ret;
774772
}
775773

776-
if (!GetOutputNames().empty()) {
777-
ret = Send(data_ctx_list);
774+
std::list<std::shared_ptr<FlowUnitDataContext>> process_ctx_list;
775+
776+
for(auto& ctx: data_ctx_list){
777+
778+
process_count++;
779+
process_ctx_list.push_back(ctx);
780+
781+
if (process_ctx_list.size() < flowunit_group_->GetBatchSize()){
782+
if (process_count < data_ctx_list.size()){
783+
continue;
784+
}
785+
}
786+
787+
ret = Process(process_ctx_list);
778788
if (!ret) {
779789
return ret;
780790
}
781-
} else {
782-
SetLastError(data_ctx_list);
791+
792+
if (!GetOutputNames().empty()) {
793+
ret = Send(process_ctx_list);
794+
if (!ret) {
795+
return ret;
796+
}
797+
} else {
798+
SetLastError(process_ctx_list);
799+
}
800+
801+
process_ctx_list.clear();
783802
}
784803

785804
Clean(data_ctx_list);
786805
return STATUS_SUCCESS;
787806
}
788-
789807
void Node::SetLastError(
790808
std::list<std::shared_ptr<FlowUnitDataContext>>& data_ctx_list) {
791809
for (auto& data_ctx : data_ctx_list) {

src/libmodelbox/include/modelbox/flowunit_group.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ class FlowUnitGroup {
6464

6565
Status Close();
6666

67+
uint32_t GetBatchSize() const;
68+
6769
private:
6870
std::weak_ptr<Node> node_;
6971
uint32_t batch_size_;

0 commit comments

Comments
 (0)