1
1
#ifndef _COLLECTOR_PIPELINE_H
2
2
#define _COLLECTOR_PIPELINE_H
3
3
4
+ #include < chrono>
5
+ #include < condition_variable>
4
6
#include < memory>
5
7
#include < mutex>
6
8
#include < optional>
@@ -17,16 +19,6 @@ class Queue {
17
19
Queue () {}
18
20
~Queue () {}
19
21
20
- T front () {
21
- auto lock = read_lock ();
22
- return inner_.front ();
23
- }
24
-
25
- T back () {
26
- auto lock = read_lock ();
27
- return inner_.back ();
28
- }
29
-
30
22
bool empty () {
31
23
auto lock = read_lock ();
32
24
return inner_.empty ();
@@ -38,34 +30,36 @@ class Queue {
38
30
}
39
31
40
32
void push (const T& elem) {
41
- auto lock = write_lock ();
42
- auto e = elem;
43
- inner_.push (std::move (e));
33
+ {
34
+ auto lock = write_lock ();
35
+ auto e = elem;
36
+ inner_.push (std::move (e));
37
+ }
44
38
state_changed_.notify_one ();
45
39
}
46
40
47
41
void push (T&& elem) {
48
- auto lock = write_lock ();
49
- inner_.push (elem);
42
+ {
43
+ auto lock = write_lock ();
44
+ inner_.push (elem);
45
+ }
50
46
state_changed_.notify_one ();
51
47
}
52
48
53
- template <class ... Args>
54
- decltype (auto ) emplace(Args&&... args) {
49
+ std::optional<T> pop (std::chrono::milliseconds wait_max = std::chrono::milliseconds(10 )) {
55
50
auto lock = write_lock ();
56
- decltype ( auto ) out = inner_.emplace (std::forward<Args>(args)...);
57
- state_changed_. notify_one ();
58
- return out ;
59
- }
51
+ if ( inner_.empty ()) {
52
+ auto pred = [ this ]() {
53
+ return !inner_. empty () ;
54
+ };
60
55
61
- T pop () {
62
- auto lock = write_lock ();
63
- if (empty ()) {
64
- state_changed_.wait (lock, [this ]() { return empty (); });
56
+ if (!state_changed_.wait_for (lock, wait_max, pred)) {
57
+ return std::nullopt;
58
+ }
65
59
}
66
60
T data = inner_.front ();
67
61
inner_.pop ();
68
- return data;
62
+ return { data} ;
69
63
}
70
64
71
65
std::shared_lock<std::shared_mutex> read_lock () const {
@@ -94,7 +88,7 @@ class Producer {
94
88
}
95
89
}
96
90
97
- virtual T next () = 0;
91
+ virtual std::optional<T> next () = 0;
98
92
99
93
void Start () {
100
94
thread_.Start ([this ] { Run (); });
@@ -107,7 +101,10 @@ class Producer {
107
101
void Run () {
108
102
while (!thread_.should_stop ()) {
109
103
auto event = next ();
110
- output_->push (event);
104
+ if (!event.has_value ()) {
105
+ break ;
106
+ }
107
+ output_->push (event.value ());
111
108
}
112
109
}
113
110
@@ -140,7 +137,10 @@ class Consumer {
140
137
void Run () {
141
138
while (!thread_.should_stop ()) {
142
139
auto event = input_->pop ();
143
- handle (event);
140
+ if (!event.has_value ()) {
141
+ continue ;
142
+ }
143
+ handle (event.value ());
144
144
}
145
145
}
146
146
@@ -170,7 +170,11 @@ class Transformer {
170
170
void Run () {
171
171
while (!thread_.should_stop ()) {
172
172
auto event = input_->pop ();
173
- auto transformed = transform (event);
173
+ if (!event.has_value ()) {
174
+ continue ;
175
+ }
176
+
177
+ auto transformed = transform (event.value ());
174
178
if (transformed.has_value ()) {
175
179
output_.push (transformed.value ());
176
180
}
0 commit comments