Skip to content

Commit 97bfc3c

Browse files
committed
Refactor NetworkStatusNotifier
This change started out from the effort to unify the services used to communicate with sensor. While attempting to add an output object to the notifier, it became apparent this class has some issues with ownership of its members, which in turn makes it a bit odd to extend and modify. In order to alleviate the problem the following changes are introduced: - The connection scraper is now owned by NetworkStatusNotifier. - The communication channel for network info is now owned by NetworkStatusNotifier. - Connection stats are now owned by NetworkStatusNotifier. - Move ownership of the prometheus registry into CollectorStatsExporter. - Refactor CollectorConnectionStats into its own file. When stating "X is now owned by Y" it stands for X now is part of Y and its lifecycle is fully handled by Y (i.e X is created with Y and destroyed with Y, preferably following RAII).
1 parent 14555ea commit 97bfc3c

12 files changed

+172
-158
lines changed

collector/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ add_subdirectory(lib)
6868
add_executable(collector collector.cpp)
6969
target_link_libraries(collector collector_lib)
7070

71-
target_link_libraries(collector prometheus-cpp::core prometheus-cpp::pull)
72-
7371
add_executable(connscrape connscrape.cpp)
7472
target_link_libraries(connscrape collector_lib)
7573

collector/lib/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ target_link_libraries(collector_lib uuid)
1414
target_link_libraries(collector_lib gRPC::grpc++)
1515
target_link_libraries(collector_lib civetweb::civetweb-cpp)
1616
target_link_libraries(collector_lib yaml-cpp::yaml-cpp)
17+
target_link_libraries(collector_lib prometheus-cpp::core prometheus-cpp::pull)
1718

1819
target_link_libraries(collector_lib rox-proto)
1920

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#ifndef _COLLECTOR_CONNECTION_STATS_
2+
#define _COLLECTOR_CONNECTION_STATS_
3+
4+
#include <algorithm>
5+
6+
#include "prometheus/gauge.h"
7+
#include "prometheus/registry.h"
8+
#include "prometheus/summary.h"
9+
10+
namespace collector {
11+
template <typename T>
12+
class CollectorConnectionStats {
13+
public:
14+
CollectorConnectionStats(
15+
prometheus::Registry* registry,
16+
const std::string& name,
17+
const std::string& help,
18+
std::chrono::milliseconds max_age,
19+
const std::vector<double>& quantiles,
20+
double error) {
21+
auto& family = prometheus::BuildSummary()
22+
.Name(name)
23+
.Help(help)
24+
.Register(*registry);
25+
auto q = MakeQuantiles(quantiles, error);
26+
inbound_private_summary_ = &family.Add({{"dir", "in"}, {"peer", "private"}}, q, max_age);
27+
inbound_public_summary_ = &family.Add({{"dir", "in"}, {"peer", "public"}}, q, max_age);
28+
outbound_private_summary_ = &family.Add({{"dir", "out"}, {"peer", "private"}}, q, max_age);
29+
outbound_public_summary_ = &family.Add({{"dir", "out"}, {"peer", "public"}}, q, max_age);
30+
}
31+
32+
void Observe(T inbound_private, T inbound_public, T outbound_private, T outbound_public) {
33+
inbound_private_summary_->Observe(inbound_private);
34+
inbound_public_summary_->Observe(inbound_public);
35+
outbound_private_summary_->Observe(outbound_private);
36+
outbound_public_summary_->Observe(outbound_public);
37+
}
38+
39+
private:
40+
prometheus::Summary* inbound_private_summary_;
41+
prometheus::Summary* inbound_public_summary_;
42+
prometheus::Summary* outbound_private_summary_;
43+
prometheus::Summary* outbound_public_summary_;
44+
45+
prometheus::Summary::Quantiles MakeQuantiles(std::vector<double> quantiles, double error) {
46+
prometheus::Summary::Quantiles result;
47+
48+
result.reserve(quantiles.size());
49+
50+
auto make_quantile = [error](double q) -> prometheus::detail::CKMSQuantiles::Quantile {
51+
return {q, error};
52+
};
53+
54+
std::transform(quantiles.begin(), quantiles.end(), std::back_inserter(result), make_quantile);
55+
56+
return result;
57+
}
58+
};
59+
} // namespace collector
60+
61+
#endif

collector/lib/CollectorService.cpp

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
3232
control_(control),
3333
signum_(*signum),
3434
server_(OPTIONS),
35-
registry_(std::make_shared<prometheus::Registry>()),
3635
exposer_(PROMETHEUS_PORT),
37-
exporter_(registry_, &config_, &system_inspector_),
36+
exporter_(&config_, &system_inspector_),
3837
config_loader_(config_) {
3938
CLOG(INFO) << "Config: " << config_;
4039

@@ -43,28 +42,17 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
4342
// In case if no GRPC is used, continue to setup networking infrasturcture
4443
// with empty grpc_channel. NetworkConnectionInfoServiceComm will pick it
4544
// up and use stdout instead.
46-
if (config_.IsProcessesListeningOnPortsEnabled()) {
47-
process_store_ = std::make_shared<ProcessStore>(&system_inspector_);
48-
}
49-
conn_scraper_ = std::make_shared<ConnScraper>(config_.HostProc(), process_store_);
5045
conn_tracker_ = std::make_shared<ConnectionTracker>();
5146
UnorderedSet<L4ProtoPortPair> ignored_l4proto_port_pairs(config_.IgnoredL4ProtoPortPairs());
5247
conn_tracker_->UpdateIgnoredL4ProtoPortPairs(std::move(ignored_l4proto_port_pairs));
5348
conn_tracker_->UpdateIgnoredNetworks(config_.IgnoredNetworks());
5449
conn_tracker_->UpdateNonAggregatedNetworks(config_.NonAggregatedNetworks());
5550

56-
network_connection_info_service_comm_ = std::make_shared<NetworkConnectionInfoServiceComm>(config_.grpc_channel);
57-
58-
auto total_reporter = config_.EnableConnectionStats() ? exporter_.GetConnectionsTotalReporter() : nullptr;
59-
auto rate_reporter = config_.EnableConnectionStats() ? exporter_.GetConnectionsRateReporter() : nullptr;
60-
6151
net_status_notifier_ = std::make_unique<NetworkStatusNotifier>(
62-
conn_scraper_,
6352
conn_tracker_,
64-
network_connection_info_service_comm_,
6553
config_,
66-
total_reporter,
67-
rate_reporter);
54+
&system_inspector_,
55+
exporter_.GetRegistry().get());
6856

6957
auto network_signal_handler = std::make_unique<NetworkSignalHandler>(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats());
7058
network_signal_handler->SetCollectConnectionStatus(config_.CollectConnectionStatus());
@@ -88,17 +76,18 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
8876
}
8977

9078
// Prometheus
91-
exposer_.RegisterCollectable(registry_);
79+
exposer_.RegisterCollectable(exporter_.GetRegistry());
9280
}
9381

9482
CollectorService::~CollectorService() {
9583
config_loader_.Stop();
9684
server_.close();
97-
exporter_.stop();
9885
if (net_status_notifier_) {
9986
net_status_notifier_->Stop();
10087
}
10188

89+
exporter_.stop();
90+
10291
// system_inspector_ needs to be last, since other components relay on it.
10392
system_inspector_.CleanUp();
10493
}

collector/lib/CollectorService.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,13 @@ class CollectorService {
4545
std::vector<std::unique_ptr<CivetWrapper>> civet_endpoints_;
4646

4747
// Prometheus
48-
std::shared_ptr<prometheus::Registry> registry_;
4948
prometheus::Exposer exposer_;
5049
CollectorStatsExporter exporter_;
5150

5251
ConfigLoader config_loader_;
5352

5453
// Network monitoring
5554
std::shared_ptr<ConnectionTracker> conn_tracker_;
56-
std::shared_ptr<IConnScraper> conn_scraper_;
5755
std::unique_ptr<NetworkStatusNotifier> net_status_notifier_;
5856
std::shared_ptr<ProcessStore> process_store_;
5957
std::shared_ptr<NetworkConnectionInfoServiceComm> network_connection_info_service_comm_;

collector/lib/CollectorStats.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,6 @@ class CollectorStats {
9696
CollectorStats() {};
9797
};
9898

99-
template <typename T>
100-
class CollectorConnectionStats {
101-
public:
102-
virtual void Observe(T inbound_private, T inbound_public, T outbound_private, T outbound_public) = 0;
103-
104-
virtual ~CollectorConnectionStats() {}
105-
};
106-
10799
namespace internal {
108100

109101
template <typename T>

collector/lib/CollectorStatsExporter.cpp

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -9,73 +9,14 @@
99
#include "Logging.h"
1010
#include "Utility.h"
1111
#include "prometheus/gauge.h"
12-
#include "prometheus/summary.h"
1312
#include "system-inspector/Service.h"
1413

1514
namespace collector {
1615

17-
template <typename T>
18-
class CollectorConnectionStatsPrometheus : public CollectorConnectionStats<T> {
19-
public:
20-
CollectorConnectionStatsPrometheus(
21-
std::shared_ptr<prometheus::Registry> registry,
22-
std::string name,
23-
std::string help,
24-
std::chrono::milliseconds max_age,
25-
const std::vector<double>& quantiles,
26-
double error) : family_(prometheus::BuildSummary().Name(name).Help(help).Register(*registry)),
27-
inbound_private_summary_(family_.Add({{"dir", "in"}, {"peer", "private"}}, MakeQuantiles(quantiles, error), max_age)),
28-
inbound_public_summary_(family_.Add({{"dir", "in"}, {"peer", "public"}}, MakeQuantiles(quantiles, error), max_age)),
29-
outbound_private_summary_(family_.Add({{"dir", "out"}, {"peer", "private"}}, MakeQuantiles(quantiles, error), max_age)),
30-
outbound_public_summary_(family_.Add({{"dir", "out"}, {"peer", "public"}}, MakeQuantiles(quantiles, error), max_age)) {}
31-
32-
void Observe(T inbound_private, T inbound_public, T outbound_private, T outbound_public) override {
33-
inbound_private_summary_.Observe(inbound_private);
34-
inbound_public_summary_.Observe(inbound_public);
35-
outbound_private_summary_.Observe(outbound_private);
36-
outbound_public_summary_.Observe(outbound_public);
37-
}
38-
39-
private:
40-
prometheus::Family<prometheus::Summary>& family_;
41-
prometheus::Summary& inbound_private_summary_;
42-
prometheus::Summary& inbound_public_summary_;
43-
prometheus::Summary& outbound_private_summary_;
44-
prometheus::Summary& outbound_public_summary_;
45-
46-
prometheus::Summary::Quantiles MakeQuantiles(std::vector<double> quantiles, double error) {
47-
prometheus::Summary::Quantiles result;
48-
49-
result.reserve(quantiles.size());
50-
51-
auto make_quantile = [error](double q) -> prometheus::detail::CKMSQuantiles::Quantile {
52-
return {q, error};
53-
};
54-
55-
std::transform(quantiles.begin(), quantiles.end(), std::back_inserter(result), make_quantile);
56-
57-
return result;
58-
}
59-
};
60-
61-
CollectorStatsExporter::CollectorStatsExporter(std::shared_ptr<prometheus::Registry> registry, const CollectorConfig* config, system_inspector::Service* si)
62-
: registry_(std::move(registry)),
16+
CollectorStatsExporter::CollectorStatsExporter(const CollectorConfig* config, system_inspector::Service* si)
17+
: registry_(std::make_shared<prometheus::Registry>()),
6318
config_(config),
64-
system_inspector_(si),
65-
connections_total_reporter_(std::make_shared<CollectorConnectionStatsPrometheus<unsigned int>>(
66-
registry_,
67-
"rox_connections_total",
68-
"Amount of stored connections over time",
69-
std::chrono::minutes{config->GetConnectionStatsWindow()},
70-
config->GetConnectionStatsQuantiles(),
71-
config->GetConnectionStatsError())),
72-
connections_rate_reporter_(std::make_shared<CollectorConnectionStatsPrometheus<float>>(
73-
registry_,
74-
"rox_connections_rate",
75-
"Rate of connections over time",
76-
std::chrono::minutes{config->GetConnectionStatsWindow()},
77-
config->GetConnectionStatsQuantiles(),
78-
config->GetConnectionStatsError())) {}
19+
system_inspector_(si) {}
7920

8021
bool CollectorStatsExporter::start() {
8122
if (!thread_.Start(&CollectorStatsExporter::run, this)) {

collector/lib/CollectorStatsExporter.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,27 @@
44
#include <memory>
55

66
#include "CollectorConfig.h"
7+
#include "CollectorConnectionStats.h"
78
#include "CollectorStats.h"
89
#include "StoppableThread.h"
910
#include "prometheus/registry.h"
1011
#include "system-inspector/Service.h"
1112

1213
namespace collector {
13-
1414
class CollectorStatsExporter {
1515
public:
16-
CollectorStatsExporter(std::shared_ptr<prometheus::Registry> registry, const CollectorConfig* config, system_inspector::Service* si);
16+
CollectorStatsExporter(const CollectorConfig* config, system_inspector::Service* si);
1717

1818
bool start();
1919
void run();
2020
void stop();
2121

22-
std::shared_ptr<CollectorConnectionStats<unsigned int>> GetConnectionsTotalReporter() { return connections_total_reporter_; }
23-
std::shared_ptr<CollectorConnectionStats<float>> GetConnectionsRateReporter() { return connections_rate_reporter_; }
22+
std::shared_ptr<prometheus::Registry>& GetRegistry() { return registry_; }
2423

2524
private:
2625
std::shared_ptr<prometheus::Registry> registry_;
2726
const CollectorConfig* config_;
2827
system_inspector::Service* system_inspector_;
29-
std::shared_ptr<CollectorConnectionStats<unsigned int>> connections_total_reporter_;
30-
std::shared_ptr<CollectorConnectionStats<float>> connections_rate_reporter_;
3128
StoppableThread thread_;
3229
};
3330

0 commit comments

Comments
 (0)