Skip to content

Commit 75b7613

Browse files
committed
Restore ProcessSignalFormatter, add SensorClientFormatter
This change makes it possible to have backwards compatibility with older sensor versions that might no implement the new service.
1 parent 1c98811 commit 75b7613

13 files changed

+624
-68
lines changed

collector/lib/CollectorOutput.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,35 @@
11
#include "CollectorOutput.h"
22

3+
#include "internalapi/sensor/collector.pb.h"
4+
#include "internalapi/sensor/collector_iservice.pb.h"
5+
6+
#include "HostInfo.h"
7+
38
namespace collector {
49

10+
CollectorOutput::CollectorOutput(const CollectorConfig& config) {
11+
if (config.grpc_channel != nullptr) {
12+
auto sensor_client = std::make_unique<SensorClient>(config.grpc_channel);
13+
auto signal_client = std::make_unique<SignalServiceClient>(config.grpc_channel);
14+
sensor_clients_.emplace_back(std::move(sensor_client));
15+
signal_clients_.emplace_back(std::move(signal_client));
16+
}
17+
18+
if (config.UseStdout()) {
19+
auto sensor_client = std::make_unique<SensorClientStdout>();
20+
auto signal_client = std::make_unique<StdoutSignalServiceClient>();
21+
sensor_clients_.emplace_back(std::move(sensor_client));
22+
signal_clients_.emplace_back(std::move(signal_client));
23+
}
24+
25+
if (sensor_clients_.empty() || signal_clients_.empty()) {
26+
CLOG(FATAL) << "No available output configured";
27+
}
28+
29+
StartClients(sensor_clients_);
30+
StartClients(signal_clients_);
31+
}
32+
533
namespace {
634
SignalHandler::Result SensorOutput(std::vector<std::unique_ptr<ISensorClient>>& clients, const sensor::MsgFromCollector& msg) {
735
for (auto& client : clients) {
@@ -39,4 +67,19 @@ SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
3967

4068
return std::visit(visitor, msg);
4169
}
70+
71+
void CollectorOutput::Register() {
72+
sensor::MsgFromCollector msg;
73+
msg.clear_info();
74+
msg.clear_process_signal();
75+
msg.mutable_register_()->set_hostname(GetHostname());
76+
77+
for (const auto& client : sensor_clients_) {
78+
auto res = client->SendMsg(msg);
79+
if (res != SignalHandler::PROCESSED) {
80+
use_sensor_client_ = false;
81+
break;
82+
}
83+
}
84+
}
4285
} // namespace collector

collector/lib/CollectorOutput.h

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,17 @@ class CollectorOutput {
2222
CollectorOutput& operator=(const CollectorOutput&) = delete;
2323
CollectorOutput& operator=(CollectorOutput&&) = delete;
2424

25-
CollectorOutput(const CollectorConfig& config) {
26-
if (config.grpc_channel != nullptr) {
27-
auto sensor_client = std::make_unique<SensorClient>(config.grpc_channel);
28-
auto signal_client = std::make_unique<SignalServiceClient>(config.grpc_channel);
29-
sensor_clients_.emplace_back(std::move(sensor_client));
30-
signal_clients_.emplace_back(std::move(signal_client));
31-
}
32-
33-
if (config.UseStdout()) {
34-
auto sensor_client = std::make_unique<SensorClientStdout>();
35-
auto signal_client = std::make_unique<StdoutSignalServiceClient>();
36-
sensor_clients_.emplace_back(std::move(sensor_client));
37-
signal_clients_.emplace_back(std::move(signal_client));
38-
}
39-
40-
if (sensor_clients_.empty() || signal_clients_.empty()) {
41-
CLOG(FATAL) << "No available output configured";
42-
}
43-
44-
StartClients(sensor_clients_);
45-
StartClients(signal_clients_);
46-
}
25+
CollectorOutput(const CollectorConfig& config);
4726

4827
~CollectorOutput() {
4928
StopClients(sensor_clients_);
5029
StopClients(signal_clients_);
5130
}
5231

5332
SignalHandler::Result SendMsg(const MessageType& msg);
33+
void Register();
34+
35+
bool UseSensorClient() { return use_sensor_client_; }
5436

5537
private:
5638
template <typename T>
@@ -69,6 +51,8 @@ class CollectorOutput {
6951

7052
std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
7153
std::vector<std::unique_ptr<ISignalServiceClient>> signal_clients_;
54+
55+
bool use_sensor_client_{true};
7256
};
7357

7458
} // namespace collector

collector/lib/CollectorService.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ void CollectorService::RunForever() {
106106
CLOG(FATAL) << "Unable to start collector stats exporter";
107107
}
108108

109+
output_.Register();
110+
109111
system_inspector_.Start();
110112

111113
ControlValue cv;

collector/lib/ProcessSignalFormatter.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
#include <google/protobuf/util/time_util.h>
66

7-
#include "internalapi/sensor/collector_iservice.pb.h"
87
#include "internalapi/sensor/signal_iservice.pb.h"
98

109
#include "CollectorStats.h"
@@ -68,7 +67,7 @@ ProcessSignalFormatter::ProcessSignalFormatter(
6867

6968
ProcessSignalFormatter::~ProcessSignalFormatter() = default;
7069

71-
const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
70+
const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
7271
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
7372
return nullptr;
7473
}
@@ -85,30 +84,34 @@ const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_evt
8584
return nullptr;
8685
}
8786

88-
auto* msg = AllocateRoot();
89-
msg->clear_info();
90-
msg->clear_register_();
91-
msg->set_allocated_process_signal(process_signal);
92-
return msg;
87+
auto* signal = Allocate<Signal>();
88+
signal->set_allocated_process_signal(process_signal);
89+
90+
SignalStreamMessage* signal_stream_message = AllocateRoot();
91+
signal_stream_message->clear_collector_register_request();
92+
signal_stream_message->set_allocated_signal(signal);
93+
return signal_stream_message;
9394
}
9495

95-
const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
96+
const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
9697
Reset();
9798
if (!ValidateProcessDetails(tinfo)) {
9899
CLOG(INFO) << "Dropping process event: " << tinfo;
99100
return nullptr;
100101
}
101102

102-
ProcessSignal* signal = CreateProcessSignal(tinfo);
103-
if (signal == nullptr) {
103+
ProcessSignal* process_signal = CreateProcessSignal(tinfo);
104+
if (process_signal == nullptr) {
104105
return nullptr;
105106
}
106107

107-
auto* msg = AllocateRoot();
108-
msg->clear_register_();
109-
msg->clear_info();
110-
msg->set_allocated_process_signal(signal);
111-
return msg;
108+
auto* signal = Allocate<Signal>();
109+
signal->set_allocated_process_signal(process_signal);
110+
111+
SignalStreamMessage* signal_stream_message = AllocateRoot();
112+
signal_stream_message->clear_collector_register_request();
113+
signal_stream_message->set_allocated_signal(signal);
114+
return signal_stream_message;
112115
}
113116

114117
ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_evt* event) {
@@ -170,7 +173,7 @@ ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_evt* event) {
170173
// set time
171174
auto timestamp = Allocate<Timestamp>();
172175
*timestamp = TimeUtil::NanosecondsToTimestamp(event->get_ts());
173-
signal->set_allocated_creation_time(timestamp);
176+
signal->set_allocated_time(timestamp);
174177

175178
// set container_id
176179
if (const std::string* container_id = event_extractor_->get_container_id(event)) {
@@ -235,7 +238,7 @@ ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_threadinfo* tin
235238
// set time
236239
auto timestamp = Allocate<Timestamp>();
237240
*timestamp = TimeUtil::NanosecondsToTimestamp(tinfo->m_clone_ts);
238-
signal->set_allocated_creation_time(timestamp);
241+
signal->set_allocated_time(timestamp);
239242

240243
// set container_id
241244
signal->set_container_id(tinfo->m_container_id);

collector/lib/ProcessSignalFormatter.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
#include "api/v1/signal.pb.h"
99
#include "internalapi/sensor/collector_iservice.pb.h"
10+
#include "internalapi/sensor/signal_iservice.pb.h"
11+
#include "storage/process_indicator.pb.h"
1012

1113
#include "CollectorConfig.h"
1214
#include "ContainerMetadata.h"
@@ -23,18 +25,17 @@ class EventExtractor;
2325

2426
namespace collector {
2527

26-
class ProcessSignalFormatter : public ProtoSignalFormatter<sensor::MsgFromCollector> {
28+
class ProcessSignalFormatter : public ProtoSignalFormatter<sensor::SignalStreamMessage> {
2729
public:
2830
ProcessSignalFormatter(sinsp* inspector, const CollectorConfig& config);
2931
~ProcessSignalFormatter();
3032

3133
using Signal = v1::Signal;
32-
using ProcessSignal = sensor::ProcessSignal;
33-
using LineageInfo = sensor::ProcessSignal_LineageInfo;
34-
using MsgFromCollector = sensor::MsgFromCollector;
34+
using ProcessSignal = storage::ProcessSignal;
35+
using LineageInfo = storage::ProcessSignal_LineageInfo;
3536

36-
const MsgFromCollector* ToProtoMessage(sinsp_evt* event) override;
37-
const MsgFromCollector* ToProtoMessage(sinsp_threadinfo* tinfo);
37+
const sensor::SignalStreamMessage* ToProtoMessage(sinsp_evt* event) override;
38+
const sensor::SignalStreamMessage* ToProtoMessage(sinsp_threadinfo* tinfo) override;
3839

3940
void GetProcessLineage(sinsp_threadinfo* tinfo, std::vector<LineageInfo>& lineage);
4041

collector/lib/ProcessSignalHandler.cpp

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66

77
#include <libsinsp/sinsp.h>
88

9-
#include "storage/process_indicator.pb.h"
10-
119
#include "RateLimit.h"
12-
#include "system-inspector/EventExtractor.h"
1310

1411
namespace collector {
1512

16-
std::string compute_process_key(const ::sensor::ProcessSignal& s) {
13+
namespace {
14+
// The template functions in this namespace are meant to be used with
15+
// sensor::ProcessSignal and storage::ProcessSignal, which are almost
16+
// the same... Except they are not...
17+
template <typename S>
18+
std::string compute_process_key(const S& s) {
1719
std::stringstream ss;
1820
ss << s.container_id() << " " << s.name() << " ";
1921
if (s.args().length() <= 256) {
@@ -25,19 +27,39 @@ std::string compute_process_key(const ::sensor::ProcessSignal& s) {
2527
return ss.str();
2628
}
2729

30+
template <typename S>
31+
void dtrace_probe(const S& s) {
32+
const char* name = s.name().c_str();
33+
const uint32_t pid = s.pid();
34+
DTRACE_PROBE2(collector, process_signal_handler, name, pid);
35+
}
36+
} // namespace
37+
2838
SignalHandler::Result ProcessSignalHandler::HandleSignal(sinsp_evt* evt) {
29-
const auto* signal_msg = formatter_.ToProtoMessage(evt);
39+
if (client_->UseSensorClient()) {
40+
return HandleSensorSignal(evt);
41+
}
42+
return HandleProcessSignal(evt);
43+
}
44+
45+
SignalHandler::Result ProcessSignalHandler::HandleExistingProcess(sinsp_threadinfo* tinfo) {
46+
if (client_->UseSensorClient()) {
47+
return HandleExistingProcessSensor(tinfo);
48+
}
49+
return HandleExistingProcessSignal(tinfo);
50+
}
51+
52+
SignalHandler::Result ProcessSignalHandler::HandleProcessSignal(sinsp_evt* evt) {
53+
const auto* signal_msg = signal_formatter_.ToProtoMessage(evt);
3054

3155
if (signal_msg == nullptr) {
3256
++(stats_->nProcessResolutionFailuresByEvt);
3357
return IGNORED;
3458
}
3559

36-
const char* name = signal_msg->process_signal().name().c_str();
37-
const uint32_t pid = signal_msg->process_signal().pid();
38-
DTRACE_PROBE2(collector, process_signal_handler, name, pid);
60+
dtrace_probe(signal_msg->signal().process_signal());
3961

40-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
62+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->signal().process_signal()))) {
4163
++(stats_->nProcessRateLimitCount);
4264
return IGNORED;
4365
}
@@ -52,13 +74,38 @@ SignalHandler::Result ProcessSignalHandler::HandleSignal(sinsp_evt* evt) {
5274
return result;
5375
}
5476

55-
SignalHandler::Result ProcessSignalHandler::HandleExistingProcess(sinsp_threadinfo* tinfo) {
56-
const auto* signal_msg = formatter_.ToProtoMessage(tinfo);
77+
SignalHandler::Result ProcessSignalHandler::HandleExistingProcessSignal(sinsp_threadinfo* tinfo) {
78+
const auto* signal_msg = signal_formatter_.ToProtoMessage(tinfo);
5779
if (signal_msg == nullptr) {
5880
++(stats_->nProcessResolutionFailuresByTinfo);
5981
return IGNORED;
6082
}
6183

84+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->signal().process_signal()))) {
85+
++(stats_->nProcessRateLimitCount);
86+
return IGNORED;
87+
}
88+
89+
auto result = client_->SendMsg(*signal_msg);
90+
if (result == SignalHandler::PROCESSED) {
91+
++(stats_->nProcessSent);
92+
} else if (result == SignalHandler::ERROR) {
93+
++(stats_->nProcessSendFailures);
94+
}
95+
96+
return result;
97+
}
98+
99+
SignalHandler::Result ProcessSignalHandler::HandleSensorSignal(sinsp_evt* evt) {
100+
const auto* signal_msg = sensor_formatter_.ToProtoMessage(evt);
101+
102+
if (signal_msg == nullptr) {
103+
++(stats_->nProcessResolutionFailuresByEvt);
104+
return IGNORED;
105+
}
106+
107+
dtrace_probe(signal_msg->process_signal());
108+
62109
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
63110
++(stats_->nProcessRateLimitCount);
64111
return IGNORED;
@@ -74,6 +121,27 @@ SignalHandler::Result ProcessSignalHandler::HandleExistingProcess(sinsp_threadin
74121
return result;
75122
}
76123

124+
SignalHandler::Result ProcessSignalHandler::HandleExistingProcessSensor(sinsp_threadinfo* tinfo) {
125+
const auto* signal_msg = sensor_formatter_.ToProtoMessage(tinfo);
126+
if (signal_msg == nullptr) {
127+
++(stats_->nProcessResolutionFailuresByTinfo);
128+
return IGNORED;
129+
}
130+
131+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
132+
++(stats_->nProcessRateLimitCount);
133+
return IGNORED;
134+
}
135+
136+
auto result = client_->SendMsg(*signal_msg);
137+
if (result == SignalHandler::PROCESSED) {
138+
++(stats_->nProcessSent);
139+
} else if (result == SignalHandler::ERROR) {
140+
++(stats_->nProcessSendFailures);
141+
}
142+
143+
return result;
144+
}
77145
std::vector<std::string> ProcessSignalHandler::GetRelevantEvents() {
78146
return {"execve<"};
79147
}

0 commit comments

Comments
 (0)