Skip to content

Commit c1af105

Browse files
committed
Some experimental fixes
1 parent b91bf5e commit c1af105

File tree

3 files changed

+30
-7
lines changed

3 files changed

+30
-7
lines changed

collector/lib/Pipeline.h

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,30 @@ class Queue {
3939

4040
void push(const T& elem) {
4141
auto lock = write_lock();
42-
return inner_.push(elem);
42+
auto e = elem;
43+
inner_.push(std::move(e));
44+
state_changed_.notify_one();
4345
}
4446

4547
void push(T&& elem) {
4648
auto lock = write_lock();
47-
return inner_.push(elem);
49+
inner_.push(elem);
50+
state_changed_.notify_one();
4851
}
4952

5053
template <class... Args>
5154
decltype(auto) emplace(Args&&... args) {
5255
auto lock = write_lock();
53-
return inner_.emplace(std::forward<Args>(args)...);
56+
decltype(auto) out = inner_.emplace(std::forward<Args>(args)...);
57+
state_changed_.notify_one();
58+
return out;
5459
}
5560

5661
T pop() {
5762
auto lock = write_lock();
63+
if (empty()) {
64+
state_changed_.wait(lock, [this]() { return empty(); });
65+
}
5866
T data = inner_.front();
5967
inner_.pop();
6068
return data;
@@ -72,14 +80,19 @@ class Queue {
7280
std::queue<T> inner_;
7381

7482
mutable std::shared_mutex mx_;
83+
mutable std::condition_variable_any state_changed_;
7584
};
7685

7786
template <class T>
7887
class Producer {
7988
public:
8089
Producer(std::shared_ptr<Queue<T>>& output) : output_(output) {}
8190

82-
~Producer() { Stop(); }
91+
~Producer() {
92+
if (thread_.running()) {
93+
Stop();
94+
}
95+
}
8396

8497
virtual T next() = 0;
8598

@@ -108,7 +121,11 @@ class Consumer {
108121
public:
109122
Consumer(std::shared_ptr<Queue<T>>& input) : input_(input) {}
110123

111-
~Consumer() { Stop(); }
124+
~Consumer() {
125+
if (thread_.running()) {
126+
Stop();
127+
}
128+
}
112129

113130
virtual void handle(const T& event) = 0;
114131

collector/lib/StoppableThread.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "StoppableThread.h"
22

3+
#include <chrono>
34
#include <iostream>
5+
#include <thread>
46
#include <unistd.h>
57

68
#include "Utility.h"
@@ -44,6 +46,10 @@ void StoppableThread::Stop() {
4446
}
4547
break;
4648
}
49+
if (!thread_->joinable()) {
50+
CLOG(WARNING) << "thread not yet joinable...";
51+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
52+
}
4753
thread_->join();
4854
thread_.reset();
4955
int rv = close(stop_pipe_[0]);

collector/test/PipelineTests.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class IntProducer : public Producer<int> {
1717
int next() override {
1818
n_++;
1919
if (n_ > limit_) {
20-
Stop();
20+
return limit_;
2121
}
2222
return n_;
2323
}
@@ -54,7 +54,7 @@ class EvenIntFilter : public Filter<int> {
5454
};
5555

5656
TEST(PipelineTests, TestBasic) {
57-
std::shared_ptr<Queue<int>> queue;
57+
std::shared_ptr<Queue<int>> queue = std::make_shared<Queue<int>>();
5858

5959
std::vector<int> output;
6060

0 commit comments

Comments
 (0)