diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc index 2a494cc56cd..d3e08a5efce 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc @@ -170,6 +170,11 @@ DEFINE_uint32(datastream_buffer_retention_size, gflags::Uint32FromEnv("PL_DATASTREAM_BUFFER_SIZE", 1024 * 1024), "The maximum size of a data stream buffer retained between cycles."); +DEFINE_uint64(total_conn_tracker_mem_usage, + gflags::Uint64FromEnv("PX_TOTAL_CONN_TRACKER_MEM_USAGE", 0), + "The maximum size in bytes of the collective connection tracker buffers. Defaults to " + "0, which corresponds to no limit. When data is beyond this limit, new data is " + "dropped until the limit is no longer exceeded."); DEFINE_uint64(max_body_bytes, gflags::Uint64FromEnv("PL_STIRLING_MAX_BODY_BYTES", 512), "The maximum number of bytes in the body of protocols like HTTP"); @@ -834,6 +839,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) { } } + size_t current_conn_tracker_buffer_size = 0; for (const auto& conn_tracker : conn_trackers_mgr_.active_trackers()) { const auto& transfer_spec = protocol_transfer_specs_[conn_tracker->protocol()]; @@ -857,7 +863,8 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) { socket_info_mgr_.get()); if (transfer_spec.transfer_fn != nullptr) { - transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table); + current_conn_tracker_buffer_size += + transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table); } else { // If there's no transfer function, then the tracker should not be holding any data. // http::ProtocolTraits is used as a placeholder; the frames deque is expected to be @@ -868,6 +875,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) { conn_tracker->IterationPostTick(); } + total_conn_tracker_mem_usage_ = current_conn_tracker_buffer_size; CheckTracerState(); @@ -1094,6 +1102,19 @@ void SocketTraceConnector::AcceptDataEvent(std::unique_ptr even WriteDataEvent(*event); } + auto msg_size = event->msg.size(); + auto protocol = event->attr.protocol; + total_conn_tracker_mem_usage_ += msg_size; + if (FLAGS_total_conn_tracker_mem_usage != 0 && + total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) { + VLOG_EVERY_N(1, 1000) << absl::Substitute( + "Total buffer size of all active ConnTrackers $0 exceeds the limit $1. " + "Dropping data event of size $2 for protocol $3", + total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, msg_size, protocol); + stats_.Increment(StatKey::kDroppedSocketDataEvent); + return; + } + stats_.Increment(StatKey::kPollSocketDataEventCount); stats_.Increment(StatKey::kPollSocketDataEventAttrSize, sizeof(event->attr)); stats_.Increment(StatKey::kPollSocketDataEventDataSize, event->msg.size()); @@ -1113,11 +1134,32 @@ void SocketTraceConnector::AcceptConnStatsEvent(conn_stats_event_t event) { } void SocketTraceConnector::AcceptHTTP2Header(std::unique_ptr event) { + if (FLAGS_total_conn_tracker_mem_usage != 0 && + total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) { + VLOG_EVERY_N(1, 1000) << absl::Substitute( + "Total buffer size of all active ConnTrackers $0 exceeds the limit $1. " + "Dropping header event", + total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage); + stats_.Increment(StatKey::kDroppedSocketDataEvent); + return; + } + ConnTracker& tracker = GetOrCreateConnTracker(event->attr.conn_id); tracker.AddHTTP2Header(std::move(event)); } void SocketTraceConnector::AcceptHTTP2Data(std::unique_ptr event) { + auto payload_size = event->payload.size(); + total_conn_tracker_mem_usage_ += payload_size; + if (FLAGS_total_conn_tracker_mem_usage != 0 && + total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) { + VLOG_EVERY_N(1, 1000) << absl::Substitute( + "Total buffer size of all active ConnTrackers $0 exceeds the limit $1. " + "Dropping data event of size $2", + total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, payload_size); + stats_.Increment(StatKey::kDroppedSocketDataEvent); + return; + } ConnTracker& tracker = GetOrCreateConnTracker(event->attr.conn_id); tracker.AddHTTP2Data(std::move(event)); } @@ -1791,8 +1833,8 @@ void SocketTraceConnector::WriteDataEvent(const SocketDataEvent& event) { //----------------------------------------------------------------------------- template -void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker, - DataTable* data_table) { +size_t SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker, + DataTable* data_table) { using TFrameType = typename TProtocolTraits::frame_type; using TKey = typename TProtocolTraits::key_type; @@ -1821,6 +1863,7 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr tracker->Cleanup(FLAGS_messages_size_limit_bytes, FLAGS_datastream_buffer_retention_size, message_expiry_timestamp, buffer_expiry_timestamp); + return tracker->MemUsage(); } void SocketTraceConnector::TransferConnStats(ConnectorContext* ctx, DataTable* data_table) { diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.h b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.h index 2534f57a71e..1c939ffeba7 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.h +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.h @@ -213,7 +213,7 @@ class SocketTraceConnector : public BCCSourceConnector { /* OUT */ struct go_grpc_http2_header_event_t* header_event_data_go_style); template - void TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table); + size_t TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table); void TransferConnStats(ConnectorContext* ctx, DataTable* data_table); void set_iteration_time(std::chrono::time_point time) { @@ -256,7 +256,7 @@ class SocketTraceConnector : public BCCSourceConnector { int32_t trace_mode = TraceMode::Off; uint32_t table_num = 0; std::vector trace_roles; - std::function + std::function transfer_fn = nullptr; bool enabled = false; }; @@ -300,6 +300,8 @@ class SocketTraceConnector : public BCCSourceConnector { UProbeManager uprobe_mgr_; + size_t total_conn_tracker_mem_usage_ = 0; + enum class StatKey { kLossSocketDataEvent, kLossSocketControlEvent, @@ -314,6 +316,8 @@ class SocketTraceConnector : public BCCSourceConnector { kPollSocketDataEventAttrSize, kPollSocketDataEventDataSize, kPollSocketDataEventSize, + + kDroppedSocketDataEvent, }; utils::StatCounter stats_;