Skip to content

ROX-28981: Split methods for collector iservice #2085

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: mauro/ROX-28526-use-collector-iservice
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions collector/lib/CollectorOutput.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "CollectorOutput.h"

#include "internalapi/sensor/collector_iservice.pb.h"

#include "GRPCUtil.h"
#include "HostInfo.h"

Expand Down Expand Up @@ -40,7 +38,7 @@ void CollectorOutput::HandleOutputError() {
stream_interrupted_.notify_one();
}

SignalHandler::Result CollectorOutput::SensorOutput(const sensor::MsgFromCollector& msg) {
SignalHandler::Result CollectorOutput::SensorOutput(const sensor::ProcessSignal& msg) {
for (auto& client : sensor_clients_) {
auto res = client->SendMsg(msg);
switch (res) {
Expand Down Expand Up @@ -83,7 +81,7 @@ SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMe
SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
auto visitor = [this](auto&& m) {
using T = std::decay_t<decltype(m)>;
if constexpr (std::is_same_v<T, sensor::MsgFromCollector>) {
if constexpr (std::is_same_v<T, sensor::ProcessSignal>) {
return SensorOutput(m);
} else if constexpr (std::is_same_v<T, sensor::SignalStreamMessage>) {
return SignalOutput(m);
Expand Down
5 changes: 2 additions & 3 deletions collector/lib/CollectorOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <variant>

#include "internalapi/sensor/collector_iservice.pb.h"
#include "internalapi/sensor/signal_iservice.pb.h"

#include "CollectorConfig.h"
Expand All @@ -13,7 +12,7 @@

namespace collector {

using MessageType = std::variant<sensor::MsgFromCollector, sensor::SignalStreamMessage>;
using MessageType = std::variant<sensor::ProcessSignal, sensor::SignalStreamMessage>;

class CollectorOutput {
public:
Expand Down Expand Up @@ -62,7 +61,7 @@ class CollectorOutput {
bool EstablishGrpcStreamSingle();

void HandleOutputError();
SignalHandler::Result SensorOutput(const sensor::MsgFromCollector& msg);
SignalHandler::Result SensorOutput(const sensor::ProcessSignal& msg);
SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg);

std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
Expand Down
6 changes: 3 additions & 3 deletions collector/lib/ProcessSignalHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ SignalHandler::Result ProcessSignalHandler::HandleSensorSignal(sinsp_evt* evt) {
return IGNORED;
}

dtrace_probe(signal_msg->process_signal());
dtrace_probe(*signal_msg);

if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
if (!rate_limiter_.Allow(compute_process_key(*signal_msg))) {
++(stats_->nProcessRateLimitCount);
return IGNORED;
}
Expand All @@ -128,7 +128,7 @@ SignalHandler::Result ProcessSignalHandler::HandleExistingProcessSensor(sinsp_th
return IGNORED;
}

if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
if (!rate_limiter_.Allow(compute_process_key(*signal_msg))) {
++(stats_->nProcessRateLimitCount);
return IGNORED;
}
Expand Down
6 changes: 3 additions & 3 deletions collector/lib/SensorClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace collector {
bool SensorClient::Recreate() {
context_ = std::make_unique<grpc::ClientContext>();
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncCommunicate, channel_, context_.get());
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncPushProcesses, channel_, context_.get());
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message();
Expand All @@ -18,14 +18,14 @@ bool SensorClient::Recreate() {
return true;
}

SignalHandler::Result SensorClient::SendMsg(const sensor::MsgFromCollector& msg) {
SignalHandler::Result SensorClient::SendMsg(const sensor::ProcessSignal& msg) {
if (!stream_active_.load(std::memory_order_acquire)) {
CLOG_THROTTLED(ERROR, std::chrono::seconds(10))
<< "GRPC stream is not established";
return SignalHandler::ERROR;
}

if (first_write_ && msg.has_process_signal()) {
if (first_write_) {
first_write_ = false;
return SignalHandler::NEEDS_REFRESH;
}
Expand Down
9 changes: 4 additions & 5 deletions collector/lib/SensorClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <grpcpp/channel.h>

#include "internalapi/sensor/collector_iservice.grpc.pb.h"
#include "internalapi/sensor/collector_iservice.pb.h"

#include "DuplexGRPC.h"
#include "SignalHandler.h"
Expand Down Expand Up @@ -39,7 +38,7 @@ class ISensorClient {
* @returns A SignalHandler::Result with the outcome of the send
* operation.
*/
virtual SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) = 0;
virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0;
};

class SensorClient : public ISensorClient {
Expand All @@ -60,7 +59,7 @@ class SensorClient : public ISensorClient {

bool Recreate() override;

SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) override;
SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override;

private:
std::shared_ptr<grpc::Channel> channel_;
Expand All @@ -69,15 +68,15 @@ class SensorClient : public ISensorClient {

// This needs to have the same lifetime as the class.
std::unique_ptr<grpc::ClientContext> context_;
std::unique_ptr<IDuplexClientWriter<sensor::MsgFromCollector>> writer_;
std::unique_ptr<IDuplexClientWriter<sensor::ProcessSignal>> writer_;

bool first_write_ = false;
};

class SensorClientStdout : public ISensorClient {
bool Recreate() override { return true; }

SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) override {
SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override {
LogProtobufMessage(msg);
return SignalHandler::PROCESSED;
}
Expand Down
32 changes: 7 additions & 25 deletions collector/lib/SensorClientFormatter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include <google/protobuf/util/time_util.h>

#include "internalapi/sensor/collector_iservice.pb.h"
#include "internalapi/sensor/collector.pb.h"
#include "internalapi/sensor/signal_iservice.pb.h"

#include "CollectorStats.h"
Expand Down Expand Up @@ -67,7 +67,7 @@ SensorClientFormatter::SensorClientFormatter(sinsp* inspector, const CollectorCo

SensorClientFormatter::~SensorClientFormatter() = default;

const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
return nullptr;
}
Expand All @@ -79,39 +79,21 @@ const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_evt*
return nullptr;
}

ProcessSignal* process_signal = CreateProcessSignal(event);
if (process_signal == nullptr) {
return nullptr;
}

auto* msg = AllocateRoot();
msg->clear_info();
msg->clear_register_();
msg->set_allocated_process_signal(process_signal);
return msg;
return CreateProcessSignal(event);
}

const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
Reset();
if (!ValidateProcessDetails(tinfo)) {
CLOG(INFO) << "Dropping process event: " << tinfo;
return nullptr;
}

ProcessSignal* signal = CreateProcessSignal(tinfo);
if (signal == nullptr) {
return nullptr;
}

auto* msg = AllocateRoot();
msg->clear_register_();
msg->clear_info();
msg->set_allocated_process_signal(signal);
return msg;
return CreateProcessSignal(tinfo);
}

ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
auto signal = Allocate<ProcessSignal>();
auto signal = AllocateRoot();

// set id
signal->set_id(UUIDStr());
Expand Down Expand Up @@ -193,7 +175,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
}

ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
auto signal = Allocate<ProcessSignal>();
auto signal = AllocateRoot();

// set id
signal->set_id(UUIDStr());
Expand Down
8 changes: 3 additions & 5 deletions collector/lib/SensorClientFormatter.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <gtest/gtest_prod.h>

#include "api/v1/signal.pb.h"
#include "internalapi/sensor/collector_iservice.pb.h"

#include "CollectorConfig.h"
#include "ContainerMetadata.h"
Expand All @@ -22,7 +21,7 @@ class EventExtractor;

namespace collector {

class SensorClientFormatter : public ProtoSignalFormatter<sensor::MsgFromCollector> {
class SensorClientFormatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
public:
SensorClientFormatter(const SensorClientFormatter&) = delete;
SensorClientFormatter(SensorClientFormatter&&) = delete;
Expand All @@ -35,10 +34,9 @@ class SensorClientFormatter : public ProtoSignalFormatter<sensor::MsgFromCollect
using Signal = v1::Signal;
using ProcessSignal = sensor::ProcessSignal;
using LineageInfo = sensor::ProcessSignal_LineageInfo;
using MsgFromCollector = sensor::MsgFromCollector;

const MsgFromCollector* ToProtoMessage(sinsp_evt* event) override;
const MsgFromCollector* ToProtoMessage(sinsp_threadinfo* tinfo) override;
const ProcessSignal* ToProtoMessage(sinsp_evt* event) override;
const ProcessSignal* ToProtoMessage(sinsp_threadinfo* tinfo) override;

/**
* Get the list of processes that spawned the current one.
Expand Down
2 changes: 1 addition & 1 deletion collector/proto/third_party/stackrox
Submodule stackrox updated 33 files
+1 −1 COLLECTOR_VERSION
+22 −13 generated/internalapi/sensor/collector_iservice.pb.go
+66 −1 generated/internalapi/sensor/collector_iservice_grpc.pb.go
+6 −0 pkg/env/sensor.go
+3 −0 proto/internalapi/sensor/collector_iservice.proto
+19 −0 sensor/common/collector/service.go
+109 −0 sensor/common/collector/service_impl.go
+0 −1 sensor/common/deduper/deduper.go
+0 −1 sensor/common/detector/enricher.go
+3 −1 sensor/common/metrics/init.go
+64 −8 sensor/common/metrics/metrics.go
+168 −0 sensor/common/processindicator/component.go
+76 −0 sensor/common/processindicator/component_test.go
+1 −1 sensor/common/processindicator/enricher.go
+6 −2 sensor/common/processindicator/pipeline.go
+10 −13 sensor/common/processindicator/pipeline_test.go
+0 −1 sensor/common/registry/lazy_tls_image_registry.go
+0 −13 sensor/common/signal/pipeline.go
+36 −99 sensor/common/signal/signal_service.go
+5 −8 sensor/common/signal/singleton.go
+134 −0 sensor/debugger/collector/client.go
+8 −0 sensor/debugger/collector/fake_collector.go
+0 −1 sensor/kubernetes/enforcer/cronjob/suspend.go
+0 −1 sensor/kubernetes/eventpipeline/pipeline_impl.go
+3 −3 sensor/kubernetes/fake/fake.go
+0 −1 sensor/kubernetes/listener/namespace_patcher.go
+18 −13 sensor/kubernetes/sensor/sensor.go
+10 −3 sensor/tests/connection/runtime/runtime_test.go
+11 −3 sensor/tests/helper/collector.go
+4 −3 sensor/tests/helper/trace.go
+0 −1 sensor/upgrader/plan/planner.go
+0 −1 sensor/upgrader/resources/metadata.go
+1 −1 tests/e2e/lib.sh
6 changes: 2 additions & 4 deletions collector/test/CollectorOutputTest.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "internalapi/sensor/collector_iservice.pb.h"

#include "CollectorOutput.h"
#include "SensorClient.h"
#include "SignalServiceClient.h"
Expand All @@ -11,7 +9,7 @@ namespace collector {
class MockSensorClient : public ISensorClient {
public:
MOCK_METHOD(bool, Recreate, ());
MOCK_METHOD(SignalHandler::Result, SendMsg, (const sensor::MsgFromCollector&));
MOCK_METHOD(SignalHandler::Result, SendMsg, (const sensor::ProcessSignal&));
};

class MockSignalClient : public ISignalServiceClient {
Expand All @@ -32,7 +30,7 @@ class CollectorOutputTest : public testing::Test {
};

TEST_F(CollectorOutputTest, SensorClient) {
sensor::MsgFromCollector msg;
sensor::ProcessSignal msg;

EXPECT_CALL(*sensor_client, SendMsg).Times(1).WillOnce(testing::Return(SignalHandler::PROCESSED));

Expand Down
29 changes: 14 additions & 15 deletions integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ require (
github.com/stackrox/rox v0.0.0-20210914215712-9ac265932e28
github.com/stretchr/testify v1.10.0
github.com/thoas/go-funk v0.9.3
go.opentelemetry.io/otel/trace v1.34.0
go.opentelemetry.io/otel/trace v1.35.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
golang.org/x/sys v0.31.0
google.golang.org/grpc v1.71.0
golang.org/x/sys v0.32.0
google.golang.org/grpc v1.71.1
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.32.3
k8s.io/apimachinery v0.32.3
Expand All @@ -46,7 +46,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-openapi/swag v0.23.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/gonum/blas v0.0.0-20181208220705-f22b278b28ac // indirect
Expand All @@ -67,7 +67,6 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
Expand All @@ -81,7 +80,7 @@ require (
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240409071808-615f978279ca // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.21.1 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand All @@ -93,24 +92,24 @@ require (
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/oauth2 v0.28.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/oauth2 v0.29.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.11.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
google.golang.org/protobuf v1.36.5 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
Expand All @@ -129,7 +128,7 @@ replace (
github.com/fullsailor/pkcs7 => github.com/stackrox/pkcs7 v0.0.0-20220914154527-cfdb0aa47179
github.com/heroku/docker-registry-client => github.com/stackrox/docker-registry-client v0.0.0-20230714151239-78b1f5f70b8a
github.com/operator-framework/helm-operator-plugins => github.com/stackrox/helm-operator v0.0.10-0.20220919093109-89f9785764c6
github.com/stackrox/rox => github.com/stackrox/stackrox v0.0.0-20250325114232-8e68c6a26ed1
github.com/stackrox/rox => github.com/stackrox/stackrox v0.0.0-20250415091607-159cf2aea233
go.uber.org/zap => github.com/stackrox/zap v1.18.2-0.20240314134248-5f932edd0404

)
Loading
Loading