diff --git a/collector/lib/CMakeLists.txt b/collector/lib/CMakeLists.txt index 1d5386e7d1..626d582401 100644 --- a/collector/lib/CMakeLists.txt +++ b/collector/lib/CMakeLists.txt @@ -1,6 +1,7 @@ file(GLOB COLLECTOR_LIB_SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/system-inspector/*.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/events/*.cpp ) add_library(collector_lib ${DRIVER_HEADERS} ${COLLECTOR_LIB_SRC_FILES}) diff --git a/collector/lib/CollectorService.cpp b/collector/lib/CollectorService.cpp index 71b6508cee..a5a1e0f842 100644 --- a/collector/lib/CollectorService.cpp +++ b/collector/lib/CollectorService.cpp @@ -27,7 +27,7 @@ static const std::string PROMETHEUS_PORT = "9090"; CollectorService::CollectorService(CollectorConfig& config, std::atomic* control, const std::atomic* signum) : config_(config), - system_inspector_(config_), + system_inspector_(config_, event_dispatcher_), control_(control), signum_(*signum), server_(OPTIONS), diff --git a/collector/lib/CollectorService.h b/collector/lib/CollectorService.h index 01f6ee2a80..a1e673238e 100644 --- a/collector/lib/CollectorService.h +++ b/collector/lib/CollectorService.h @@ -10,6 +10,7 @@ #include "Control.h" #include "NetworkStatusInspector.h" #include "NetworkStatusNotifier.h" +#include "events/Dispatcher.h" #include "system-inspector/Service.h" namespace collector { @@ -31,6 +32,8 @@ class CollectorService { private: bool WaitForGRPCServer(); + collector::events::EventDispatcher event_dispatcher_; + CollectorConfig& config_; system_inspector::Service system_inspector_; diff --git a/collector/lib/Pipeline.cpp b/collector/lib/Pipeline.cpp new file mode 100644 index 0000000000..9fdb2eb755 --- /dev/null +++ b/collector/lib/Pipeline.cpp @@ -0,0 +1,2 @@ + +#include "Pipeline.h" diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h new file mode 100644 index 0000000000..f318ac0acd --- /dev/null +++ b/collector/lib/Pipeline.h @@ -0,0 +1,18 @@ +#ifndef _COLLECTOR_PIPELINE_H +#define _COLLECTOR_PIPELINE_H + +#include +#include +#include +#include +#include +#include +#include + +#include "StoppableThread.h" + +namespace collector { + +} // namespace collector + +#endif diff --git a/collector/lib/Process.cpp b/collector/lib/Process.cpp index 632d824a03..2b4e4ee1cb 100644 --- a/collector/lib/Process.cpp +++ b/collector/lib/Process.cpp @@ -28,6 +28,10 @@ const std::shared_ptr ProcessStore::Fetch(uint64_t pid) { return cached_process; } +std::string Process::name() const { + return exe(); +} + std::string Process::container_id() const { WaitForProcessInfo(); diff --git a/collector/lib/Process.h b/collector/lib/Process.h index 8fb346984d..4b17ac8655 100644 --- a/collector/lib/Process.h +++ b/collector/lib/Process.h @@ -8,6 +8,10 @@ #include #include +#include "libsinsp/sinsp.h" + +#include "system-inspector/EventExtractor.h" + // forward declarations class sinsp_threadinfo; namespace collector { @@ -44,6 +48,7 @@ class ProcessStore { class IProcess { public: virtual uint64_t pid() const = 0; + virtual std::string name() const = 0; virtual std::string container_id() const = 0; virtual std::string comm() const = 0; virtual std::string exe() const = 0; @@ -61,6 +66,7 @@ class IProcess { class Process : public IProcess { public: inline uint64_t pid() const override { return pid_; } + std::string name() const override; std::string container_id() const override; std::string comm() const override; std::string exe() const override; @@ -70,6 +76,7 @@ class Process : public IProcess { /* - when 'cache' is provided, this process will remove itself from it upon deletion. * - 'instance' is used to request the process information from the system. */ Process(uint64_t pid, ProcessStore::MapRef cache = 0, system_inspector::Service* instance = 0); + ~Process(); private: diff --git a/collector/lib/StoppableThread.cpp b/collector/lib/StoppableThread.cpp index 13e05444d2..97e2557d38 100644 --- a/collector/lib/StoppableThread.cpp +++ b/collector/lib/StoppableThread.cpp @@ -1,6 +1,8 @@ #include "StoppableThread.h" +#include #include +#include #include #include "Utility.h" @@ -44,6 +46,10 @@ void StoppableThread::Stop() { } break; } + if (!thread_->joinable()) { + CLOG(WARNING) << "thread not yet joinable..."; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } thread_->join(); thread_.reset(); int rv = close(stop_pipe_[0]); diff --git a/collector/lib/events/Dispatcher.h b/collector/lib/events/Dispatcher.h new file mode 100644 index 0000000000..b966990822 --- /dev/null +++ b/collector/lib/events/Dispatcher.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#include "IEvent.h" +#include "events/handlers/ProcessHandler.h" + +namespace collector::events { +namespace { + +// The idea here is compile time dispatching of events to the relevant handler(s) +// rather than having to do runtime lookups in maps or more complex (and slow) +// casting of handler types. +// +// This can be optimized by the compiler quite aggressively and this will form +// most of the hot path of event dispatching from the several event sources we +// can support +template +void dispatch(const Tuple& handlers, const Event& event) { + if constexpr (I < std::tuple_size_v) { + if (auto* concrete_event = dynamic_cast::EventType*>(&event)) { + std::get(handlers).Handle(*concrete_event); + } + dispatch(handlers, event); + } +} +} // namespace + +template +class StaticEventDispatcher { + public: + StaticEventDispatcher() {} + + void Dispatch(const IEvent& event) { + dispatch(handlers_, event); + } + + private: + std::tuple handlers_; +}; + +using EventDispatcher = StaticEventDispatcher; + +} // namespace collector::events diff --git a/collector/lib/events/IEvent.h b/collector/lib/events/IEvent.h new file mode 100644 index 0000000000..ddf3a2c676 --- /dev/null +++ b/collector/lib/events/IEvent.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#include "Process.h" + +namespace collector::events { + +enum EventType { + ProcessStart, + NetworkConnection, +}; + +class IEvent { + public: + virtual ~IEvent() = default; + + virtual EventType Type() const = 0; +}; + +using IEventPtr = std::shared_ptr; + +class ProcessStartEvent : public IEvent { + public: + ProcessStartEvent(const std::shared_ptr& process) : process_(process) {} + + EventType Type() const { + return EventType::ProcessStart; + } + + friend std::ostream& operator<<(std::ostream& stream, const ProcessStartEvent& event) { + stream << *event.Process(); + return stream; + } + + const std::shared_ptr& Process() const { + return process_; + } + + private: + std::shared_ptr process_; +}; + +} // namespace collector::events diff --git a/collector/lib/events/handlers/Handler.h b/collector/lib/events/handlers/Handler.h new file mode 100644 index 0000000000..cf95419298 --- /dev/null +++ b/collector/lib/events/handlers/Handler.h @@ -0,0 +1,33 @@ +#pragma once + +namespace collector::events::handler { + +// An EventHandler is the owner of a processing pipeline +// for a given event type. +// +// This base class is intended to formalize the API between +// event dispatchers and event handlers, using CRTP. +// +// Note: C++20 would allow us to use concepts for this API +// definition, and this should be changed if/when we move to +// the newer standard. +// +// example derived handler: +// +// ```cpp +// class MyEventHandler : public EventHandler { +// public: +// void HandleImpl(const MyEventType& event) const { +// std::cout << event << std::endl; +// } +// } +// ``` +template +class EventHandler { + public: + void Handle(const EventType& event) const { + static_cast(this)->HandleImpl(event); + } +}; + +} // namespace collector::events::handler diff --git a/collector/lib/events/handlers/ProcessHandler.h b/collector/lib/events/handlers/ProcessHandler.h new file mode 100644 index 0000000000..d128f3d3fb --- /dev/null +++ b/collector/lib/events/handlers/ProcessHandler.h @@ -0,0 +1,35 @@ +#pragma once + +#include "events/IEvent.h" +#include "events/handlers/Handler.h" +#include "events/pipeline/Pipeline.h" +#include "events/pipeline/nodes/ProcessProtoTransformer.h" +#include "events/pipeline/nodes/SignalServiceConsumer.h" + +namespace collector::events::handler { + +class ProcessHandler : public EventHandler { + public: + using EventType = ProcessStartEvent; + + using ProcessPipeline = collector::pipeline::Pipeline< + pipeline::ProcessProtoTransformer, + pipeline::SignalServiceConsumer>; + + ProcessHandler() { + process_pipeline_.Start(); + } + + ~ProcessHandler() { + process_pipeline_.Stop(); + } + + void HandleImpl(const ProcessStartEvent& event) const { + process_pipeline_.Push(event); + } + + private: + ProcessPipeline process_pipeline_; +}; + +} // namespace collector::events::handler diff --git a/collector/lib/events/pipeline/Nodes.h b/collector/lib/events/pipeline/Nodes.h new file mode 100644 index 0000000000..ca5a05fdf9 --- /dev/null +++ b/collector/lib/events/pipeline/Nodes.h @@ -0,0 +1,161 @@ +#pragma once + +#include +#include + +#include "Queue.h" +#include "StoppableThread.h" + +namespace collector::pipeline { + +template +class Producer { + public: + using InputType = void; + using OutputType = Out; + + Producer(const std::shared_ptr _ptr /* ignored */, const std::shared_ptr>& output) : output_(output) {} + + ~Producer() { + if (thread_.running()) { + Stop(); + } + } + + virtual std::optional next() = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = next(); + if (!event.has_value()) { + break; + } + output_->push(event.value()); + } + } + + protected: + std::shared_ptr>& output_; + StoppableThread thread_; +}; + +template +class Consumer { + public: + using InputType = In; + using OutputType = void; + + Consumer(const std::shared_ptr>& input, const std::shared_ptr _ptr /*ignored*/) : input_(input) {} + + ~Consumer() { + if (thread_.running()) { + Stop(); + } + } + + virtual void consume(const In& event) = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + if (!input_) { + CLOG(DEBUG) << "No input queue for Consumer"; + break; + } + auto event = input_->pop(); + if (!event.has_value()) { + continue; + } + consume(event.value()); + } + } + + protected: + const std::shared_ptr>& input_; + StoppableThread thread_; +}; + +template +class Transformer { + public: + using InputType = In; + using OutputType = Out; + + Transformer(const std::shared_ptr>& input, const std::shared_ptr>& output) + : input_(input), output_(output) {} + + ~Transformer() { Stop(); } + + virtual std::optional transform(const In& event) = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + if (!event.has_value()) { + continue; + } + + auto transformed = transform(event.value()); + if (transformed.has_value()) { + if (output_) { + output_->push(transformed.value()); + } + } + } + } + + protected: + const std::shared_ptr> input_; + const std::shared_ptr> output_; + + StoppableThread thread_; +}; + +template +using Filter = Transformer; + +template +class Splitter : public Transformer { + public: + using InputType = InOut; + using OutputType = InOut; + + Splitter(const std::shared_ptr> input, + std::vector>> outputs) : Transformer(input, nullptr), outputs_(outputs) { + } + + std::optional transform(const InOut& event) override { + for (const auto& queue : outputs_) { + queue->push(event); + } + return std::nullopt; + } + + private: + std::vector>> outputs_; +}; + +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/Pipeline.h b/collector/lib/events/pipeline/Pipeline.h new file mode 100644 index 0000000000..c335a6fbc9 --- /dev/null +++ b/collector/lib/events/pipeline/Pipeline.h @@ -0,0 +1,88 @@ +#pragma once + +#include +#include + +#include "events/pipeline/Queue.h" + +namespace collector::pipeline { + +template +class Pipeline { + using QueueTuple = std::tuple>...>; + struct Graph { + std::tuple...> nodes; + QueueTuple queues; + }; + + static Graph build_graph() { + auto queues = std::make_tuple(std::make_shared>()...); + auto nodes = build_nodes(queues, std::index_sequence_for{}); + return { + std::move(nodes), + std::move(queues), + }; + } + + template + static auto build_nodes(const std::tuple>...>& queues, + std::index_sequence) { + // This is a fold expression over the comma operator. + // It expands to: std::make_tuple(Node_0(...), Node_1(...), Node_2(...), ...) + return std::make_tuple( + std::make_unique( + std::get(queues), + get_output_queue(queues))...); + } + + // Helper to safely get the output queue for a given node index. + template + static auto get_output_queue( + const std::tuple>...>& queues) { + if constexpr (I + 1 < sizeof...(Nodes)) { + return std::get(queues); + } else { + // This is the last node in the pipeline (a Consumer). + // Its output type is `void`, so we pass a nullptr. + return nullptr; + } + } + + public: + using In = typename std::tuple_element_t<0, std::tuple>::InputType; + + Pipeline() : graph_(build_graph()) { + } + + void Start() { + start_nodes<0>(); + } + + void Stop() { + stop_nodes<0>(); + } + + void Push(const In& input) const { + std::get<0>(graph_.queues)->push(input); + } + + private: + Graph graph_; + + template + void start_nodes() { + if constexpr (I < sizeof...(Nodes)) { + std::get(graph_.nodes)->Start(); + start_nodes(); + } + } + + template + void stop_nodes() { + if constexpr (I < sizeof...(Nodes)) { + std::get(graph_.nodes)->Stop(); + start_nodes(); + } + } +}; +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/Queue.h b/collector/lib/events/pipeline/Queue.h new file mode 100644 index 0000000000..7f49ea509b --- /dev/null +++ b/collector/lib/events/pipeline/Queue.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace collector::pipeline { +template +class Queue { + public: + Queue() {} + ~Queue() {} + + bool empty() { + auto lock = read_lock(); + return inner_.empty(); + } + + size_t size() { + auto lock = read_lock(); + return inner_.size(); + } + + void push(const T& elem) { + { + auto lock = write_lock(); + auto e = elem; + inner_.push(std::move(e)); + } + state_changed_.notify_one(); + } + + void push(T&& elem) { + { + auto lock = write_lock(); + inner_.push(elem); + } + state_changed_.notify_one(); + } + + std::optional pop(std::chrono::milliseconds wait_max = std::chrono::milliseconds(10)) { + auto lock = write_lock(); + if (inner_.empty()) { + auto pred = [this]() { + return !inner_.empty(); + }; + + if (!state_changed_.wait_for(lock, wait_max, pred)) { + return std::nullopt; + } + } + T data = inner_.front(); + inner_.pop(); + return {data}; + } + + std::shared_lock read_lock() const { + return std::shared_lock(mx_); + } + + std::unique_lock write_lock() const { + return std::unique_lock(mx_); + } + + private: + std::queue inner_; + + mutable std::shared_mutex mx_; + mutable std::condition_variable_any state_changed_; +}; +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/nodes/Debug.h b/collector/lib/events/pipeline/nodes/Debug.h new file mode 100644 index 0000000000..d41f087bf8 --- /dev/null +++ b/collector/lib/events/pipeline/nodes/Debug.h @@ -0,0 +1,18 @@ +#pragma once + +#include "events/pipeline/Nodes.h" + +namespace collector::pipeline { + +template +class DebugNode : public Transformer { + public: + DebugNode(const std::shared_ptr>& input, const std::shared_ptr>& output) + : Transformer(input, output) {} + + std::optional transform(const In& input) { + CLOG(INFO) << input; + return {input}; + } +}; +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h b/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h new file mode 100644 index 0000000000..10d1f87393 --- /dev/null +++ b/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h @@ -0,0 +1,56 @@ +#pragma once + +#include + +#include "internalapi/sensor/signal_iservice.pb.h" +#include "storage/process_indicator.pb.h" + +#include "ProtoAllocator.h" +#include "Utility.h" +#include "events/IEvent.h" +#include "events/pipeline/Nodes.h" + +namespace collector::pipeline { + +class ProcessProtoTransformer : public Transformer> { + public: + ProcessProtoTransformer( + const std::shared_ptr>& input, + const std::shared_ptr>>& output) + : Transformer>(input, output) {} + + std::optional> transform(const events::ProcessStartEvent& event) { + storage::ProcessSignal* proc_signal = allocator_.Allocate(); + + if (!proc_signal) { + return std::nullopt; + } + + proc_signal->set_id(UUIDStr()); + + auto process = event.Process(); + + proc_signal->set_name(process->name()); + proc_signal->set_exec_file_path(process->exe_path()); + proc_signal->set_container_id(process->container_id()); + proc_signal->set_pid(process->pid()); + proc_signal->set_args(process->args()); + + sensor::SignalStreamMessage* msg = allocator_.AllocateRoot(); + v1::Signal* signal = allocator_.Allocate(); + + signal->set_allocated_process_signal(proc_signal); + + msg->clear_collector_register_request(); + msg->set_allocated_signal(signal); + + std::shared_ptr ptr; + ptr.reset(msg); + return {ptr}; + } + + private: + ProtoAllocator allocator_; +}; + +} // namespace collector::pipeline diff --git a/collector/lib/system-inspector/Service.cpp b/collector/lib/system-inspector/Service.cpp index 95c0394416..87119fe32f 100644 --- a/collector/lib/system-inspector/Service.cpp +++ b/collector/lib/system-inspector/Service.cpp @@ -1,6 +1,7 @@ #include "Service.h" #include +#include #include #include @@ -19,28 +20,34 @@ #include "ContainerMetadata.h" #include "EventExtractor.h" #include "EventNames.h" +#include "FalcoProcess.h" #include "HostInfo.h" #include "KernelDriver.h" #include "Logging.h" #include "NetworkSignalHandler.h" +#include "Process.h" #include "ProcessSignalHandler.h" #include "SelfCheckHandler.h" #include "SelfChecks.h" #include "TimeUtil.h" #include "Utility.h" +#include "events/Dispatcher.h" +#include "events/IEvent.h" #include "logger.h" +#include "ppm_events_public.h" namespace collector::system_inspector { Service::~Service() = default; -Service::Service(const CollectorConfig& config) +Service::Service(const CollectorConfig& config, collector::events::EventDispatcher& dispatcher) : inspector_(std::make_unique(true)), container_metadata_inspector_(std::make_unique(inspector_.get())), default_formatter_(std::make_unique( inspector_.get(), DEFAULT_OUTPUT_STR, - EventExtractor::FilterList())) { + EventExtractor::FilterList())), + dispatcher_(dispatcher) { // Setup the inspector. // peeking into arguments has a big overhead, so we prevent it from happening inspector_->set_snaplen(0); @@ -241,6 +248,23 @@ void LogUnreasonableEventTime(int64_t time_micros, sinsp_evt* evt) { } } +events::IEventPtr Service::to_ievt(sinsp_evt* evt) { + std::bitset event_filter; + const EventNames& event_names = EventNames::GetInstance(); + for (ppm_event_code event_id : event_names.GetEventIDs("execve<")) { + event_filter.set(event_id); + } + + if (event_filter[evt->get_type()]) { + EventExtractor extractor; + extractor.Init(inspector_.get()); + auto proc = std::make_shared(evt, extractor); + events::IEventPtr ievt = std::make_shared(proc); + return ievt; + } + return std::nullptr_t(); +} + void Service::Run(const std::atomic& control) { if (!inspector_) { throw CollectorException("Invalid state: system inspector was not initialized"); @@ -254,6 +278,9 @@ void Service::Run(const std::atomic& control) { continue; } + auto ievt = to_ievt(evt); + dispatcher_.Dispatch(*ievt); + auto process_start = NowMicros(); for (auto it = signal_handlers_.begin(); it != signal_handlers_.end(); it++) { auto& signal_handler = *it; diff --git a/collector/lib/system-inspector/Service.h b/collector/lib/system-inspector/Service.h index 651e7ff7cb..1ac0ebfe3a 100644 --- a/collector/lib/system-inspector/Service.h +++ b/collector/lib/system-inspector/Service.h @@ -13,6 +13,8 @@ #include "SignalHandler.h" #include "SignalServiceClient.h" #include "SystemInspector.h" +#include "events/Dispatcher.h" +#include "events/IEvent.h" // forward declarations class sinsp; @@ -30,7 +32,7 @@ class Service : public SystemInspector { Service& operator=(Service&&) = delete; ~Service() override; - Service(const CollectorConfig& config); + Service(const CollectorConfig& config, collector::events::EventDispatcher& dispatcher); void Start() override; void Run(const std::atomic& control) override; void CleanUp() override; @@ -50,6 +52,8 @@ class Service : public SystemInspector { void AddSignalHandler(std::unique_ptr signal_handler); + events::IEventPtr to_ievt(sinsp_evt* evt); + private: FRIEND_TEST(SystemInspectorServiceTest, FilterEvent); @@ -85,6 +89,8 @@ class Service : public SystemInspector { mutable std::mutex process_requests_mutex_; // [ ( pid, callback ), ( pid, callback ), ... ] std::list> pending_process_requests_; + + collector::events::EventDispatcher& dispatcher_; }; } // namespace collector::system_inspector diff --git a/collector/test/ConnTrackerTest.cpp b/collector/test/ConnTrackerTest.cpp index a8457247cc..dbd028b3b9 100644 --- a/collector/test/ConnTrackerTest.cpp +++ b/collector/test/ConnTrackerTest.cpp @@ -1359,6 +1359,7 @@ class FakeProcess : public IProcess { args_(args) {} uint64_t pid() const override { return pid_; } + std::string name() const override { return ""; } std::string container_id() const override { return container_id_; } std::string comm() const override { return comm_; } std::string exe() const override { return exe_; } diff --git a/collector/test/NetworkStatusNotifierTest.cpp b/collector/test/NetworkStatusNotifierTest.cpp index 4af986ea94..5be96ebcc4 100644 --- a/collector/test/NetworkStatusNotifierTest.cpp +++ b/collector/test/NetworkStatusNotifierTest.cpp @@ -190,16 +190,19 @@ class NetworkConnectionInfoMessageParser { class NetworkStatusNotifierTest : public testing::Test { public: NetworkStatusNotifierTest() - : inspector(config), + : inspector(config, dispatcher_), conn_tracker(std::make_shared()), conn_scraper(std::make_unique()), comm(std::make_unique()), - net_status_notifier(conn_tracker, config, &inspector, nullptr) { + net_status_notifier(conn_tracker, config, &inspector, nullptr), + dispatcher_() { } protected: MockCollectorConfig config; system_inspector::Service inspector; + collector::events::EventDispatcher dispatcher_; + collector::events::handler::ProcessHandler process_handler_; std::shared_ptr conn_tracker; std::unique_ptr conn_scraper; std::unique_ptr comm; diff --git a/collector/test/PipelineTests.cpp b/collector/test/PipelineTests.cpp new file mode 100644 index 0000000000..878a8a625c --- /dev/null +++ b/collector/test/PipelineTests.cpp @@ -0,0 +1,72 @@ +#include +#include +#include +#include +#include +#include + +#include "Pipeline.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +// namespace collector { +// +// class IntProducer : public Producer { +// public: +// IntProducer(std::shared_ptr>& input, int limit) : Producer(input), limit_(limit) {} +// +// std::optional next() override { +// n_++; +// if (n_ > limit_) { +// return std::nullopt; +// } +// return {n_}; +// } +// +// private: +// int n_ = 0; +// int limit_; +// }; +// +// class IntConsumer : public Consumer { +// public: +// IntConsumer(std::shared_ptr>& input, std::vector& output) : Consumer(input), events_(output) {} +// +// void handle(const int& event) override { +// events_.push_back(event); +// } +// +// private: +// std::vector& events_; +// }; +// +// class EvenIntFilter : public Filter { +// public: +// std::optional transform(const int& event) { +// if (event % 2 == 0) { +// return {event}; +// } +// return std::nullopt; +// } +// }; +// +// TEST(PipelineTests, TestBasic) { +// std::shared_ptr> queue = std::make_shared>(); +// +// std::vector output; +// +// std::unique_ptr> producer = std::make_unique(queue, 10); +// std::unique_ptr> consumer = std::make_unique(queue, output); +// +// producer->Start(); +// consumer->Start(); +// +// std::this_thread::sleep_for(std::chrono::milliseconds(200)); +// +// consumer->Stop(); +// producer->Stop(); +// +// EXPECT_TRUE(output.size() == 10); +// } +// +// } // namespace collector