Skip to content

Provide configurable memory limit for Conn tracker buffer usage #2196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ DEFINE_uint32(datastream_buffer_retention_size,
gflags::Uint32FromEnv("PL_DATASTREAM_BUFFER_SIZE", 1024 * 1024),
"The maximum size of a data stream buffer retained between cycles.");

DEFINE_uint64(total_conn_tracker_mem_usage,
gflags::Uint64FromEnv("PX_TOTAL_CONN_TRACKER_MEM_USAGE", 0),
"The maximum size in bytes of the collective connection tracker buffers. Defaults to "
"0, which corresponds to no limit. When data is beyond this limit, new data is "
"dropped until the limit is no longer exceeded.");
DEFINE_uint64(max_body_bytes, gflags::Uint64FromEnv("PL_STIRLING_MAX_BODY_BYTES", 512),
"The maximum number of bytes in the body of protocols like HTTP");

Expand Down Expand Up @@ -834,6 +839,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
}
}

size_t current_conn_tracker_buffer_size = 0;
for (const auto& conn_tracker : conn_trackers_mgr_.active_trackers()) {
const auto& transfer_spec = protocol_transfer_specs_[conn_tracker->protocol()];

Expand All @@ -857,7 +863,8 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
socket_info_mgr_.get());

if (transfer_spec.transfer_fn != nullptr) {
transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table);
current_conn_tracker_buffer_size +=
transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table);
} else {
// If there's no transfer function, then the tracker should not be holding any data.
// http::ProtocolTraits is used as a placeholder; the frames deque is expected to be
Expand All @@ -868,6 +875,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {

conn_tracker->IterationPostTick();
}
total_conn_tracker_mem_usage_ = current_conn_tracker_buffer_size;

CheckTracerState();

Expand Down Expand Up @@ -1094,6 +1102,19 @@ void SocketTraceConnector::AcceptDataEvent(std::unique_ptr<SocketDataEvent> even
WriteDataEvent(*event);
}

auto msg_size = event->msg.size();
auto protocol = event->attr.protocol;
total_conn_tracker_mem_usage_ += msg_size;
if (FLAGS_total_conn_tracker_mem_usage != 0 &&
total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
VLOG_EVERY_N(1, 1000) << absl::Substitute(
"Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
"Dropping data event of size $2 for protocol $3",
total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, msg_size, protocol);
stats_.Increment(StatKey::kDroppedSocketDataEvent);
return;
}

stats_.Increment(StatKey::kPollSocketDataEventCount);
stats_.Increment(StatKey::kPollSocketDataEventAttrSize, sizeof(event->attr));
stats_.Increment(StatKey::kPollSocketDataEventDataSize, event->msg.size());
Expand All @@ -1113,11 +1134,32 @@ void SocketTraceConnector::AcceptConnStatsEvent(conn_stats_event_t event) {
}

void SocketTraceConnector::AcceptHTTP2Header(std::unique_ptr<HTTP2HeaderEvent> event) {
if (FLAGS_total_conn_tracker_mem_usage != 0 &&
total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
VLOG_EVERY_N(1, 1000) << absl::Substitute(
"Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
"Dropping header event",
total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage);
stats_.Increment(StatKey::kDroppedSocketDataEvent);
return;
}

ConnTracker& tracker = GetOrCreateConnTracker(event->attr.conn_id);
tracker.AddHTTP2Header(std::move(event));
}

void SocketTraceConnector::AcceptHTTP2Data(std::unique_ptr<HTTP2DataEvent> event) {
auto payload_size = event->payload.size();
total_conn_tracker_mem_usage_ += payload_size;
if (FLAGS_total_conn_tracker_mem_usage != 0 &&
total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
VLOG_EVERY_N(1, 1000) << absl::Substitute(
"Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
"Dropping data event of size $2",
total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, payload_size);
stats_.Increment(StatKey::kDroppedSocketDataEvent);
return;
}
ConnTracker& tracker = GetOrCreateConnTracker(event->attr.conn_id);
tracker.AddHTTP2Data(std::move(event));
}
Expand Down Expand Up @@ -1791,8 +1833,8 @@ void SocketTraceConnector::WriteDataEvent(const SocketDataEvent& event) {
//-----------------------------------------------------------------------------

template <typename TProtocolTraits>
void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker,
DataTable* data_table) {
size_t SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker,
DataTable* data_table) {
using TFrameType = typename TProtocolTraits::frame_type;
using TKey = typename TProtocolTraits::key_type;

Expand Down Expand Up @@ -1821,6 +1863,7 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr
tracker->Cleanup<TProtocolTraits>(FLAGS_messages_size_limit_bytes,
FLAGS_datastream_buffer_retention_size,
message_expiry_timestamp, buffer_expiry_timestamp);
return tracker->MemUsage<TProtocolTraits>();
}

void SocketTraceConnector::TransferConnStats(ConnectorContext* ctx, DataTable* data_table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class SocketTraceConnector : public BCCSourceConnector {
/* OUT */ struct go_grpc_http2_header_event_t* header_event_data_go_style);

template <typename TProtocolTraits>
void TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table);
size_t TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table);
void TransferConnStats(ConnectorContext* ctx, DataTable* data_table);

void set_iteration_time(std::chrono::time_point<std::chrono::steady_clock> time) {
Expand Down Expand Up @@ -256,7 +256,7 @@ class SocketTraceConnector : public BCCSourceConnector {
int32_t trace_mode = TraceMode::Off;
uint32_t table_num = 0;
std::vector<endpoint_role_t> trace_roles;
std::function<void(SocketTraceConnector&, ConnectorContext*, ConnTracker*, DataTable*)>
std::function<size_t(SocketTraceConnector&, ConnectorContext*, ConnTracker*, DataTable*)>
transfer_fn = nullptr;
bool enabled = false;
};
Expand Down Expand Up @@ -300,6 +300,8 @@ class SocketTraceConnector : public BCCSourceConnector {

UProbeManager uprobe_mgr_;

size_t total_conn_tracker_mem_usage_ = 0;

enum class StatKey {
kLossSocketDataEvent,
kLossSocketControlEvent,
Expand All @@ -314,6 +316,8 @@ class SocketTraceConnector : public BCCSourceConnector {
kPollSocketDataEventAttrSize,
kPollSocketDataEventDataSize,
kPollSocketDataEventSize,

kDroppedSocketDataEvent,
};

utils::StatCounter<StatKey> stats_;
Expand Down
Loading