Skip to content

Commit 4bd92b8

Browse files
committed
Address PR comments
- Rollback no grpc channel behaviour. - Add documentation to new classes. - Remove unneeded workaround for old kernels. - Minor refactorings in SensorClientFormatter.
1 parent b72128b commit 4bd92b8

8 files changed

+188
-168
lines changed

collector/lib/CollectorOutput.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@
77

88
namespace collector {
99

10-
CollectorOutput::CollectorOutput(const CollectorConfig& config) {
11-
use_sensor_client_ = !config.UseLegacyServices();
12-
10+
CollectorOutput::CollectorOutput(const CollectorConfig& config)
11+
: use_sensor_client_(!config.UseLegacyServices()) {
1312
if (config.grpc_channel != nullptr) {
1413
channel_ = config.grpc_channel;
1514

@@ -22,7 +21,7 @@ CollectorOutput::CollectorOutput(const CollectorConfig& config) {
2221
}
2322
}
2423

25-
if (config.UseStdout()) {
24+
if (config.grpc_channel == nullptr || config.UseStdout()) {
2625
if (use_sensor_client_) {
2726
auto sensor_client = std::make_unique<SensorClientStdout>();
2827
sensor_clients_.emplace_back(std::move(sensor_client));
@@ -32,10 +31,6 @@ CollectorOutput::CollectorOutput(const CollectorConfig& config) {
3231
}
3332
}
3433

35-
if (sensor_clients_.empty() && signal_clients_.empty()) {
36-
CLOG(FATAL) << "No available output configured";
37-
}
38-
3934
thread_.Start([this] { EstablishGrpcStream(); });
4035
}
4136

@@ -125,7 +120,7 @@ bool CollectorOutput::EstablishGrpcStreamSingle() {
125120
}
126121

127122
// Refresh all clients
128-
auto success = true;
123+
bool success = true;
129124
for (const auto& client : signal_clients_) {
130125
success &= client->Refresh();
131126
}

collector/lib/CollectorOutput.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,23 @@ class CollectorOutput {
3838
signal_clients_.emplace_back(std::move(signal_client));
3939
}
4040

41+
/**
42+
* Send a message to sensor.
43+
*
44+
* @param msg One of sensor::MsgFromCollector or
45+
* sensor::SignalStreamMessage, the proper service to be
46+
* used will be determined from the type held in msg.
47+
* @returns A SignalHandler::Result with the outcome of the send
48+
* operation
49+
*/
4150
SignalHandler::Result SendMsg(const MessageType& msg);
4251

52+
/**
53+
* Whether we should use the new iservice or not.
54+
*
55+
* @returns true if configuration indicates we should use the new
56+
* iservice, false otherwise.
57+
*/
4358
bool UseSensorClient() const { return use_sensor_client_; }
4459

4560
private:
@@ -53,10 +68,10 @@ class CollectorOutput {
5368
std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
5469
std::vector<std::unique_ptr<ISignalServiceClient>> signal_clients_;
5570

56-
bool use_sensor_client_{true};
71+
bool use_sensor_client_ = true;
5772

5873
StoppableThread thread_;
59-
std::atomic<bool> stream_active_{false};
74+
std::atomic<bool> stream_active_ = false;
6075
std::condition_variable stream_interrupted_;
6176
std::shared_ptr<grpc::Channel> channel_;
6277
};

collector/lib/ProtoSignalFormatter.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,26 @@ class ProtoSignalFormatter : public BaseProtoSignalFormatter, protected ProtoAll
3030
public:
3131
ProtoSignalFormatter() = default;
3232

33+
/**
34+
* Turn a sinsp_evt into a protobuf message to be sent to sensor.
35+
*
36+
* @param event The event to be translated.
37+
* @returns A pointer to the message to be sent or nullptr if no
38+
* message should be sent. Ownership of the message is kept
39+
* in the implementing class, caller must not attempt to free
40+
* it.
41+
*/
3342
const Message* ToProtoMessage(sinsp_evt* event) override = 0;
43+
44+
/**
45+
* Turn a sinsp_threadinfo into a protobuf message to be sent to sensor.
46+
*
47+
* @param event The event to be translated.
48+
* @returns A pointer to the message to be sent or nullptr if no
49+
* message should be sent. Ownership of the message is kept
50+
* in the implementing class, caller must not attempt to free
51+
* it.
52+
*/
3453
const Message* ToProtoMessage(sinsp_threadinfo* tinfo) override = 0;
3554
};
3655

collector/lib/SensorClient.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,22 @@ class ISensorClient {
2626
ISensorClient& operator=(ISensorClient&&) = delete;
2727
virtual ~ISensorClient() = default;
2828

29+
/**
30+
* Recreate the internal state of the object to allow communication.
31+
*
32+
* Mostly useful for handling gRPC reconnections.
33+
*
34+
* @returns true if the refresh was succesful, false otherwise.
35+
*/
2936
virtual bool Refresh() = 0;
3037

38+
/**
39+
* Send a message to sensor through the iservice.
40+
*
41+
* @param msg The message to be sent to sensor.
42+
* @returns A SignalHandler::Result with the outcome of the send
43+
* operation.
44+
*/
3145
virtual SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) = 0;
3246
};
3347

collector/lib/SensorClientFormatter.cpp

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_evt*
7676
Reset();
7777

7878
if (!ValidateProcessDetails(event)) {
79-
CLOG(INFO) << "Dropping process event: " << ProcessDetails(event);
79+
CLOG(INFO) << "Dropping process event: " << ToString(event);
8080
return nullptr;
8181
}
8282

@@ -178,10 +178,9 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
178178
}
179179

180180
// set process lineage
181-
std::vector<LineageInfo> lineage;
182-
this->GetProcessLineage(event->get_thread_info(), lineage);
181+
auto lineage = GetProcessLineage(event->get_thread_info());
183182
for (const auto& p : lineage) {
184-
auto signal_lineage = signal->add_lineage_info();
183+
auto* signal_lineage = signal->add_lineage_info();
185184
signal_lineage->set_parent_exec_file_path(p.parent_exec_file_path());
186185
signal_lineage->set_parent_uid(p.parent_uid());
187186
}
@@ -241,11 +240,10 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinf
241240
signal->set_container_id(tinfo->m_container_id);
242241

243242
// set process lineage
244-
std::vector<LineageInfo> lineage;
245-
GetProcessLineage(tinfo, lineage);
243+
auto lineage = GetProcessLineage(tinfo);
246244

247245
for (const auto& p : lineage) {
248-
auto signal_lineage = signal->add_lineage_info();
246+
auto* signal_lineage = signal->add_lineage_info();
249247
signal_lineage->set_parent_exec_file_path(p.parent_exec_file_path());
250248
signal_lineage->set_parent_uid(p.parent_uid());
251249
}
@@ -258,7 +256,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinf
258256
return signal;
259257
}
260258

261-
std::string SensorClientFormatter::ProcessDetails(sinsp_evt* event) {
259+
std::string SensorClientFormatter::ToString(sinsp_evt* event) {
262260
std::stringstream ss;
263261
const std::string* path = event_extractor_->get_exepath(event);
264262
const std::string* name = event_extractor_->get_comm(event);
@@ -293,37 +291,28 @@ bool SensorClientFormatter::ValidateProcessDetails(sinsp_evt* event) {
293291
return ValidateProcessDetails(tinfo);
294292
}
295293

296-
int SensorClientFormatter::GetTotalStringLength(const std::vector<LineageInfo>& lineage) {
297-
int totalStringLength = 0;
298-
for (LineageInfo l : lineage) {
299-
totalStringLength += l.parent_exec_file_path().size();
300-
}
301-
302-
return totalStringLength;
303-
}
294+
void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
295+
int string_total = std::accumulate(lineage.cbegin(), lineage.cend(), 0, [](int acc, const auto& l) {
296+
return acc + l.parent_exec_file_path().size();
297+
});
304298

305-
void SensorClientFormatter::CountLineage(const std::vector<LineageInfo>& lineage) {
306-
int totalStringLength = GetTotalStringLength(lineage);
307299
COUNTER_INC(CollectorStats::process_lineage_counts);
308300
COUNTER_ADD(CollectorStats::process_lineage_total, lineage.size());
309301
COUNTER_ADD(CollectorStats::process_lineage_sqr_total, lineage.size() * lineage.size());
310-
COUNTER_ADD(CollectorStats::process_lineage_string_total, totalStringLength);
302+
COUNTER_ADD(CollectorStats::process_lineage_string_total, string_total);
311303
}
312304

313-
void SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo,
314-
std::vector<LineageInfo>& lineage) {
305+
std::vector<LineageInfo> SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
306+
std::vector<LineageInfo> lineage;
315307
if (tinfo == nullptr) {
316-
return;
308+
return lineage;
317309
}
318-
sinsp_threadinfo* mt = nullptr;
319-
if (tinfo->is_main_thread()) {
320-
mt = tinfo;
321-
} else {
322-
mt = tinfo->get_main_thread();
323-
if (mt == nullptr) {
324-
return;
325-
}
310+
311+
sinsp_threadinfo* mt = tinfo->get_main_thread();
312+
if (mt == nullptr) {
313+
return lineage;
326314
}
315+
327316
sinsp_threadinfo::visitor_func_t visitor = [&lineage](sinsp_threadinfo* pt) {
328317
if (pt == nullptr) {
329318
return false;
@@ -332,22 +321,9 @@ void SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo,
332321
return false;
333322
}
334323

335-
//
336324
// Collection of process lineage information should stop at the container
337325
// boundary to avoid collecting host process information.
338-
//
339-
// In back-ported eBPF probes, `m_vpid` will not be set for containers
340-
// running when collector comes online because /proc/{pid}/status does
341-
// not contain namespace information, so `m_container_id` is checked
342-
// instead. `m_container_id` is not enough on its own to identify
343-
// containerized processes, because it is not guaranteed to be set on
344-
// all platforms.
345-
//
346-
if (pt->m_vpid == 0) {
347-
if (pt->m_container_id.empty()) {
348-
return false;
349-
}
350-
} else if (pt->m_pid == pt->m_vpid) {
326+
if (pt->m_pid == pt->m_vpid) {
351327
return false;
352328
}
353329

@@ -371,7 +347,9 @@ void SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo,
371347
return true;
372348
};
373349
mt->traverse_parent_state(visitor);
374-
CountLineage(lineage);
350+
UpdateLineageStats(lineage);
351+
352+
return lineage;
375353
}
376354

377355
} // namespace collector

collector/lib/SensorClientFormatter.h

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,68 @@ class SensorClientFormatter : public ProtoSignalFormatter<sensor::MsgFromCollect
4141
const MsgFromCollector* ToProtoMessage(sinsp_evt* event) override;
4242
const MsgFromCollector* ToProtoMessage(sinsp_threadinfo* tinfo) override;
4343

44-
void GetProcessLineage(sinsp_threadinfo* tinfo, std::vector<LineageInfo>& lineage);
44+
/**
45+
* Get the list of processes that spawned the current one.
46+
*
47+
* The list will be limited to processes inside the same container as
48+
* the one being processed.
49+
*
50+
* @param tinfo The sinsp_threadinfo for which we should generate
51+
* lineage.
52+
* @returns A vector with the lineage information.
53+
*/
54+
static std::vector<LineageInfo> GetProcessLineage(sinsp_threadinfo* tinfo);
4555

4656
private:
4757
FRIEND_TEST(SensorClientFormatterTest, NoProcessArguments);
4858
FRIEND_TEST(SensorClientFormatterTest, ProcessArguments);
4959

60+
/**
61+
* Allocate and fill in a ProcessSignal message.
62+
*
63+
* @param event A Falco event to be translated into a ProcessSignal.
64+
* @returns A non-owning pointer to the ProcessSignal.
65+
*/
5066
ProcessSignal* CreateProcessSignal(sinsp_evt* event);
51-
Signal* CreateSignal(sinsp_evt* event);
67+
68+
/**
69+
* Check if the provided threadinfo has the required fields.
70+
*
71+
* @param tinfo the Falco threadinfo to validate.
72+
* @returns true if the threadinfo is valid, false otherwise.
73+
*/
5274
bool ValidateProcessDetails(const sinsp_threadinfo* tinfo);
75+
76+
/**
77+
* Check if the provided event has the required fields.
78+
*
79+
* @param event the Falco event to validate.
80+
* @returns true if the threadinfo is valid, false otherwise.
81+
*/
5382
bool ValidateProcessDetails(sinsp_evt* event);
54-
std::string ProcessDetails(sinsp_evt* event);
5583

56-
Signal* CreateSignal(sinsp_threadinfo* tinfo);
84+
/**
85+
* Translate a Falco event to a printable string.
86+
*
87+
* @param event The Falco event to be translated.
88+
* @returns A printable string.
89+
*/
90+
std::string ToString(sinsp_evt* event);
91+
92+
/**
93+
* Allocate and fill in a ProcessSignal message.
94+
*
95+
* @param tinfo A Falco threadinfo to be translated into a ProcessSignal.
96+
* @returns A non-owning pointer to the ProcessSignal.
97+
*/
5798
ProcessSignal* CreateProcessSignal(sinsp_threadinfo* tinfo);
58-
int GetTotalStringLength(const std::vector<LineageInfo>& lineage);
59-
void CountLineage(const std::vector<LineageInfo>& lineage);
99+
100+
/**
101+
* Update lineage related prometheus stats.
102+
*
103+
* @param lineage The lineage used for updating the stats.
104+
*/
105+
static void UpdateLineageStats(const std::vector<LineageInfo>& lineage);
60106

61107
const EventNames& event_names_;
62108
std::unique_ptr<system_inspector::EventExtractor> event_extractor_;

collector/lib/SignalServiceClient.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class SignalServiceClient : public ISignalServiceClient {
6161
std::unique_ptr<grpc::ClientContext> context_;
6262
std::unique_ptr<IDuplexClientWriter<SignalStreamMessage>> writer_;
6363

64-
bool first_write_{};
64+
bool first_write_ = false;
6565
};
6666

6767
class StdoutSignalServiceClient : public ISignalServiceClient {

0 commit comments

Comments
 (0)