From d92336ed670630713e5ab295188d8fde2c6f2ddd Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Mon, 11 Nov 2024 09:12:55 +0000 Subject: [PATCH 1/8] Initial commit of pipeline library --- collector/lib/Pipeline.cpp | 2 + collector/lib/Pipeline.h | 175 +++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 collector/lib/Pipeline.cpp create mode 100644 collector/lib/Pipeline.h diff --git a/collector/lib/Pipeline.cpp b/collector/lib/Pipeline.cpp new file mode 100644 index 0000000000..9fdb2eb755 --- /dev/null +++ b/collector/lib/Pipeline.cpp @@ -0,0 +1,2 @@ + +#include "Pipeline.h" diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h new file mode 100644 index 0000000000..fbb4b309a6 --- /dev/null +++ b/collector/lib/Pipeline.h @@ -0,0 +1,175 @@ +#ifndef _COLLECTOR_PIPELINE_H +#define _COLLECTOR_PIPELINE_H + +#include +#include +#include +#include +#include + +#include "StoppableThread.h" + +namespace collector { + +template +class Queue { + public: + Queue() {} + ~Queue() {} + + T front() { + auto lock = read_lock(); + return inner_.front(); + } + + T back() { + auto lock = read_lock(); + return inner_.back(); + } + + bool empty() { + auto lock = read_lock(); + return inner_.empty(); + } + + size_t size() { + auto lock = read_lock(); + return inner_.size(); + } + + void push(const T& elem) { + auto lock = write_lock(); + return inner_.push(elem); + } + + void push(T&& elem) { + auto lock = write_lock(); + return inner_.push(elem); + } + + template + decltype(auto) emplace(Args&&... args) { + auto lock = write_lock(); + return inner_.emplace(std::forward(args)...); + } + + T pop() { + auto lock = write_lock(); + T data = inner_.front(); + inner_.pop(); + return data; + } + + std::shared_lock read_lock() { + return std::shared_lock(mx_); + } + + std::unique_lock write_lock() { + return std::unique_lock(mx_); + } + + private: + std::queue inner_; + + std::shared_mutex mx_; +}; + +template +class Producer { + public: + Producer(std::shared_ptr>& output) : output_(output) {} + + ~Producer() { Stop(); } + + virtual T next(); + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = next(); + output_->push(event); + } + } + + protected: + std::shared_ptr>& output_; + StoppableThread thread_; +}; + +template +class Consumer { + public: + Consumer(std::shared_ptr>& input) : input_(input) {} + + ~Consumer() { Stop(); } + + virtual void handle(const T& event); + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + handle(event); + } + } + + protected: + std::shared_ptr>& input_; + StoppableThread thread_; +}; + +template +class Transformer { + public: + Transformer(std::shared_ptr>& input, std::shared_ptr>& output) + : input_(input), output_(output) {} + + ~Transformer() { Stop(); } + + virtual std::optional transform(const In& event); + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + auto transformed = transform(event); + if (transformed.has_value()) { + output_.push(transformed.value()); + } + } + } + + protected: + std::shared_ptr> input_; + std::shared_ptr> output_; + + StoppableThread thread_; +}; + +template +using Filter = Transformer; + +} // namespace collector + +#endif From b91bf5e9f23fc0caf01e24b504662b8163b61774 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Mon, 11 Nov 2024 10:40:32 +0000 Subject: [PATCH 2/8] Some fixes and basic test --- collector/lib/Pipeline.h | 12 ++--- collector/test/PipelineTests.cpp | 75 ++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 6 deletions(-) create mode 100644 collector/test/PipelineTests.cpp diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h index fbb4b309a6..ad15c4ab11 100644 --- a/collector/lib/Pipeline.h +++ b/collector/lib/Pipeline.h @@ -60,18 +60,18 @@ class Queue { return data; } - std::shared_lock read_lock() { + std::shared_lock read_lock() const { return std::shared_lock(mx_); } - std::unique_lock write_lock() { + std::unique_lock write_lock() const { return std::unique_lock(mx_); } private: std::queue inner_; - std::shared_mutex mx_; + mutable std::shared_mutex mx_; }; template @@ -81,7 +81,7 @@ class Producer { ~Producer() { Stop(); } - virtual T next(); + virtual T next() = 0; void Start() { thread_.Start([this] { Run(); }); @@ -110,7 +110,7 @@ class Consumer { ~Consumer() { Stop(); } - virtual void handle(const T& event); + virtual void handle(const T& event) = 0; void Start() { thread_.Start([this] { Run(); }); @@ -140,7 +140,7 @@ class Transformer { ~Transformer() { Stop(); } - virtual std::optional transform(const In& event); + virtual std::optional transform(const In& event) = 0; void Start() { thread_.Start([this] { Run(); }); diff --git a/collector/test/PipelineTests.cpp b/collector/test/PipelineTests.cpp new file mode 100644 index 0000000000..b71716d544 --- /dev/null +++ b/collector/test/PipelineTests.cpp @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include + +#include "Pipeline.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace collector { + +class IntProducer : public Producer { + public: + IntProducer(std::shared_ptr>& input, int limit) : Producer(input), limit_(limit) {} + + int next() override { + n_++; + if (n_ > limit_) { + Stop(); + } + return n_; + } + + private: + int n_ = 0; + int limit_; +}; + +class IntConsumer : public Consumer { + public: + IntConsumer(std::shared_ptr>& input, std::vector output) : Consumer(input), events_(output) {} + + void handle(const int& event) override { + events_.push_back(event); + } + + std::vector& Events() { + return events_; + } + + private: + std::vector events_; +}; + +class EvenIntFilter : public Filter { + public: + std::optional transform(const int& event) { + if (event % 2 == 0) { + return {event}; + } + return std::nullopt; + } +}; + +TEST(PipelineTests, TestBasic) { + std::shared_ptr> queue; + + std::vector output; + + std::unique_ptr> producer = std::make_unique(queue, 10); + std::unique_ptr> consumer = std::make_unique(queue, output); + + producer->Start(); + consumer->Start(); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + consumer->Stop(); + producer->Stop(); + + EXPECT_TRUE(output.size() == 10); +} + +} // namespace collector From c1af1053fc4183f068f3b53b63c2e4c3ec0f48f4 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Tue, 12 Nov 2024 08:52:14 +0000 Subject: [PATCH 3/8] Some experimental fixes --- collector/lib/Pipeline.h | 27 ++++++++++++++++++++++----- collector/lib/StoppableThread.cpp | 6 ++++++ collector/test/PipelineTests.cpp | 4 ++-- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h index ad15c4ab11..20e244792a 100644 --- a/collector/lib/Pipeline.h +++ b/collector/lib/Pipeline.h @@ -39,22 +39,30 @@ class Queue { void push(const T& elem) { auto lock = write_lock(); - return inner_.push(elem); + auto e = elem; + inner_.push(std::move(e)); + state_changed_.notify_one(); } void push(T&& elem) { auto lock = write_lock(); - return inner_.push(elem); + inner_.push(elem); + state_changed_.notify_one(); } template decltype(auto) emplace(Args&&... args) { auto lock = write_lock(); - return inner_.emplace(std::forward(args)...); + decltype(auto) out = inner_.emplace(std::forward(args)...); + state_changed_.notify_one(); + return out; } T pop() { auto lock = write_lock(); + if (empty()) { + state_changed_.wait(lock, [this]() { return empty(); }); + } T data = inner_.front(); inner_.pop(); return data; @@ -72,6 +80,7 @@ class Queue { std::queue inner_; mutable std::shared_mutex mx_; + mutable std::condition_variable_any state_changed_; }; template @@ -79,7 +88,11 @@ class Producer { public: Producer(std::shared_ptr>& output) : output_(output) {} - ~Producer() { Stop(); } + ~Producer() { + if (thread_.running()) { + Stop(); + } + } virtual T next() = 0; @@ -108,7 +121,11 @@ class Consumer { public: Consumer(std::shared_ptr>& input) : input_(input) {} - ~Consumer() { Stop(); } + ~Consumer() { + if (thread_.running()) { + Stop(); + } + } virtual void handle(const T& event) = 0; diff --git a/collector/lib/StoppableThread.cpp b/collector/lib/StoppableThread.cpp index 13e05444d2..97e2557d38 100644 --- a/collector/lib/StoppableThread.cpp +++ b/collector/lib/StoppableThread.cpp @@ -1,6 +1,8 @@ #include "StoppableThread.h" +#include #include +#include #include #include "Utility.h" @@ -44,6 +46,10 @@ void StoppableThread::Stop() { } break; } + if (!thread_->joinable()) { + CLOG(WARNING) << "thread not yet joinable..."; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } thread_->join(); thread_.reset(); int rv = close(stop_pipe_[0]); diff --git a/collector/test/PipelineTests.cpp b/collector/test/PipelineTests.cpp index b71716d544..f0213b5722 100644 --- a/collector/test/PipelineTests.cpp +++ b/collector/test/PipelineTests.cpp @@ -17,7 +17,7 @@ class IntProducer : public Producer { int next() override { n_++; if (n_ > limit_) { - Stop(); + return limit_; } return n_; } @@ -54,7 +54,7 @@ class EvenIntFilter : public Filter { }; TEST(PipelineTests, TestBasic) { - std::shared_ptr> queue; + std::shared_ptr> queue = std::make_shared>(); std::vector output; From 2aec857e186256d5463cdd3eaeb1565e31422e8f Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Tue, 12 Nov 2024 13:44:55 +0000 Subject: [PATCH 4/8] Make it not do those segfault thingies --- collector/lib/Pipeline.h | 64 +++++++++++++++++--------------- collector/test/PipelineTests.cpp | 15 +++----- 2 files changed, 40 insertions(+), 39 deletions(-) diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h index 20e244792a..0f39db8a40 100644 --- a/collector/lib/Pipeline.h +++ b/collector/lib/Pipeline.h @@ -1,6 +1,8 @@ #ifndef _COLLECTOR_PIPELINE_H #define _COLLECTOR_PIPELINE_H +#include +#include #include #include #include @@ -17,16 +19,6 @@ class Queue { Queue() {} ~Queue() {} - T front() { - auto lock = read_lock(); - return inner_.front(); - } - - T back() { - auto lock = read_lock(); - return inner_.back(); - } - bool empty() { auto lock = read_lock(); return inner_.empty(); @@ -38,34 +30,36 @@ class Queue { } void push(const T& elem) { - auto lock = write_lock(); - auto e = elem; - inner_.push(std::move(e)); + { + auto lock = write_lock(); + auto e = elem; + inner_.push(std::move(e)); + } state_changed_.notify_one(); } void push(T&& elem) { - auto lock = write_lock(); - inner_.push(elem); + { + auto lock = write_lock(); + inner_.push(elem); + } state_changed_.notify_one(); } - template - decltype(auto) emplace(Args&&... args) { + std::optional pop(std::chrono::milliseconds wait_max = std::chrono::milliseconds(10)) { auto lock = write_lock(); - decltype(auto) out = inner_.emplace(std::forward(args)...); - state_changed_.notify_one(); - return out; - } + if (inner_.empty()) { + auto pred = [this]() { + return !inner_.empty(); + }; - T pop() { - auto lock = write_lock(); - if (empty()) { - state_changed_.wait(lock, [this]() { return empty(); }); + if (!state_changed_.wait_for(lock, wait_max, pred)) { + return std::nullopt; + } } T data = inner_.front(); inner_.pop(); - return data; + return {data}; } std::shared_lock read_lock() const { @@ -94,7 +88,7 @@ class Producer { } } - virtual T next() = 0; + virtual std::optional next() = 0; void Start() { thread_.Start([this] { Run(); }); @@ -107,7 +101,10 @@ class Producer { void Run() { while (!thread_.should_stop()) { auto event = next(); - output_->push(event); + if (!event.has_value()) { + break; + } + output_->push(event.value()); } } @@ -140,7 +137,10 @@ class Consumer { void Run() { while (!thread_.should_stop()) { auto event = input_->pop(); - handle(event); + if (!event.has_value()) { + continue; + } + handle(event.value()); } } @@ -170,7 +170,11 @@ class Transformer { void Run() { while (!thread_.should_stop()) { auto event = input_->pop(); - auto transformed = transform(event); + if (!event.has_value()) { + continue; + } + + auto transformed = transform(event.value()); if (transformed.has_value()) { output_.push(transformed.value()); } diff --git a/collector/test/PipelineTests.cpp b/collector/test/PipelineTests.cpp index f0213b5722..a807ecc64a 100644 --- a/collector/test/PipelineTests.cpp +++ b/collector/test/PipelineTests.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -14,12 +15,12 @@ class IntProducer : public Producer { public: IntProducer(std::shared_ptr>& input, int limit) : Producer(input), limit_(limit) {} - int next() override { + std::optional next() override { n_++; if (n_ > limit_) { - return limit_; + return std::nullopt; } - return n_; + return {n_}; } private: @@ -29,18 +30,14 @@ class IntProducer : public Producer { class IntConsumer : public Consumer { public: - IntConsumer(std::shared_ptr>& input, std::vector output) : Consumer(input), events_(output) {} + IntConsumer(std::shared_ptr>& input, std::vector& output) : Consumer(input), events_(output) {} void handle(const int& event) override { events_.push_back(event); } - std::vector& Events() { - return events_; - } - private: - std::vector events_; + std::vector& events_; }; class EvenIntFilter : public Filter { From 7bc5ad3e776f0996e0d0a1bfc69e87baaf73e237 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Mon, 23 Jun 2025 10:01:07 +0100 Subject: [PATCH 5/8] Initial dispatcher plumbing --- collector/lib/CMakeLists.txt | 1 + collector/lib/CollectorService.cpp | 5 +++-- collector/lib/CollectorService.h | 5 +++++ collector/lib/system-inspector/Service.cpp | 13 +++++++++++-- collector/lib/system-inspector/Service.h | 5 ++++- collector/test/NetworkStatusNotifierTest.cpp | 7 +++++-- 6 files changed, 29 insertions(+), 7 deletions(-) diff --git a/collector/lib/CMakeLists.txt b/collector/lib/CMakeLists.txt index 1d5386e7d1..626d582401 100644 --- a/collector/lib/CMakeLists.txt +++ b/collector/lib/CMakeLists.txt @@ -1,6 +1,7 @@ file(GLOB COLLECTOR_LIB_SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/system-inspector/*.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/events/*.cpp ) add_library(collector_lib ${DRIVER_HEADERS} ${COLLECTOR_LIB_SRC_FILES}) diff --git a/collector/lib/CollectorService.cpp b/collector/lib/CollectorService.cpp index 71b6508cee..4674c3bf93 100644 --- a/collector/lib/CollectorService.cpp +++ b/collector/lib/CollectorService.cpp @@ -27,13 +27,14 @@ static const std::string PROMETHEUS_PORT = "9090"; CollectorService::CollectorService(CollectorConfig& config, std::atomic* control, const std::atomic* signum) : config_(config), - system_inspector_(config_), + system_inspector_(config_, event_dispatcher_), control_(control), signum_(*signum), server_(OPTIONS), exposer_(PROMETHEUS_PORT), exporter_(&config_, &system_inspector_), - config_loader_(config_) { + config_loader_(config_), + event_dispatcher_(process_handler_) { CLOG(INFO) << "Config: " << config_; // Network tracking diff --git a/collector/lib/CollectorService.h b/collector/lib/CollectorService.h index 01f6ee2a80..cc9a6b8923 100644 --- a/collector/lib/CollectorService.h +++ b/collector/lib/CollectorService.h @@ -10,6 +10,8 @@ #include "Control.h" #include "NetworkStatusInspector.h" #include "NetworkStatusNotifier.h" +#include "events/Dispatcher.h" +#include "events/handlers/ProcessHandler.h" #include "system-inspector/Service.h" namespace collector { @@ -52,6 +54,9 @@ class CollectorService { std::unique_ptr net_status_notifier_; std::shared_ptr process_store_; std::shared_ptr network_connection_info_service_comm_; + + collector::events::EventDispatcher event_dispatcher_; + collector::events::handler::ProcessHandler process_handler_; }; } // namespace collector diff --git a/collector/lib/system-inspector/Service.cpp b/collector/lib/system-inspector/Service.cpp index 95c0394416..67d0099b6e 100644 --- a/collector/lib/system-inspector/Service.cpp +++ b/collector/lib/system-inspector/Service.cpp @@ -28,19 +28,21 @@ #include "SelfChecks.h" #include "TimeUtil.h" #include "Utility.h" +#include "events/Dispatcher.h" #include "logger.h" namespace collector::system_inspector { Service::~Service() = default; -Service::Service(const CollectorConfig& config) +Service::Service(const CollectorConfig& config, collector::events::EventDispatcher& dispatcher) : inspector_(std::make_unique(true)), container_metadata_inspector_(std::make_unique(inspector_.get())), default_formatter_(std::make_unique( inspector_.get(), DEFAULT_OUTPUT_STR, - EventExtractor::FilterList())) { + EventExtractor::FilterList())), + dispatcher_(dispatcher) { // Setup the inspector. // peeking into arguments has a big overhead, so we prevent it from happening inspector_->set_snaplen(0); @@ -241,6 +243,10 @@ void LogUnreasonableEventTime(int64_t time_micros, sinsp_evt* evt) { } } +events::IEventPtr to_ievt(sinsp_evt* evt) { + return std::make_shared(); +} + void Service::Run(const std::atomic& control) { if (!inspector_) { throw CollectorException("Invalid state: system inspector was not initialized"); @@ -254,6 +260,9 @@ void Service::Run(const std::atomic& control) { continue; } + auto ievt = to_ievt(evt); + dispatcher_.Dispatch(*ievt); + auto process_start = NowMicros(); for (auto it = signal_handlers_.begin(); it != signal_handlers_.end(); it++) { auto& signal_handler = *it; diff --git a/collector/lib/system-inspector/Service.h b/collector/lib/system-inspector/Service.h index 651e7ff7cb..96df977009 100644 --- a/collector/lib/system-inspector/Service.h +++ b/collector/lib/system-inspector/Service.h @@ -13,6 +13,7 @@ #include "SignalHandler.h" #include "SignalServiceClient.h" #include "SystemInspector.h" +#include "events/Dispatcher.h" // forward declarations class sinsp; @@ -30,7 +31,7 @@ class Service : public SystemInspector { Service& operator=(Service&&) = delete; ~Service() override; - Service(const CollectorConfig& config); + Service(const CollectorConfig& config, collector::events::EventDispatcher& dispatcher); void Start() override; void Run(const std::atomic& control) override; void CleanUp() override; @@ -85,6 +86,8 @@ class Service : public SystemInspector { mutable std::mutex process_requests_mutex_; // [ ( pid, callback ), ( pid, callback ), ... ] std::list> pending_process_requests_; + + collector::events::EventDispatcher dispatcher_; }; } // namespace collector::system_inspector diff --git a/collector/test/NetworkStatusNotifierTest.cpp b/collector/test/NetworkStatusNotifierTest.cpp index 4af986ea94..233ac1e146 100644 --- a/collector/test/NetworkStatusNotifierTest.cpp +++ b/collector/test/NetworkStatusNotifierTest.cpp @@ -190,16 +190,19 @@ class NetworkConnectionInfoMessageParser { class NetworkStatusNotifierTest : public testing::Test { public: NetworkStatusNotifierTest() - : inspector(config), + : inspector(config, dispatcher_), conn_tracker(std::make_shared()), conn_scraper(std::make_unique()), comm(std::make_unique()), - net_status_notifier(conn_tracker, config, &inspector, nullptr) { + net_status_notifier(conn_tracker, config, &inspector, nullptr), + dispatcher_(process_handler_) { } protected: MockCollectorConfig config; system_inspector::Service inspector; + collector::events::EventDispatcher dispatcher_; + collector::events::handler::ProcessHandler process_handler_; std::shared_ptr conn_tracker; std::unique_ptr conn_scraper; std::unique_ptr comm; From 16c15b005094184f922a819f160030e861dac1b5 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Tue, 24 Jun 2025 13:55:53 +0100 Subject: [PATCH 6/8] Add event library --- collector/lib/CollectorService.cpp | 2 +- collector/lib/events/Dispatcher.h | 45 +++++++++++++++++++ collector/lib/events/IEvent.h | 28 ++++++++++++ collector/lib/events/Thing.cpp | 5 +++ collector/lib/events/handlers/Handler.h | 13 ++++++ .../lib/events/handlers/ProcessHandler.h | 24 ++++++++++ collector/test/NetworkStatusNotifierTest.cpp | 2 +- 7 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 collector/lib/events/Dispatcher.h create mode 100644 collector/lib/events/IEvent.h create mode 100644 collector/lib/events/Thing.cpp create mode 100644 collector/lib/events/handlers/Handler.h create mode 100644 collector/lib/events/handlers/ProcessHandler.h diff --git a/collector/lib/CollectorService.cpp b/collector/lib/CollectorService.cpp index 4674c3bf93..cc324c2766 100644 --- a/collector/lib/CollectorService.cpp +++ b/collector/lib/CollectorService.cpp @@ -34,7 +34,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic +#include + +#include "IEvent.h" +#include "events/handlers/ProcessHandler.h" + +namespace collector::events { +namespace { + +// The idea here is compile time dispatching of events to the relevant handler(s) +// rather than having to do runtime lookups in maps or more complex (and slow) +// casting of handler types. +// +// This can be optimized by the compiler quite aggressively and this will form +// most of the hot path of event dispatching from the several event sources we +// may end up supporting. +template +void dispatch(const Tuple& handlers, const Event& event) { + if constexpr (I < std::tuple_size_v) { + if (auto* concrete_event = dynamic_cast::EventType*>(&event)) { + std::get(handlers).Handle(*concrete_event); + } + dispatch(handlers, event); + } +} +} // namespace + +template +class StaticEventDispatcher { + public: + StaticEventDispatcher() {} + + void Dispatch(const IEvent& event) { + dispatch(handlers_, event); + } + + private: + std::tuple handlers_; +}; + +using EventDispatcher = StaticEventDispatcher; + +} // namespace collector::events diff --git a/collector/lib/events/IEvent.h b/collector/lib/events/IEvent.h new file mode 100644 index 0000000000..0c9c7edee8 --- /dev/null +++ b/collector/lib/events/IEvent.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +namespace collector::events { + +enum EventType { + ProcessStart, + NetworkConnection, +}; + +class IEvent { + public: + virtual ~IEvent() = default; + + virtual EventType Type() const = 0; +}; + +using IEventPtr = std::shared_ptr; + +class ProcessStartEvent : public IEvent { + public: + EventType Type() const { + return EventType::ProcessStart; + } +}; + +} // namespace collector::events diff --git a/collector/lib/events/Thing.cpp b/collector/lib/events/Thing.cpp new file mode 100644 index 0000000000..2dbc5e49de --- /dev/null +++ b/collector/lib/events/Thing.cpp @@ -0,0 +1,5 @@ + +#include "Dispatcher.h" +#include "IEvent.h" +#include "handlers/Handler.h" +#include "handlers/ProcessHandler.h" diff --git a/collector/lib/events/handlers/Handler.h b/collector/lib/events/handlers/Handler.h new file mode 100644 index 0000000000..5bf88a4b91 --- /dev/null +++ b/collector/lib/events/handlers/Handler.h @@ -0,0 +1,13 @@ +#pragma once + +namespace collector::events::handler { + +template +class EventHandler { + public: + void Handle(const EventType& event) const { + static_cast(this)->HandleImpl(event); + } +}; + +} // namespace collector::events::handler diff --git a/collector/lib/events/handlers/ProcessHandler.h b/collector/lib/events/handlers/ProcessHandler.h new file mode 100644 index 0000000000..4077f19162 --- /dev/null +++ b/collector/lib/events/handlers/ProcessHandler.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include "events/IEvent.h" +#include "events/handlers/Handler.h" + +namespace collector::events::handler { + +class ProcessHandler : public EventHandler { + public: + using EventType = ProcessStartEvent; + + ProcessHandler() {} + + void HandleImpl(const ProcessStartEvent& event) const { + queue_->push(event); + } + + private: + std::shared_ptr> queue_; +}; + +} // namespace collector::events::handler diff --git a/collector/test/NetworkStatusNotifierTest.cpp b/collector/test/NetworkStatusNotifierTest.cpp index 233ac1e146..5be96ebcc4 100644 --- a/collector/test/NetworkStatusNotifierTest.cpp +++ b/collector/test/NetworkStatusNotifierTest.cpp @@ -195,7 +195,7 @@ class NetworkStatusNotifierTest : public testing::Test { conn_scraper(std::make_unique()), comm(std::make_unique()), net_status_notifier(conn_tracker, config, &inspector, nullptr), - dispatcher_(process_handler_) { + dispatcher_() { } protected: From 25085dd502c4a50b1d87609512008131bab29de8 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Fri, 27 Jun 2025 09:54:42 +0100 Subject: [PATCH 7/8] Main pipeline handling and structure alongside example nodes --- collector/lib/Pipeline.h | 178 ------------------ collector/lib/events/Thing.cpp | 1 + .../lib/events/handlers/ProcessHandler.h | 11 +- collector/lib/events/pipeline/Nodes.h | 155 +++++++++++++++ collector/lib/events/pipeline/Pipeline.h | 76 ++++++++ collector/lib/events/pipeline/Queue.h | 73 +++++++ collector/lib/events/pipeline/nodes/Debug.h | 17 ++ .../pipeline/nodes/ProcessProtoTransformer.h | 8 + collector/lib/system-inspector/Service.h | 2 +- collector/test/PipelineTests.cpp | 122 ++++++------ 10 files changed, 401 insertions(+), 242 deletions(-) create mode 100644 collector/lib/events/pipeline/Nodes.h create mode 100644 collector/lib/events/pipeline/Pipeline.h create mode 100644 collector/lib/events/pipeline/Queue.h create mode 100644 collector/lib/events/pipeline/nodes/Debug.h create mode 100644 collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h diff --git a/collector/lib/Pipeline.h b/collector/lib/Pipeline.h index 0f39db8a40..f318ac0acd 100644 --- a/collector/lib/Pipeline.h +++ b/collector/lib/Pipeline.h @@ -13,184 +13,6 @@ namespace collector { -template -class Queue { - public: - Queue() {} - ~Queue() {} - - bool empty() { - auto lock = read_lock(); - return inner_.empty(); - } - - size_t size() { - auto lock = read_lock(); - return inner_.size(); - } - - void push(const T& elem) { - { - auto lock = write_lock(); - auto e = elem; - inner_.push(std::move(e)); - } - state_changed_.notify_one(); - } - - void push(T&& elem) { - { - auto lock = write_lock(); - inner_.push(elem); - } - state_changed_.notify_one(); - } - - std::optional pop(std::chrono::milliseconds wait_max = std::chrono::milliseconds(10)) { - auto lock = write_lock(); - if (inner_.empty()) { - auto pred = [this]() { - return !inner_.empty(); - }; - - if (!state_changed_.wait_for(lock, wait_max, pred)) { - return std::nullopt; - } - } - T data = inner_.front(); - inner_.pop(); - return {data}; - } - - std::shared_lock read_lock() const { - return std::shared_lock(mx_); - } - - std::unique_lock write_lock() const { - return std::unique_lock(mx_); - } - - private: - std::queue inner_; - - mutable std::shared_mutex mx_; - mutable std::condition_variable_any state_changed_; -}; - -template -class Producer { - public: - Producer(std::shared_ptr>& output) : output_(output) {} - - ~Producer() { - if (thread_.running()) { - Stop(); - } - } - - virtual std::optional next() = 0; - - void Start() { - thread_.Start([this] { Run(); }); - } - - void Stop() { - thread_.Stop(); - } - - void Run() { - while (!thread_.should_stop()) { - auto event = next(); - if (!event.has_value()) { - break; - } - output_->push(event.value()); - } - } - - protected: - std::shared_ptr>& output_; - StoppableThread thread_; -}; - -template -class Consumer { - public: - Consumer(std::shared_ptr>& input) : input_(input) {} - - ~Consumer() { - if (thread_.running()) { - Stop(); - } - } - - virtual void handle(const T& event) = 0; - - void Start() { - thread_.Start([this] { Run(); }); - } - - void Stop() { - thread_.Stop(); - } - - void Run() { - while (!thread_.should_stop()) { - auto event = input_->pop(); - if (!event.has_value()) { - continue; - } - handle(event.value()); - } - } - - protected: - std::shared_ptr>& input_; - StoppableThread thread_; -}; - -template -class Transformer { - public: - Transformer(std::shared_ptr>& input, std::shared_ptr>& output) - : input_(input), output_(output) {} - - ~Transformer() { Stop(); } - - virtual std::optional transform(const In& event) = 0; - - void Start() { - thread_.Start([this] { Run(); }); - } - - void Stop() { - thread_.Stop(); - } - - void Run() { - while (!thread_.should_stop()) { - auto event = input_->pop(); - if (!event.has_value()) { - continue; - } - - auto transformed = transform(event.value()); - if (transformed.has_value()) { - output_.push(transformed.value()); - } - } - } - - protected: - std::shared_ptr> input_; - std::shared_ptr> output_; - - StoppableThread thread_; -}; - -template -using Filter = Transformer; - } // namespace collector #endif diff --git a/collector/lib/events/Thing.cpp b/collector/lib/events/Thing.cpp index 2dbc5e49de..1404fdc118 100644 --- a/collector/lib/events/Thing.cpp +++ b/collector/lib/events/Thing.cpp @@ -3,3 +3,4 @@ #include "IEvent.h" #include "handlers/Handler.h" #include "handlers/ProcessHandler.h" +#include "pipeline/nodes/Debug.h" diff --git a/collector/lib/events/handlers/ProcessHandler.h b/collector/lib/events/handlers/ProcessHandler.h index 4077f19162..1660e23954 100644 --- a/collector/lib/events/handlers/ProcessHandler.h +++ b/collector/lib/events/handlers/ProcessHandler.h @@ -2,11 +2,18 @@ #include +// clang-format: off +#include "events/pipeline/nodes/Debug.h" +// clang-format: on + #include "events/IEvent.h" #include "events/handlers/Handler.h" +#include "events/pipeline/Pipeline.h" namespace collector::events::handler { +using ProcessPipeline = collector::pipeline::Pipeline>; + class ProcessHandler : public EventHandler { public: using EventType = ProcessStartEvent; @@ -14,11 +21,11 @@ class ProcessHandler : public EventHandler { ProcessHandler() {} void HandleImpl(const ProcessStartEvent& event) const { - queue_->push(event); + process_pipeline_.Push(event); } private: - std::shared_ptr> queue_; + ProcessPipeline process_pipeline_; }; } // namespace collector::events::handler diff --git a/collector/lib/events/pipeline/Nodes.h b/collector/lib/events/pipeline/Nodes.h new file mode 100644 index 0000000000..2e62cdaea9 --- /dev/null +++ b/collector/lib/events/pipeline/Nodes.h @@ -0,0 +1,155 @@ +#pragma once + +#include +#include + +#include "Queue.h" +#include "StoppableThread.h" + +namespace collector::pipeline { + +template +class Producer { + public: + using InputType = void; + using OutputType = Out; + + Producer(const std::shared_ptr>& output) : output_(output) {} + + ~Producer() { + if (thread_.running()) { + Stop(); + } + } + + virtual std::optional next() = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = next(); + if (!event.has_value()) { + break; + } + output_->push(event.value()); + } + } + + protected: + std::shared_ptr>& output_; + StoppableThread thread_; +}; + +template +class Consumer { + public: + using InputType = In; + using OutputType = void; + + Consumer(const std::shared_ptr>& input) : input_(input) {} + + ~Consumer() { + if (thread_.running()) { + Stop(); + } + } + + virtual void consume(const In& event) = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + if (!event.has_value()) { + continue; + } + consume(event.value()); + } + } + + protected: + std::shared_ptr>& input_; + StoppableThread thread_; +}; + +template +class Transformer { + public: + using InputType = In; + using OutputType = Out; + + Transformer(const std::shared_ptr>& input, const std::shared_ptr>& output) + : input_(input), output_(output) {} + + ~Transformer() { Stop(); } + + virtual std::optional transform(const In& event) = 0; + + void Start() { + thread_.Start([this] { Run(); }); + } + + void Stop() { + thread_.Stop(); + } + + void Run() { + while (!thread_.should_stop()) { + auto event = input_->pop(); + if (!event.has_value()) { + continue; + } + + auto transformed = transform(event.value()); + if (transformed.has_value()) { + output_.push(transformed.value()); + } + } + } + + protected: + std::shared_ptr> input_; + std::shared_ptr> output_; + + StoppableThread thread_; +}; + +template +using Filter = Transformer; + +template +class Splitter : public Transformer { + public: + using InputType = InOut; + using OutputType = InOut; + + Splitter(const std::shared_ptr> input, + std::vector>> outputs) : Transformer(input, nullptr), outputs_(outputs) { + } + + std::optional transform(const InOut& event) override { + for (const auto& queue : outputs_) { + queue->push(event); + } + return std::nullopt; + } + + private: + std::vector>> outputs_; +}; + +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/Pipeline.h b/collector/lib/events/pipeline/Pipeline.h new file mode 100644 index 0000000000..e1430be711 --- /dev/null +++ b/collector/lib/events/pipeline/Pipeline.h @@ -0,0 +1,76 @@ +#pragma once + +#include +#include + +#include "events/pipeline/Queue.h" + +namespace collector::pipeline { + +template +class Pipeline { + using QueueTuple = std::tuple>...>; + struct Graph { + std::tuple...> nodes; + QueueTuple queues; + }; + + static Graph build_graph() { + auto queues = std::make_tuple(std::make_shared>()...); + auto nodes = build_nodes(queues, std::index_sequence_for{}); + return { + std::move(nodes), + std::move(queues), + }; + } + + template + static auto build_nodes(const std::tuple>...>& queues, + std::index_sequence) { + // This is a fold expression over the comma operator. + // It expands to: std::make_tuple(Node_0(...), Node_1(...), Node_2(...), ...) + return std::make_tuple( + std::make_unique( + std::get(queues), + get_output_queue(queues))...); + } + + // Helper to safely get the output queue for a given node index. + template + static auto get_output_queue( + const std::tuple>...>& queues) { + if constexpr (I + 1 < sizeof...(Nodes)) { + return std::get(queues); + } else { + // This is the last node in the pipeline (a Consumer). + // Its output type is `void`, so we pass a nullptr. + return nullptr; + } + } + + public: + using In = typename std::tuple_element_t<0, std::tuple>::InputType; + + Pipeline() : graph_(build_graph()) { + } + + void Start() { + start_nodes<0>(); + } + + void Push(const In& input) const { + std::get<0>(graph_.queues)->push(input); + } + + private: + Graph graph_; + + template + void start_nodes() { + if constexpr (I < sizeof...(Nodes)) { + std::get(graph_.nodes)->Start(); + start_nodes(); + } + } +}; +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/Queue.h b/collector/lib/events/pipeline/Queue.h new file mode 100644 index 0000000000..7f49ea509b --- /dev/null +++ b/collector/lib/events/pipeline/Queue.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace collector::pipeline { +template +class Queue { + public: + Queue() {} + ~Queue() {} + + bool empty() { + auto lock = read_lock(); + return inner_.empty(); + } + + size_t size() { + auto lock = read_lock(); + return inner_.size(); + } + + void push(const T& elem) { + { + auto lock = write_lock(); + auto e = elem; + inner_.push(std::move(e)); + } + state_changed_.notify_one(); + } + + void push(T&& elem) { + { + auto lock = write_lock(); + inner_.push(elem); + } + state_changed_.notify_one(); + } + + std::optional pop(std::chrono::milliseconds wait_max = std::chrono::milliseconds(10)) { + auto lock = write_lock(); + if (inner_.empty()) { + auto pred = [this]() { + return !inner_.empty(); + }; + + if (!state_changed_.wait_for(lock, wait_max, pred)) { + return std::nullopt; + } + } + T data = inner_.front(); + inner_.pop(); + return {data}; + } + + std::shared_lock read_lock() const { + return std::shared_lock(mx_); + } + + std::unique_lock write_lock() const { + return std::unique_lock(mx_); + } + + private: + std::queue inner_; + + mutable std::shared_mutex mx_; + mutable std::condition_variable_any state_changed_; +}; +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/nodes/Debug.h b/collector/lib/events/pipeline/nodes/Debug.h new file mode 100644 index 0000000000..8f0ff32f04 --- /dev/null +++ b/collector/lib/events/pipeline/nodes/Debug.h @@ -0,0 +1,17 @@ +#pragma once + +#include "events/pipeline/Nodes.h" + +namespace collector::pipeline { + +template +class DebugNode : public Transformer { + public: + DebugNode(const std::shared_ptr>& input, const std::shared_ptr>& output) + : Transformer(input, output) {} + + std::optional transform(const In& input) { + return std::nullopt; + } +}; +} // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h b/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h new file mode 100644 index 0000000000..026b196e0f --- /dev/null +++ b/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h @@ -0,0 +1,8 @@ +#pragma once + +namespace collector::pipeline { + +class ProcessProtoTransformer { +}; + +} // namespace collector::pipeline diff --git a/collector/lib/system-inspector/Service.h b/collector/lib/system-inspector/Service.h index 96df977009..687715c381 100644 --- a/collector/lib/system-inspector/Service.h +++ b/collector/lib/system-inspector/Service.h @@ -87,7 +87,7 @@ class Service : public SystemInspector { // [ ( pid, callback ), ( pid, callback ), ... ] std::list> pending_process_requests_; - collector::events::EventDispatcher dispatcher_; + collector::events::EventDispatcher& dispatcher_; }; } // namespace collector::system_inspector diff --git a/collector/test/PipelineTests.cpp b/collector/test/PipelineTests.cpp index a807ecc64a..878a8a625c 100644 --- a/collector/test/PipelineTests.cpp +++ b/collector/test/PipelineTests.cpp @@ -9,64 +9,64 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -namespace collector { - -class IntProducer : public Producer { - public: - IntProducer(std::shared_ptr>& input, int limit) : Producer(input), limit_(limit) {} - - std::optional next() override { - n_++; - if (n_ > limit_) { - return std::nullopt; - } - return {n_}; - } - - private: - int n_ = 0; - int limit_; -}; - -class IntConsumer : public Consumer { - public: - IntConsumer(std::shared_ptr>& input, std::vector& output) : Consumer(input), events_(output) {} - - void handle(const int& event) override { - events_.push_back(event); - } - - private: - std::vector& events_; -}; - -class EvenIntFilter : public Filter { - public: - std::optional transform(const int& event) { - if (event % 2 == 0) { - return {event}; - } - return std::nullopt; - } -}; - -TEST(PipelineTests, TestBasic) { - std::shared_ptr> queue = std::make_shared>(); - - std::vector output; - - std::unique_ptr> producer = std::make_unique(queue, 10); - std::unique_ptr> consumer = std::make_unique(queue, output); - - producer->Start(); - consumer->Start(); - - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - - consumer->Stop(); - producer->Stop(); - - EXPECT_TRUE(output.size() == 10); -} - -} // namespace collector +// namespace collector { +// +// class IntProducer : public Producer { +// public: +// IntProducer(std::shared_ptr>& input, int limit) : Producer(input), limit_(limit) {} +// +// std::optional next() override { +// n_++; +// if (n_ > limit_) { +// return std::nullopt; +// } +// return {n_}; +// } +// +// private: +// int n_ = 0; +// int limit_; +// }; +// +// class IntConsumer : public Consumer { +// public: +// IntConsumer(std::shared_ptr>& input, std::vector& output) : Consumer(input), events_(output) {} +// +// void handle(const int& event) override { +// events_.push_back(event); +// } +// +// private: +// std::vector& events_; +// }; +// +// class EvenIntFilter : public Filter { +// public: +// std::optional transform(const int& event) { +// if (event % 2 == 0) { +// return {event}; +// } +// return std::nullopt; +// } +// }; +// +// TEST(PipelineTests, TestBasic) { +// std::shared_ptr> queue = std::make_shared>(); +// +// std::vector output; +// +// std::unique_ptr> producer = std::make_unique(queue, 10); +// std::unique_ptr> consumer = std::make_unique(queue, output); +// +// producer->Start(); +// consumer->Start(); +// +// std::this_thread::sleep_for(std::chrono::milliseconds(200)); +// +// consumer->Stop(); +// producer->Stop(); +// +// EXPECT_TRUE(output.size() == 10); +// } +// +// } // namespace collector From 5e68dd045c8d690f0de8ecc47b954011de0c5c19 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Wed, 2 Jul 2025 13:14:42 +0100 Subject: [PATCH 8/8] Big ol' changes --- collector/lib/CollectorService.cpp | 3 +- collector/lib/CollectorService.h | 6 +-- collector/lib/Process.cpp | 4 ++ collector/lib/Process.h | 7 +++ collector/lib/events/Dispatcher.h | 2 +- collector/lib/events/IEvent.h | 17 +++++++ collector/lib/events/Thing.cpp | 6 --- collector/lib/events/handlers/Handler.h | 20 ++++++++ .../lib/events/handlers/ProcessHandler.h | 22 ++++---- collector/lib/events/pipeline/Nodes.h | 18 ++++--- collector/lib/events/pipeline/Pipeline.h | 24 ++++++--- collector/lib/events/pipeline/nodes/Debug.h | 3 +- .../pipeline/nodes/ProcessProtoTransformer.h | 50 ++++++++++++++++++- collector/lib/system-inspector/Service.cpp | 22 +++++++- collector/lib/system-inspector/Service.h | 3 ++ collector/test/ConnTrackerTest.cpp | 1 + 16 files changed, 170 insertions(+), 38 deletions(-) delete mode 100644 collector/lib/events/Thing.cpp diff --git a/collector/lib/CollectorService.cpp b/collector/lib/CollectorService.cpp index cc324c2766..a5a1e0f842 100644 --- a/collector/lib/CollectorService.cpp +++ b/collector/lib/CollectorService.cpp @@ -33,8 +33,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic net_status_notifier_; std::shared_ptr process_store_; std::shared_ptr network_connection_info_service_comm_; - - collector::events::EventDispatcher event_dispatcher_; - collector::events::handler::ProcessHandler process_handler_; }; } // namespace collector diff --git a/collector/lib/Process.cpp b/collector/lib/Process.cpp index 632d824a03..2b4e4ee1cb 100644 --- a/collector/lib/Process.cpp +++ b/collector/lib/Process.cpp @@ -28,6 +28,10 @@ const std::shared_ptr ProcessStore::Fetch(uint64_t pid) { return cached_process; } +std::string Process::name() const { + return exe(); +} + std::string Process::container_id() const { WaitForProcessInfo(); diff --git a/collector/lib/Process.h b/collector/lib/Process.h index 8fb346984d..4b17ac8655 100644 --- a/collector/lib/Process.h +++ b/collector/lib/Process.h @@ -8,6 +8,10 @@ #include #include +#include "libsinsp/sinsp.h" + +#include "system-inspector/EventExtractor.h" + // forward declarations class sinsp_threadinfo; namespace collector { @@ -44,6 +48,7 @@ class ProcessStore { class IProcess { public: virtual uint64_t pid() const = 0; + virtual std::string name() const = 0; virtual std::string container_id() const = 0; virtual std::string comm() const = 0; virtual std::string exe() const = 0; @@ -61,6 +66,7 @@ class IProcess { class Process : public IProcess { public: inline uint64_t pid() const override { return pid_; } + std::string name() const override; std::string container_id() const override; std::string comm() const override; std::string exe() const override; @@ -70,6 +76,7 @@ class Process : public IProcess { /* - when 'cache' is provided, this process will remove itself from it upon deletion. * - 'instance' is used to request the process information from the system. */ Process(uint64_t pid, ProcessStore::MapRef cache = 0, system_inspector::Service* instance = 0); + ~Process(); private: diff --git a/collector/lib/events/Dispatcher.h b/collector/lib/events/Dispatcher.h index d757a22add..b966990822 100644 --- a/collector/lib/events/Dispatcher.h +++ b/collector/lib/events/Dispatcher.h @@ -15,7 +15,7 @@ namespace { // // This can be optimized by the compiler quite aggressively and this will form // most of the hot path of event dispatching from the several event sources we -// may end up supporting. +// can support template void dispatch(const Tuple& handlers, const Event& event) { if constexpr (I < std::tuple_size_v) { diff --git a/collector/lib/events/IEvent.h b/collector/lib/events/IEvent.h index 0c9c7edee8..ddf3a2c676 100644 --- a/collector/lib/events/IEvent.h +++ b/collector/lib/events/IEvent.h @@ -1,7 +1,10 @@ #pragma once +#include #include +#include "Process.h" + namespace collector::events { enum EventType { @@ -20,9 +23,23 @@ using IEventPtr = std::shared_ptr; class ProcessStartEvent : public IEvent { public: + ProcessStartEvent(const std::shared_ptr& process) : process_(process) {} + EventType Type() const { return EventType::ProcessStart; } + + friend std::ostream& operator<<(std::ostream& stream, const ProcessStartEvent& event) { + stream << *event.Process(); + return stream; + } + + const std::shared_ptr& Process() const { + return process_; + } + + private: + std::shared_ptr process_; }; } // namespace collector::events diff --git a/collector/lib/events/Thing.cpp b/collector/lib/events/Thing.cpp deleted file mode 100644 index 1404fdc118..0000000000 --- a/collector/lib/events/Thing.cpp +++ /dev/null @@ -1,6 +0,0 @@ - -#include "Dispatcher.h" -#include "IEvent.h" -#include "handlers/Handler.h" -#include "handlers/ProcessHandler.h" -#include "pipeline/nodes/Debug.h" diff --git a/collector/lib/events/handlers/Handler.h b/collector/lib/events/handlers/Handler.h index 5bf88a4b91..cf95419298 100644 --- a/collector/lib/events/handlers/Handler.h +++ b/collector/lib/events/handlers/Handler.h @@ -2,6 +2,26 @@ namespace collector::events::handler { +// An EventHandler is the owner of a processing pipeline +// for a given event type. +// +// This base class is intended to formalize the API between +// event dispatchers and event handlers, using CRTP. +// +// Note: C++20 would allow us to use concepts for this API +// definition, and this should be changed if/when we move to +// the newer standard. +// +// example derived handler: +// +// ```cpp +// class MyEventHandler : public EventHandler { +// public: +// void HandleImpl(const MyEventType& event) const { +// std::cout << event << std::endl; +// } +// } +// ``` template class EventHandler { public: diff --git a/collector/lib/events/handlers/ProcessHandler.h b/collector/lib/events/handlers/ProcessHandler.h index 1660e23954..d128f3d3fb 100644 --- a/collector/lib/events/handlers/ProcessHandler.h +++ b/collector/lib/events/handlers/ProcessHandler.h @@ -1,24 +1,28 @@ #pragma once -#include - -// clang-format: off -#include "events/pipeline/nodes/Debug.h" -// clang-format: on - #include "events/IEvent.h" #include "events/handlers/Handler.h" #include "events/pipeline/Pipeline.h" +#include "events/pipeline/nodes/ProcessProtoTransformer.h" +#include "events/pipeline/nodes/SignalServiceConsumer.h" namespace collector::events::handler { -using ProcessPipeline = collector::pipeline::Pipeline>; - class ProcessHandler : public EventHandler { public: using EventType = ProcessStartEvent; - ProcessHandler() {} + using ProcessPipeline = collector::pipeline::Pipeline< + pipeline::ProcessProtoTransformer, + pipeline::SignalServiceConsumer>; + + ProcessHandler() { + process_pipeline_.Start(); + } + + ~ProcessHandler() { + process_pipeline_.Stop(); + } void HandleImpl(const ProcessStartEvent& event) const { process_pipeline_.Push(event); diff --git a/collector/lib/events/pipeline/Nodes.h b/collector/lib/events/pipeline/Nodes.h index 2e62cdaea9..ca5a05fdf9 100644 --- a/collector/lib/events/pipeline/Nodes.h +++ b/collector/lib/events/pipeline/Nodes.h @@ -14,7 +14,7 @@ class Producer { using InputType = void; using OutputType = Out; - Producer(const std::shared_ptr>& output) : output_(output) {} + Producer(const std::shared_ptr _ptr /* ignored */, const std::shared_ptr>& output) : output_(output) {} ~Producer() { if (thread_.running()) { @@ -53,7 +53,7 @@ class Consumer { using InputType = In; using OutputType = void; - Consumer(const std::shared_ptr>& input) : input_(input) {} + Consumer(const std::shared_ptr>& input, const std::shared_ptr _ptr /*ignored*/) : input_(input) {} ~Consumer() { if (thread_.running()) { @@ -73,6 +73,10 @@ class Consumer { void Run() { while (!thread_.should_stop()) { + if (!input_) { + CLOG(DEBUG) << "No input queue for Consumer"; + break; + } auto event = input_->pop(); if (!event.has_value()) { continue; @@ -82,7 +86,7 @@ class Consumer { } protected: - std::shared_ptr>& input_; + const std::shared_ptr>& input_; StoppableThread thread_; }; @@ -116,14 +120,16 @@ class Transformer { auto transformed = transform(event.value()); if (transformed.has_value()) { - output_.push(transformed.value()); + if (output_) { + output_->push(transformed.value()); + } } } } protected: - std::shared_ptr> input_; - std::shared_ptr> output_; + const std::shared_ptr> input_; + const std::shared_ptr> output_; StoppableThread thread_; }; diff --git a/collector/lib/events/pipeline/Pipeline.h b/collector/lib/events/pipeline/Pipeline.h index e1430be711..c335a6fbc9 100644 --- a/collector/lib/events/pipeline/Pipeline.h +++ b/collector/lib/events/pipeline/Pipeline.h @@ -9,7 +9,7 @@ namespace collector::pipeline { template class Pipeline { - using QueueTuple = std::tuple>...>; + using QueueTuple = std::tuple>...>; struct Graph { std::tuple...> nodes; QueueTuple queues; @@ -24,15 +24,15 @@ class Pipeline { }; } - template - static auto build_nodes(const std::tuple>...>& queues, - std::index_sequence) { + template + static auto build_nodes(const std::tuple>...>& queues, + std::index_sequence) { // This is a fold expression over the comma operator. // It expands to: std::make_tuple(Node_0(...), Node_1(...), Node_2(...), ...) return std::make_tuple( std::make_unique( - std::get(queues), - get_output_queue(queues))...); + std::get(queues), + get_output_queue(queues))...); } // Helper to safely get the output queue for a given node index. @@ -58,6 +58,10 @@ class Pipeline { start_nodes<0>(); } + void Stop() { + stop_nodes<0>(); + } + void Push(const In& input) const { std::get<0>(graph_.queues)->push(input); } @@ -72,5 +76,13 @@ class Pipeline { start_nodes(); } } + + template + void stop_nodes() { + if constexpr (I < sizeof...(Nodes)) { + std::get(graph_.nodes)->Stop(); + start_nodes(); + } + } }; } // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/nodes/Debug.h b/collector/lib/events/pipeline/nodes/Debug.h index 8f0ff32f04..d41f087bf8 100644 --- a/collector/lib/events/pipeline/nodes/Debug.h +++ b/collector/lib/events/pipeline/nodes/Debug.h @@ -11,7 +11,8 @@ class DebugNode : public Transformer { : Transformer(input, output) {} std::optional transform(const In& input) { - return std::nullopt; + CLOG(INFO) << input; + return {input}; } }; } // namespace collector::pipeline diff --git a/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h b/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h index 026b196e0f..10d1f87393 100644 --- a/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h +++ b/collector/lib/events/pipeline/nodes/ProcessProtoTransformer.h @@ -1,8 +1,56 @@ #pragma once +#include + +#include "internalapi/sensor/signal_iservice.pb.h" +#include "storage/process_indicator.pb.h" + +#include "ProtoAllocator.h" +#include "Utility.h" +#include "events/IEvent.h" +#include "events/pipeline/Nodes.h" + namespace collector::pipeline { -class ProcessProtoTransformer { +class ProcessProtoTransformer : public Transformer> { + public: + ProcessProtoTransformer( + const std::shared_ptr>& input, + const std::shared_ptr>>& output) + : Transformer>(input, output) {} + + std::optional> transform(const events::ProcessStartEvent& event) { + storage::ProcessSignal* proc_signal = allocator_.Allocate(); + + if (!proc_signal) { + return std::nullopt; + } + + proc_signal->set_id(UUIDStr()); + + auto process = event.Process(); + + proc_signal->set_name(process->name()); + proc_signal->set_exec_file_path(process->exe_path()); + proc_signal->set_container_id(process->container_id()); + proc_signal->set_pid(process->pid()); + proc_signal->set_args(process->args()); + + sensor::SignalStreamMessage* msg = allocator_.AllocateRoot(); + v1::Signal* signal = allocator_.Allocate(); + + signal->set_allocated_process_signal(proc_signal); + + msg->clear_collector_register_request(); + msg->set_allocated_signal(signal); + + std::shared_ptr ptr; + ptr.reset(msg); + return {ptr}; + } + + private: + ProtoAllocator allocator_; }; } // namespace collector::pipeline diff --git a/collector/lib/system-inspector/Service.cpp b/collector/lib/system-inspector/Service.cpp index 67d0099b6e..87119fe32f 100644 --- a/collector/lib/system-inspector/Service.cpp +++ b/collector/lib/system-inspector/Service.cpp @@ -1,6 +1,7 @@ #include "Service.h" #include +#include #include #include @@ -19,17 +20,21 @@ #include "ContainerMetadata.h" #include "EventExtractor.h" #include "EventNames.h" +#include "FalcoProcess.h" #include "HostInfo.h" #include "KernelDriver.h" #include "Logging.h" #include "NetworkSignalHandler.h" +#include "Process.h" #include "ProcessSignalHandler.h" #include "SelfCheckHandler.h" #include "SelfChecks.h" #include "TimeUtil.h" #include "Utility.h" #include "events/Dispatcher.h" +#include "events/IEvent.h" #include "logger.h" +#include "ppm_events_public.h" namespace collector::system_inspector { @@ -243,8 +248,21 @@ void LogUnreasonableEventTime(int64_t time_micros, sinsp_evt* evt) { } } -events::IEventPtr to_ievt(sinsp_evt* evt) { - return std::make_shared(); +events::IEventPtr Service::to_ievt(sinsp_evt* evt) { + std::bitset event_filter; + const EventNames& event_names = EventNames::GetInstance(); + for (ppm_event_code event_id : event_names.GetEventIDs("execve<")) { + event_filter.set(event_id); + } + + if (event_filter[evt->get_type()]) { + EventExtractor extractor; + extractor.Init(inspector_.get()); + auto proc = std::make_shared(evt, extractor); + events::IEventPtr ievt = std::make_shared(proc); + return ievt; + } + return std::nullptr_t(); } void Service::Run(const std::atomic& control) { diff --git a/collector/lib/system-inspector/Service.h b/collector/lib/system-inspector/Service.h index 687715c381..1ac0ebfe3a 100644 --- a/collector/lib/system-inspector/Service.h +++ b/collector/lib/system-inspector/Service.h @@ -14,6 +14,7 @@ #include "SignalServiceClient.h" #include "SystemInspector.h" #include "events/Dispatcher.h" +#include "events/IEvent.h" // forward declarations class sinsp; @@ -51,6 +52,8 @@ class Service : public SystemInspector { void AddSignalHandler(std::unique_ptr signal_handler); + events::IEventPtr to_ievt(sinsp_evt* evt); + private: FRIEND_TEST(SystemInspectorServiceTest, FilterEvent); diff --git a/collector/test/ConnTrackerTest.cpp b/collector/test/ConnTrackerTest.cpp index a8457247cc..dbd028b3b9 100644 --- a/collector/test/ConnTrackerTest.cpp +++ b/collector/test/ConnTrackerTest.cpp @@ -1359,6 +1359,7 @@ class FakeProcess : public IProcess { args_(args) {} uint64_t pid() const override { return pid_; } + std::string name() const override { return ""; } std::string container_id() const override { return container_id_; } std::string comm() const override { return comm_; } std::string exe() const override { return exe_; }