Skip to content

Experimentation with execution pipelines #1947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions collector/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
file(GLOB COLLECTOR_LIB_SRC_FILES
${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/system-inspector/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/events/*.cpp
)

add_library(collector_lib ${DRIVER_HEADERS} ${COLLECTOR_LIB_SRC_FILES})
Expand Down
2 changes: 1 addition & 1 deletion collector/lib/CollectorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static const std::string PROMETHEUS_PORT = "9090";
CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlValue>* control,
const std::atomic<int>* signum)
: config_(config),
system_inspector_(config_),
system_inspector_(config_, event_dispatcher_),
control_(control),
signum_(*signum),
server_(OPTIONS),
Expand Down
3 changes: 3 additions & 0 deletions collector/lib/CollectorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "Control.h"
#include "NetworkStatusInspector.h"
#include "NetworkStatusNotifier.h"
#include "events/Dispatcher.h"
#include "system-inspector/Service.h"

namespace collector {
Expand All @@ -31,6 +32,8 @@ class CollectorService {
private:
bool WaitForGRPCServer();

collector::events::EventDispatcher event_dispatcher_;

CollectorConfig& config_;
system_inspector::Service system_inspector_;

Expand Down
2 changes: 2 additions & 0 deletions collector/lib/Pipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

#include "Pipeline.h"
18 changes: 18 additions & 0 deletions collector/lib/Pipeline.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef _COLLECTOR_PIPELINE_H
#define _COLLECTOR_PIPELINE_H

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <shared_mutex>

#include "StoppableThread.h"

namespace collector {

} // namespace collector

#endif
4 changes: 4 additions & 0 deletions collector/lib/Process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const std::shared_ptr<IProcess> ProcessStore::Fetch(uint64_t pid) {
return cached_process;
}

std::string Process::name() const {
return exe();
}

std::string Process::container_id() const {
WaitForProcessInfo();

Expand Down
7 changes: 7 additions & 0 deletions collector/lib/Process.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include <string>
#include <unordered_map>

#include "libsinsp/sinsp.h"

#include "system-inspector/EventExtractor.h"

// forward declarations
class sinsp_threadinfo;
namespace collector {
Expand Down Expand Up @@ -44,6 +48,7 @@ class ProcessStore {
class IProcess {
public:
virtual uint64_t pid() const = 0;
virtual std::string name() const = 0;
virtual std::string container_id() const = 0;
virtual std::string comm() const = 0;
virtual std::string exe() const = 0;
Expand All @@ -61,6 +66,7 @@ class IProcess {
class Process : public IProcess {
public:
inline uint64_t pid() const override { return pid_; }
std::string name() const override;
std::string container_id() const override;
std::string comm() const override;
std::string exe() const override;
Expand All @@ -70,6 +76,7 @@ class Process : public IProcess {
/* - when 'cache' is provided, this process will remove itself from it upon deletion.
* - 'instance' is used to request the process information from the system. */
Process(uint64_t pid, ProcessStore::MapRef cache = 0, system_inspector::Service* instance = 0);

~Process();

private:
Expand Down
6 changes: 6 additions & 0 deletions collector/lib/StoppableThread.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "StoppableThread.h"

#include <chrono>
#include <iostream>
#include <thread>
#include <unistd.h>

#include "Utility.h"
Expand Down Expand Up @@ -44,6 +46,10 @@ void StoppableThread::Stop() {
}
break;
}
if (!thread_->joinable()) {
CLOG(WARNING) << "thread not yet joinable...";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
thread_->join();
thread_.reset();
int rv = close(stop_pipe_[0]);
Expand Down
45 changes: 45 additions & 0 deletions collector/lib/events/Dispatcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <memory>
#include <tuple>

#include "IEvent.h"
#include "events/handlers/ProcessHandler.h"

namespace collector::events {
namespace {

// The idea here is compile time dispatching of events to the relevant handler(s)
// rather than having to do runtime lookups in maps or more complex (and slow)
// casting of handler types.
//
// This can be optimized by the compiler quite aggressively and this will form
// most of the hot path of event dispatching from the several event sources we
// can support
template <typename Tuple, typename Event, std::size_t I = 0>
void dispatch(const Tuple& handlers, const Event& event) {
if constexpr (I < std::tuple_size_v<Tuple>) {
if (auto* concrete_event = dynamic_cast<const typename std::tuple_element_t<I, Tuple>::EventType*>(&event)) {
std::get<I>(handlers).Handle(*concrete_event);
}
dispatch<Tuple, Event, I + 1>(handlers, event);
}
}
} // namespace

template <typename... Handlers>
class StaticEventDispatcher {
public:
StaticEventDispatcher() {}

void Dispatch(const IEvent& event) {
dispatch(handlers_, event);
}

private:
std::tuple<Handlers...> handlers_;
};

using EventDispatcher = StaticEventDispatcher<handler::ProcessHandler>;

} // namespace collector::events
45 changes: 45 additions & 0 deletions collector/lib/events/IEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <iostream>
#include <memory>

#include "Process.h"

namespace collector::events {

enum EventType {
ProcessStart,
NetworkConnection,
};

class IEvent {
public:
virtual ~IEvent() = default;

virtual EventType Type() const = 0;
};

using IEventPtr = std::shared_ptr<const IEvent>;

class ProcessStartEvent : public IEvent {
public:
ProcessStartEvent(const std::shared_ptr<IProcess>& process) : process_(process) {}

EventType Type() const {
return EventType::ProcessStart;
}

friend std::ostream& operator<<(std::ostream& stream, const ProcessStartEvent& event) {
stream << *event.Process();
return stream;
}

const std::shared_ptr<IProcess>& Process() const {
return process_;
}

private:
std::shared_ptr<IProcess> process_;
};

} // namespace collector::events
33 changes: 33 additions & 0 deletions collector/lib/events/handlers/Handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

namespace collector::events::handler {

// An EventHandler is the owner of a processing pipeline
// for a given event type.
//
// This base class is intended to formalize the API between
// event dispatchers and event handlers, using CRTP.
//
// Note: C++20 would allow us to use concepts for this API
// definition, and this should be changed if/when we move to
// the newer standard.
//
// example derived handler:
//
// ```cpp
// class MyEventHandler : public EventHandler<MyEventHandler, MyEventType> {
// public:
// void HandleImpl(const MyEventType& event) const {
// std::cout << event << std::endl;
// }
// }
// ```
template <typename Derived, typename EventType>
class EventHandler {
public:
void Handle(const EventType& event) const {
static_cast<const Derived*>(this)->HandleImpl(event);
}
};

} // namespace collector::events::handler
35 changes: 35 additions & 0 deletions collector/lib/events/handlers/ProcessHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include "events/IEvent.h"
#include "events/handlers/Handler.h"
#include "events/pipeline/Pipeline.h"
#include "events/pipeline/nodes/ProcessProtoTransformer.h"
#include "events/pipeline/nodes/SignalServiceConsumer.h"

namespace collector::events::handler {

class ProcessHandler : public EventHandler<ProcessHandler, ProcessStartEvent> {
public:
using EventType = ProcessStartEvent;

using ProcessPipeline = collector::pipeline::Pipeline<
pipeline::ProcessProtoTransformer,
pipeline::SignalServiceConsumer>;

ProcessHandler() {
process_pipeline_.Start();
}

~ProcessHandler() {
process_pipeline_.Stop();
}

void HandleImpl(const ProcessStartEvent& event) const {
process_pipeline_.Push(event);
}

private:
ProcessPipeline process_pipeline_;
};

} // namespace collector::events::handler
Loading
Loading