Skip to content

Commit b91bf5e

Browse files
committed
Some fixes and basic test
1 parent d92336e commit b91bf5e

File tree

2 files changed

+81
-6
lines changed

2 files changed

+81
-6
lines changed

collector/lib/Pipeline.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ class Queue {
6060
return data;
6161
}
6262

63-
std::shared_lock<std::shared_mutex> read_lock() {
63+
std::shared_lock<std::shared_mutex> read_lock() const {
6464
return std::shared_lock(mx_);
6565
}
6666

67-
std::unique_lock<std::shared_mutex> write_lock() {
67+
std::unique_lock<std::shared_mutex> write_lock() const {
6868
return std::unique_lock(mx_);
6969
}
7070

7171
private:
7272
std::queue<T> inner_;
7373

74-
std::shared_mutex mx_;
74+
mutable std::shared_mutex mx_;
7575
};
7676

7777
template <class T>
@@ -81,7 +81,7 @@ class Producer {
8181

8282
~Producer() { Stop(); }
8383

84-
virtual T next();
84+
virtual T next() = 0;
8585

8686
void Start() {
8787
thread_.Start([this] { Run(); });
@@ -110,7 +110,7 @@ class Consumer {
110110

111111
~Consumer() { Stop(); }
112112

113-
virtual void handle(const T& event);
113+
virtual void handle(const T& event) = 0;
114114

115115
void Start() {
116116
thread_.Start([this] { Run(); });
@@ -140,7 +140,7 @@ class Transformer {
140140

141141
~Transformer() { Stop(); }
142142

143-
virtual std::optional<Out> transform(const In& event);
143+
virtual std::optional<Out> transform(const In& event) = 0;
144144

145145
void Start() {
146146
thread_.Start([this] { Run(); });

collector/test/PipelineTests.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#include <chrono>
2+
#include <memory>
3+
#include <ratio>
4+
#include <thread>
5+
#include <vector>
6+
7+
#include "Pipeline.h"
8+
#include "gmock/gmock.h"
9+
#include "gtest/gtest.h"
10+
11+
namespace collector {
12+
13+
class IntProducer : public Producer<int> {
14+
public:
15+
IntProducer(std::shared_ptr<Queue<int>>& input, int limit) : Producer(input), limit_(limit) {}
16+
17+
int next() override {
18+
n_++;
19+
if (n_ > limit_) {
20+
Stop();
21+
}
22+
return n_;
23+
}
24+
25+
private:
26+
int n_ = 0;
27+
int limit_;
28+
};
29+
30+
class IntConsumer : public Consumer<int> {
31+
public:
32+
IntConsumer(std::shared_ptr<Queue<int>>& input, std::vector<int> output) : Consumer(input), events_(output) {}
33+
34+
void handle(const int& event) override {
35+
events_.push_back(event);
36+
}
37+
38+
std::vector<int>& Events() {
39+
return events_;
40+
}
41+
42+
private:
43+
std::vector<int> events_;
44+
};
45+
46+
class EvenIntFilter : public Filter<int> {
47+
public:
48+
std::optional<int> transform(const int& event) {
49+
if (event % 2 == 0) {
50+
return {event};
51+
}
52+
return std::nullopt;
53+
}
54+
};
55+
56+
TEST(PipelineTests, TestBasic) {
57+
std::shared_ptr<Queue<int>> queue;
58+
59+
std::vector<int> output;
60+
61+
std::unique_ptr<Producer<int>> producer = std::make_unique<IntProducer>(queue, 10);
62+
std::unique_ptr<Consumer<int>> consumer = std::make_unique<IntConsumer>(queue, output);
63+
64+
producer->Start();
65+
consumer->Start();
66+
67+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
68+
69+
consumer->Stop();
70+
producer->Stop();
71+
72+
EXPECT_TRUE(output.size() == 10);
73+
}
74+
75+
} // namespace collector

0 commit comments

Comments
 (0)