Skip to content

Commit d92336e

Browse files
committed
Initial commit of pipeline library
1 parent f555c23 commit d92336e

File tree

2 files changed

+177
-0
lines changed

2 files changed

+177
-0
lines changed

collector/lib/Pipeline.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
#include "Pipeline.h"

collector/lib/Pipeline.h

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
#ifndef _COLLECTOR_PIPELINE_H
2+
#define _COLLECTOR_PIPELINE_H
3+
4+
#include <memory>
5+
#include <mutex>
6+
#include <optional>
7+
#include <queue>
8+
#include <shared_mutex>
9+
10+
#include "StoppableThread.h"
11+
12+
namespace collector {
13+
14+
template <typename T>
15+
class Queue {
16+
public:
17+
Queue() {}
18+
~Queue() {}
19+
20+
T front() {
21+
auto lock = read_lock();
22+
return inner_.front();
23+
}
24+
25+
T back() {
26+
auto lock = read_lock();
27+
return inner_.back();
28+
}
29+
30+
bool empty() {
31+
auto lock = read_lock();
32+
return inner_.empty();
33+
}
34+
35+
size_t size() {
36+
auto lock = read_lock();
37+
return inner_.size();
38+
}
39+
40+
void push(const T& elem) {
41+
auto lock = write_lock();
42+
return inner_.push(elem);
43+
}
44+
45+
void push(T&& elem) {
46+
auto lock = write_lock();
47+
return inner_.push(elem);
48+
}
49+
50+
template <class... Args>
51+
decltype(auto) emplace(Args&&... args) {
52+
auto lock = write_lock();
53+
return inner_.emplace(std::forward<Args>(args)...);
54+
}
55+
56+
T pop() {
57+
auto lock = write_lock();
58+
T data = inner_.front();
59+
inner_.pop();
60+
return data;
61+
}
62+
63+
std::shared_lock<std::shared_mutex> read_lock() {
64+
return std::shared_lock(mx_);
65+
}
66+
67+
std::unique_lock<std::shared_mutex> write_lock() {
68+
return std::unique_lock(mx_);
69+
}
70+
71+
private:
72+
std::queue<T> inner_;
73+
74+
std::shared_mutex mx_;
75+
};
76+
77+
template <class T>
78+
class Producer {
79+
public:
80+
Producer(std::shared_ptr<Queue<T>>& output) : output_(output) {}
81+
82+
~Producer() { Stop(); }
83+
84+
virtual T next();
85+
86+
void Start() {
87+
thread_.Start([this] { Run(); });
88+
}
89+
90+
void Stop() {
91+
thread_.Stop();
92+
}
93+
94+
void Run() {
95+
while (!thread_.should_stop()) {
96+
auto event = next();
97+
output_->push(event);
98+
}
99+
}
100+
101+
protected:
102+
std::shared_ptr<Queue<T>>& output_;
103+
StoppableThread thread_;
104+
};
105+
106+
template <class T>
107+
class Consumer {
108+
public:
109+
Consumer(std::shared_ptr<Queue<T>>& input) : input_(input) {}
110+
111+
~Consumer() { Stop(); }
112+
113+
virtual void handle(const T& event);
114+
115+
void Start() {
116+
thread_.Start([this] { Run(); });
117+
}
118+
119+
void Stop() {
120+
thread_.Stop();
121+
}
122+
123+
void Run() {
124+
while (!thread_.should_stop()) {
125+
auto event = input_->pop();
126+
handle(event);
127+
}
128+
}
129+
130+
protected:
131+
std::shared_ptr<Queue<T>>& input_;
132+
StoppableThread thread_;
133+
};
134+
135+
template <class In, class Out>
136+
class Transformer {
137+
public:
138+
Transformer(std::shared_ptr<Queue<In>>& input, std::shared_ptr<Queue<Out>>& output)
139+
: input_(input), output_(output) {}
140+
141+
~Transformer() { Stop(); }
142+
143+
virtual std::optional<Out> transform(const In& event);
144+
145+
void Start() {
146+
thread_.Start([this] { Run(); });
147+
}
148+
149+
void Stop() {
150+
thread_.Stop();
151+
}
152+
153+
void Run() {
154+
while (!thread_.should_stop()) {
155+
auto event = input_->pop();
156+
auto transformed = transform(event);
157+
if (transformed.has_value()) {
158+
output_.push(transformed.value());
159+
}
160+
}
161+
}
162+
163+
protected:
164+
std::shared_ptr<Queue<In>> input_;
165+
std::shared_ptr<Queue<Out>> output_;
166+
167+
StoppableThread thread_;
168+
};
169+
170+
template <class T>
171+
using Filter = Transformer<T, T>;
172+
173+
} // namespace collector
174+
175+
#endif

0 commit comments

Comments
 (0)