From f6aae6b252eb46eec03f924e45e5aa83dc3ee74a Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Mon, 14 Apr 2025 20:23:40 +0200 Subject: [PATCH 1/3] refactor: split collect_chunks into two methods This refactor is needed because of the shared logic in collect_trace_chunks and TracerPayloadParams. The way these structs are created makes replacing ByteString with the slice harder due to shared lifetime. Furthermore, the enum encodes two different codepaths, the spanBytes and span pb which never interact with each other. So having functions that handle both span bytes and pb spans is pure complexity overhead. This refactor also has the advantage of removing a bunch of panics and lines of code that were here because of the "fake" pb spans and trace exporter spans overlap --- data-pipeline/src/trace_exporter/mod.rs | 98 ++--- sidecar/src/service/sidecar_server.rs | 23 +- trace-mini-agent/src/trace_processor.rs | 7 +- trace-utils/benches/deserialization.rs | 18 +- trace-utils/src/msgpack_decoder/v04/mod.rs | 6 +- trace-utils/src/span/trace_utils.rs | 184 ++++++++ trace-utils/src/trace_utils.rs | 165 ++++--- trace-utils/src/tracer_payload.rs | 478 ++++----------------- trace-utils/tests/test_send_data.rs | 79 ++-- 9 files changed, 423 insertions(+), 635 deletions(-) diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index f5f5c6a8b7..c8bb94a8cd 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -13,8 +13,9 @@ use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError}; use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError}; +use datadog_trace_utils::span::SpanBytes; use datadog_trace_utils::trace_utils::{self, TracerHeaderTags}; -use datadog_trace_utils::tracer_payload::{self, TraceCollection}; +use datadog_trace_utils::tracer_payload; use ddcommon::header::{ 
APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR, }; @@ -230,11 +231,11 @@ impl TraceExporter { match self.input_format { TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count), TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_bytes(data) { - Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)), + Ok((traces, _)) => self.send_deser_ser(traces), Err(e) => Err(TraceExporterError::Deserialization(e)), }, TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_bytes(data) { - Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)), + Ok((traces, _)) => self.send_deser_ser(traces), Err(e) => Err(TraceExporterError::Deserialization(e)), }, } @@ -559,7 +560,7 @@ impl TraceExporter { /// Add all spans from the given iterator into the stats concentrator /// # Panic /// Will panic if another thread panicked will holding the lock on `stats_concentrator` - fn add_spans_to_stats(&self, collection: &TraceCollection) { + fn add_spans_to_stats(&self, traces: &[Vec]) { if let StatsComputationStatus::Enabled { stats_concentrator, cancellation_token: _, @@ -569,25 +570,19 @@ impl TraceExporter { #[allow(clippy::unwrap_used)] let mut stats_concentrator = stats_concentrator.lock().unwrap(); - match collection { - TraceCollection::TraceChunk(traces) => { - let spans = traces.iter().flat_map(|trace| trace.iter()); - for span in spans { - stats_concentrator.add_span(span); - } - } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). 
- TraceCollection::V07(_) => unreachable!(), + let spans = traces.iter().flat_map(|trace| trace.iter()); + for span in spans { + stats_concentrator.add_span(span); } } } fn send_deser_ser( &self, - mut collection: TraceCollection, + mut traces: Vec>, ) -> Result { self.emit_metric( - HealthMetric::Count(health_metrics::STAT_DESER_TRACES, collection.len() as i64), + HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64), None, ); @@ -596,12 +591,17 @@ impl TraceExporter { // Stats computation if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() { if !self.client_computed_top_level { - collection.set_top_level_spans(); + for chunk in traces.iter_mut() { + datadog_trace_utils::span::trace_utils::compute_top_level_span(chunk); + } } - self.add_spans_to_stats(&collection); + self.add_spans_to_stats(&traces); // Once stats have been computed we can drop all chunks that are not going to be // sampled by the agent - let (dropped_p0_traces, dropped_p0_spans) = collection.drop_chunks(); + let datadog_trace_utils::span::trace_utils::DroppedP0Stats { + dropped_p0_traces, + dropped_p0_spans, + } = datadog_trace_utils::span::trace_utils::drop_chunks(&mut traces); // Update the headers to indicate that stats have been computed and forward dropped // traces counts @@ -611,49 +611,23 @@ impl TraceExporter { header_tags.dropped_p0_spans = dropped_p0_spans; } - let payload = match self.input_format { - TraceExporterInputFormat::V04 => match self.output_format { - TraceExporterOutputFormat::V04 => trace_utils::collect_trace_chunks( - collection, - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - self.endpoint.api_key.is_some(), - false, - ) - .map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?, - TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks( - collection, - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - 
self.endpoint.api_key.is_some(), - true, - ) - .map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?, - }, - TraceExporterInputFormat::V05 => match self.output_format { - TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks( - collection, - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - self.endpoint.api_key.is_some(), - true, - ) - .map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?, + let use_v05_format = match (self.input_format, self.output_format) { + (TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04) => false, + (TraceExporterInputFormat::V04, TraceExporterOutputFormat::V05) + | (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V05) => true, + (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V04) => { // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - _ => unreachable!( - "Conversion from v05 to {:?} not implemented", - self.output_format - ), - }, - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - _ => unreachable!("Input format not implemented"), + unreachable!("Conversion from v05 to v04 not implemented") + } + (TraceExporterInputFormat::Proxy, _) => { + // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). + unreachable!("Codepath invalid for proxy mode",) + } }; + let payload: tracer_payload::TraceChunks = + trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) + })?; let chunks = payload.size(); let endpoint = Endpoint { @@ -667,14 +641,12 @@ impl TraceExporter { let strategy = RetryStrategy::default(); let mp_payload = match &payload { - tracer_payload::TracerPayloadCollection::V04(p) => { + tracer_payload::TraceChunks::V04(p) => { rmp_serde::to_vec_named(p).map_err(TraceExporterError::Serialization)? 
} - tracer_payload::TracerPayloadCollection::V05(p) => { + tracer_payload::TraceChunks::V05(p) => { rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)? } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - _ => unreachable!("Serialization for v07 not implemented"), }; let payload_len = mp_payload.len(); diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 988aa7f1ab..39426be09d 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -16,7 +16,7 @@ use datadog_ipc::tarpc; use datadog_ipc::tarpc::context::Context; use datadog_ipc::transport::Transport; use datadog_trace_utils::trace_utils::SendData; -use datadog_trace_utils::tracer_payload; +use datadog_trace_utils::tracer_payload::decode_to_trace_chunks; use datadog_trace_utils::tracer_payload::TraceEncoding; use ddcommon::{Endpoint, MutexExt}; use ddtelemetry::worker::{ @@ -275,20 +275,15 @@ impl SidecarServer { headers ); - let mut size = 0; - let mut processor = tracer_payload::DefaultTraceChunkProcessor; - let mut payload_params = tracer_payload::TracerPayloadParams::new( - data, - &headers, - &mut processor, - target.api_key.is_some(), - TraceEncoding::V04, - ); - payload_params.measure_size(&mut size); - match payload_params.try_into() { - Ok(payload) => { + match decode_to_trace_chunks(data, TraceEncoding::V04) { + Ok((payload, size)) => { trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}"); - let data = SendData::new(size, payload, headers, target); + let data = SendData::new( + size, + payload.into_tracer_payload_collection(), + headers, + target, + ); self.trace_flusher.enqueue(data); } Err(e) => { diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 232be1e1f0..24a8c86ba4 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -13,7 +13,7 @@ use 
datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use datadog_trace_utils::trace_utils::{EnvironmentType, SendData}; -use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TraceCollection}; +use datadog_trace_utils::tracer_payload::TraceChunkProcessor; use crate::{ config::Config, @@ -96,15 +96,14 @@ impl TraceProcessor for ServerlessTraceProcessor { } }; - let payload = match trace_utils::collect_trace_chunks( - TraceCollection::V07(traces), + let payload = match trace_utils::collect_pb_trace_chunks( + traces, &tracer_header_tags, &mut ChunkProcessor { config: config.clone(), mini_agent_metadata: mini_agent_metadata.clone(), }, true, // In mini agent, we always send agentless - false, ) { Ok(res) => res, Err(err) => { diff --git a/trace-utils/benches/deserialization.rs b/trace-utils/benches/deserialization.rs index 91477d95d7..85d9ef5578 100644 --- a/trace-utils/benches/deserialization.rs +++ b/trace-utils/benches/deserialization.rs @@ -2,10 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use criterion::{black_box, criterion_group, Criterion}; -use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; -use datadog_trace_utils::tracer_payload::{ - DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams, -}; +use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; use serde_json::{json, Value}; fn generate_spans(num_spans: usize, trace_id: u64) -> Vec { @@ -65,7 +62,6 @@ pub fn deserialize_msgpack_to_internal(c: &mut Criterion) { let data = rmp_serde::to_vec(&generate_trace_chunks(20, 2_075)) .expect("Failed to serialize test spans."); let data_as_bytes = tinybytes::Bytes::copy_from_slice(&data); - let tracer_header_tags = &TracerHeaderTags::default(); c.bench_function( "benching deserializing traces from msgpack to their internal representation ", @@ -73,16 +69,8 @@ pub fn deserialize_msgpack_to_internal(c: &mut 
Criterion) { b.iter_batched( || data_as_bytes.clone(), |data_as_bytes| { - let result: anyhow::Result = black_box( - TracerPayloadParams::new( - data_as_bytes, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(), - ); + let result = + black_box(decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04)); assert!(result.is_ok()); // Return the result to avoid measuring the deallocation time result diff --git a/trace-utils/src/msgpack_decoder/v04/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs index 5687385b2c..4567c28ea4 100644 --- a/trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -17,7 +17,8 @@ use crate::span::{SpanBytes, SpanSlice}; /// /// # Returns /// -/// * `Ok(Vec)` - A vector of decoded `TracerPayloadV04` objects if successful. +/// * `Ok(Vec, usize)` - A vector of decoded `Vec` objects if +/// successful. and the number of bytes in the slice used by the decoder. /// * `Err(DecodeError)` - An error if the decoding process fails. /// /// # Errors @@ -76,7 +77,8 @@ pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize) /// /// # Returns /// -/// * `Ok(Vec)` - A vector of decoded `Vec` objects if successful. +/// * `Ok(Vec, usize)` - A vector of decoded `Vec` objects if +/// successful. and the number of bytes in the slice used by the decoder. /// * `Err(DecodeError)` - An error if the decoding process fails. 
/// /// # Errors diff --git a/trace-utils/src/span/trace_utils.rs b/trace-utils/src/span/trace_utils.rs index 801c6f0b2a..3f534e3186 100644 --- a/trace-utils/src/span/trace_utils.rs +++ b/trace-utils/src/span/trace_utils.rs @@ -83,6 +83,76 @@ pub fn is_partial_snapshot(span: &Span) -> bool { .is_some_and(|v| *v >= 0.0) } +pub struct DroppedP0Stats { + pub dropped_p0_traces: usize, + pub dropped_p0_spans: usize, +} + +// Keys used for sampling +const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1"; +const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism"; +const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; + +/// Remove spans and chunks from a TraceCollection only keeping the ones that may be sampled by +/// the agent. +/// +/// # Returns +/// +/// A tuple containing the dropped p0 stats, the first value correspond the amount of traces +/// dropped and the latter to the spans dropped. +pub fn drop_chunks(traces: &mut Vec>>) -> DroppedP0Stats +where + T: SpanText + Default, +{ + let mut dropped_p0_traces = 0; + let mut dropped_p0_spans = 0; + + traces.retain_mut(|chunk| { + // List of spans to keep even if the chunk is dropped + let mut sampled_indexes = Vec::new(); + for (index, span) in chunk.iter().enumerate() { + // ErrorSampler + if span.error == 1 { + // We send chunks containing an error + return true; + } + // PrioritySampler and NoPrioritySampler + let priority = span.metrics.get(SAMPLING_PRIORITY_KEY); + if has_top_level(span) && (priority.is_none() || priority.is_some_and(|p| *p > 0.0)) { + // We send chunks with positive priority or no priority + return true; + } + // SingleSpanSampler and AnalyzedSpansSampler + else if span + .metrics + .get(SAMPLING_SINGLE_SPAN_MECHANISM) + .is_some_and(|m| *m == 8.0) + || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY) + { + // We send spans sampled by single-span sampling or analyzed spans + sampled_indexes.push(index); + } + } + dropped_p0_spans += chunk.len() - 
sampled_indexes.len(); + if sampled_indexes.is_empty() { + // If no spans were sampled we can drop the whole chunk + dropped_p0_traces += 1; + return false; + } + let sampled_spans = sampled_indexes + .iter() + .map(|i| std::mem::take(&mut chunk[*i])) + .collect(); + *chunk = sampled_spans; + true + }); + + DroppedP0Stats { + dropped_p0_traces, + dropped_p0_spans, + } +} + #[cfg(test)] mod tests { use super::*; @@ -176,4 +246,118 @@ mod tests { .collect(); assert_eq!(spans_marked_as_top_level, [1, 4, 5]) } + + #[test] + fn test_drop_chunks() { + let chunk_with_priority = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 1.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_null_priority = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_without_priority = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_error = vec![ + SpanBytes { + span_id: 1, + error: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_a_single_span = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]), + ..Default::default() + }, + ]; + let 
chunk_with_analyzed_span = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]), + ..Default::default() + }, + ]; + + let chunks_and_expected_sampled_spans = vec![ + (chunk_with_priority, 2), + (chunk_with_null_priority, 0), + (chunk_without_priority, 2), + (chunk_with_error, 2), + (chunk_with_a_single_span, 1), + (chunk_with_analyzed_span, 1), + ]; + + for (chunk, expected_count) in chunks_and_expected_sampled_spans.into_iter() { + let mut traces = vec![chunk]; + drop_chunks(&mut traces); + + if expected_count == 0 { + assert!(traces.is_empty()); + } else { + assert_eq!(traces[0].len(), expected_count); + } + } + } } diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index af5ee44d53..0083f4fa88 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -6,8 +6,8 @@ pub use crate::send_data::SendData; use crate::span::v05; use crate::span::v05::dict::SharedDict; pub use crate::tracer_header_tags::TracerHeaderTags; -use crate::tracer_payload; -use crate::tracer_payload::{TraceCollection, TracerPayloadCollection}; +use crate::tracer_payload::TracerPayloadCollection; +use crate::tracer_payload::{self, TraceChunks}; use anyhow::anyhow; use bytes::buf::Reader; use datadog_trace_normalization::normalizer; @@ -597,100 +597,96 @@ macro_rules! 
parse_root_span_tags { } } -pub fn collect_trace_chunks( - traces: TraceCollection, +pub fn collect_trace_chunks( + traces: Vec>, + use_v05_format: bool, +) -> anyhow::Result { + if use_v05_format { + let mut shared_dict = SharedDict::default(); + let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); + for trace in traces { + let v05_trace = trace.iter().try_fold( + Vec::with_capacity(trace.len()), + |mut acc, span| -> anyhow::Result> { + acc.push(v05::from_span_bytes(span, &mut shared_dict)?); + Ok(acc) + }, + )?; + + v05_traces.push(v05_trace); + } + Ok(TraceChunks::V05((shared_dict.dict(), v05_traces))) + } else { + Ok(TraceChunks::V04(traces)) + } +} + +pub fn collect_pb_trace_chunks( + mut traces: Vec>, tracer_header_tags: &TracerHeaderTags, process_chunk: &mut T, is_agentless: bool, - use_v05_format: bool, ) -> anyhow::Result { - match traces { - TraceCollection::TraceChunk(traces) => { - if use_v05_format { - let mut shared_dict = SharedDict::default(); - let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); - for trace in traces { - let v05_trace = trace.iter().try_fold( - Vec::with_capacity(trace.len()), - |mut acc, span| -> anyhow::Result> { - acc.push(v05::from_span_bytes(span, &mut shared_dict)?); - Ok(acc) - }, - )?; - - v05_traces.push(v05_trace); - } - Ok(TracerPayloadCollection::V05(( - shared_dict.dict(), - v05_traces, - ))) - } else { - Ok(TracerPayloadCollection::V04(traces)) - } - } - TraceCollection::V07(mut traces) => { - let mut trace_chunks: Vec = Vec::new(); + let mut trace_chunks: Vec = Vec::new(); - // We'll skip setting the global metadata and rely on the agent to unpack these - let mut gathered_root_span_tags = !is_agentless; - let mut root_span_tags = RootSpanTags::default(); + // We'll skip setting the global metadata and rely on the agent to unpack these + let mut gathered_root_span_tags = !is_agentless; + let mut root_span_tags = RootSpanTags::default(); - for trace in traces.iter_mut() { - if is_agentless { - if let 
Err(e) = normalizer::normalize_trace(trace) { - error!("Error normalizing trace: {e}"); - } - } + for trace in traces.iter_mut() { + if is_agentless { + if let Err(e) = normalizer::normalize_trace(trace) { + error!("Error normalizing trace: {e}"); + } + } - let mut chunk = construct_trace_chunk(trace.to_vec()); + let mut chunk = construct_trace_chunk(trace.to_vec()); - let root_span_index = match get_root_span_index(trace) { - Ok(res) => res, - Err(e) => { - error!("Error getting the root span index of a trace, skipping. {e}"); - continue; - } - }; + let root_span_index = match get_root_span_index(trace) { + Ok(res) => res, + Err(e) => { + error!("Error getting the root span index of a trace, skipping. {e}"); + continue; + } + }; - if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { - error!("Error normalizing trace chunk: {e}"); - } + if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { + error!("Error normalizing trace chunk: {e}"); + } - for span in chunk.spans.iter_mut() { - // TODO: obfuscate & truncate spans - if tracer_header_tags.client_computed_top_level { - update_tracer_top_level(span); - } - } + for span in chunk.spans.iter_mut() { + // TODO: obfuscate & truncate spans + if tracer_header_tags.client_computed_top_level { + update_tracer_top_level(span); + } + } - if !tracer_header_tags.client_computed_top_level { - compute_top_level_span(&mut chunk.spans); - } + if !tracer_header_tags.client_computed_top_level { + compute_top_level_span(&mut chunk.spans); + } - process_chunk.process(&mut chunk, root_span_index); + process_chunk.process(&mut chunk, root_span_index); - trace_chunks.push(chunk); + trace_chunks.push(chunk); - if !gathered_root_span_tags { - gathered_root_span_tags = true; - let meta_map = &trace[root_span_index].meta; - parse_root_span_tags!( - meta_map, - { - "env" => root_span_tags.env, - "version" => root_span_tags.app_version, - "_dd.hostname" => root_span_tags.hostname, - "runtime-id" => 
root_span_tags.runtime_id, - } - ); + if !gathered_root_span_tags { + gathered_root_span_tags = true; + let meta_map = &trace[root_span_index].meta; + parse_root_span_tags!( + meta_map, + { + "env" => root_span_tags.env, + "version" => root_span_tags.app_version, + "_dd.hostname" => root_span_tags.hostname, + "runtime-id" => root_span_tags.runtime_id, } - } - - Ok(TracerPayloadCollection::V07(vec![ - construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags), - ])) + ); } } + + Ok(TracerPayloadCollection::V07(vec![ + construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags), + ])) } /// Returns true if a span should be measured (i.e., it should get trace metrics calculated). @@ -1126,17 +1122,10 @@ mod tests { fn test_collect_trace_chunks_v05() { let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)]; - let collection = collect_trace_chunks( - TraceCollection::TraceChunk(vec![chunk]), - &TracerHeaderTags::default(), - &mut tracer_payload::DefaultTraceChunkProcessor, - false, - true, - ) - .unwrap(); + let collection = collect_trace_chunks(vec![chunk], true).unwrap(); let (dict, traces) = match collection { - TracerPayloadCollection::V05(payload) => payload, + TraceChunks::V05(payload) => payload, _ => panic!("Unexpected type"), }; diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 8274d07c81..ee55b69ffd 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -1,26 +1,16 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{trace_utils, v05, SpanBytes}; -use crate::{ - msgpack_decoder, - trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}, -}; +use crate::span::{v05, SpanBytes}; +use crate::trace_utils::collect_trace_chunks; +use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; use datadog_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; use tinybytes; -type DroppedP0Traces = usize; -type DroppedP0Spans = usize; pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; -pub type DroppedP0Stats = (DroppedP0Traces, DroppedP0Spans); - -// Keys used for sampling -const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1"; -const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism"; -const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; #[derive(Debug, Clone)] /// Enumerates the different encoding types. @@ -33,109 +23,30 @@ pub enum TraceEncoding { V07, } -/// A collection of traces before they are turned into TraceChunks. -pub enum TraceCollection { - /// Collection of traces using protobuf representation of trace chunks. Used for V07 - /// implementation. - V07(Vec>), - /// Collection of traces using the SpanBytes representation of trace chunks. This - /// representation allows the use of ByteString for representing Strings in order to avoid - /// unnecessary cloning during the processing of the chunks. Used for V04 and V05 - /// implementations. - TraceChunk(Vec>), +#[derive(Debug, Clone)] +pub enum TraceChunks { + /// Collection of TraceChunkSpan. + V04(Vec>), + /// Collection of TraceChunkSpan with de-duplicated strings. 
+ V05((Vec, Vec>)), } -impl TraceCollection { - pub fn len(&self) -> usize { - match self { - TraceCollection::V07(traces) => traces.len(), - TraceCollection::TraceChunk(traces) => traces.len(), - } - } - - pub fn is_empty(&self) -> bool { +impl TraceChunks { + pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection { match self { - TraceCollection::V07(traces) => traces.is_empty(), - TraceCollection::TraceChunk(traces) => traces.is_empty(), + TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), + TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces), } } +} - pub fn set_top_level_spans(&mut self) { - match self { - TraceCollection::TraceChunk(traces) => { - for chunk in traces.iter_mut() { - trace_utils::compute_top_level_span(chunk); - } - } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - #[allow(clippy::unimplemented)] - TraceCollection::V07(_) => { - unimplemented!("set_top_level_spans not implemented for v07") - } - } - } - - /// Remove spans and chunks from a TraceCollection only keeping the ones that may be sampled by - /// the agent. - /// - /// # Returns - /// - /// A tuple containing the dropped p0 stats, the first value correspond the amount of traces - /// dropped and the latter to the spans dropped. 
- pub fn drop_chunks(&mut self) -> DroppedP0Stats { - let mut dropped_p0_traces = 0; - let mut dropped_p0_spans = 0; - +impl TraceChunks { + /// Returns the number of traces in the chunk + pub fn size(&self) -> usize { match self { - TraceCollection::TraceChunk(traces) => { - traces.retain_mut(|chunk| { - // List of spans to keep even if the chunk is dropped - let mut sampled_indexes = Vec::new(); - for (index, span) in chunk.iter().enumerate() { - // ErrorSampler - if span.error == 1 { - // We send chunks containing an error - return true; - } - // PrioritySampler and NoPrioritySampler - let priority = span.metrics.get(SAMPLING_PRIORITY_KEY); - if trace_utils::has_top_level(span) - && (priority.is_none() || priority.is_some_and(|p| *p > 0.0)) - { - // We send chunks with positive priority or no priority - return true; - } - // SingleSpanSampler and AnalyzedSpansSampler - else if span - .metrics - .get(SAMPLING_SINGLE_SPAN_MECHANISM) - .is_some_and(|m| *m == 8.0) - || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY) - { - // We send spans sampled by single-span sampling or analyzed spans - sampled_indexes.push(index); - } - } - dropped_p0_spans += chunk.len() - sampled_indexes.len(); - if sampled_indexes.is_empty() { - // If no spans were sampled we can drop the whole chunk - dropped_p0_traces += 1; - return false; - } - let sampled_spans = sampled_indexes - .iter() - .map(|i| std::mem::take(&mut chunk[*i])) - .collect(); - *chunk = sampled_spans; - true - }); - } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - #[allow(clippy::unimplemented)] - TraceCollection::V07(_) => unimplemented!("drop_chunks not implemented for v07"), + TraceChunks::V04(traces) => traces.len(), + TraceChunks::V05((_, traces)) => traces.len(), } - - (dropped_p0_traces, dropped_p0_spans) } } @@ -278,151 +189,57 @@ impl TraceChunkProcessor for DefaultTraceChunkProcessor { // Default implementation does nothing. 
} } -/// Represents the parameters required to collect trace chunks from msgpack data. + +/// This method processes the msgpack data contained within `data` based on +/// the specified `encoding_type`, converting it into a collection of tracer payloads. +/// +/// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are +/// supported. +/// +/// # Returns +/// +/// A `Result` containing either the successfully converted `TraceChunks` and the length consumed +/// from the data or an error if the conversion fails. Possible errors include issues with +/// deserializing the msgpack data or if the data does not conform to the expected format. +/// +/// # Examples /// -/// This struct encapsulates all the necessary parameters for converting msgpack data into -/// a `TracerPayloadCollection`. It is designed to work with the `TryInto` trait to facilitate -/// the conversion process, handling different encoding types and ensuring that all required -/// data is available for the conversion. -pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> { - /// A tinybytes::Bytes slice containing the serialized msgpack data. +/// ```rust +/// use datadog_trace_protobuf::pb; +/// use datadog_trace_utils::trace_utils::TracerHeaderTags; +/// use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; +/// use std::convert::TryInto; +/// use tinybytes; +/// // This will likely be a &[u8] slice in practice. 
+/// let data: Vec = Vec::new(); +/// let data_as_bytes = tinybytes::Bytes::from(data); +/// let result = decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04) +/// .map(|(chunks, _size)| chunks.into_tracer_payload_collection()); +/// +/// match result { +/// Ok(collection) => println!("Successfully converted to TracerPayloadCollection."), +/// Err(e) => println!("Failed to convert: {:?}", e), +/// } +/// ``` +pub fn decode_to_trace_chunks( data: tinybytes::Bytes, - /// Reference to `TracerHeaderTags` containing metadata for the trace. - tracer_header_tags: &'a TracerHeaderTags<'a>, - /// Amount of data consumed from buffer - size: Option<&'a mut usize>, - /// A mutable reference to an implementation of `TraceChunkProcessor` that processes each - /// `TraceChunk` after it is constructed but before it is added to the TracerPayloadCollection. - /// TraceChunks are only available for v07 traces. - chunk_processor: &'a mut T, - /// A boolean indicating whether the agent is running in an agentless mode. This is used to - /// determine what protocol trace chunks should be serialized into when being sent. - is_agentless: bool, - /// The encoding type of the trace data, determining how the data should be deserialized and - /// processed. encoding_type: TraceEncoding, -} - -impl<'a, T: TraceChunkProcessor + 'a> TracerPayloadParams<'a, T> { - pub fn new( - data: tinybytes::Bytes, - tracer_header_tags: &'a TracerHeaderTags, - chunk_processor: &'a mut T, - is_agentless: bool, - encoding_type: TraceEncoding, - ) -> TracerPayloadParams<'a, T> { - TracerPayloadParams { - data, - tracer_header_tags, - size: None, - chunk_processor, - is_agentless, - encoding_type, - } +) -> Result<(TraceChunks, usize), anyhow::Error> { + let (data, size) = match encoding_type { + TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), + TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), + // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). 
+ #[allow(clippy::unimplemented)] + _ => unimplemented!( + "Encodings other than TraceEncoding::V04 and TraceEncoding::V05 not implemented yet." + ), } + .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; - pub fn measure_size(&mut self, size: &'a mut usize) { - self.size = Some(size); - } -} -// TODO: APMSP-1282 - Implement TryInto for other encoding types. Supporting TraceChunkProcessor but -// not supporting v07 is a bit pointless for now. -impl<'a, T: TraceChunkProcessor + 'a> TryInto - for TracerPayloadParams<'a, T> -{ - type Error = anyhow::Error; - /// Attempts to convert `TracerPayloadParams` into a `TracerPayloadCollection`. - /// - /// This method processes the msgpack data contained within `TracerPayloadParams` based on - /// the specified `encoding_type`, converting it into a collection of tracer payloads. - /// The conversion process involves deserializing the msgpack data, applying any necessary - /// processing through `process_chunk`, and assembling the resulting data into - /// a `TracerPayloadCollection`. - /// - /// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are - /// supported. - /// - /// # Returns - /// - /// A `Result` containing either the successfully converted `TracerPayloadCollection` or - /// an error if the conversion fails. Possible errors include issues with deserializing the - /// msgpack data or if the data does not conform to the expected format. - /// - /// # Examples - /// - /// ```rust - /// use datadog_trace_protobuf::pb; - /// use datadog_trace_utils::trace_utils::TracerHeaderTags; - /// use datadog_trace_utils::tracer_payload::{ - /// DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams, - /// }; - /// use std::convert::TryInto; - /// use tinybytes; - /// // This will likely be a &[u8] slice in practice. 
- /// let data: Vec = Vec::new(); - /// let data_as_bytes = tinybytes::Bytes::from(data); - /// let tracer_header_tags = &TracerHeaderTags::default(); - /// let result: Result = TracerPayloadParams::new( - /// data_as_bytes, - /// tracer_header_tags, - /// &mut DefaultTraceChunkProcessor, - /// false, - /// TraceEncoding::V04, - /// ) - /// .try_into(); - /// - /// match result { - /// Ok(collection) => println!("Successfully converted to TracerPayloadCollection."), - /// Err(e) => println!("Failed to convert: {:?}", e), - /// } - /// ``` - fn try_into(self) -> Result { - match self.encoding_type { - TraceEncoding::V04 => { - let (traces, size) = match msgpack_decoder::v04::from_bytes(self.data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; - - if let Some(size_ref) = self.size { - *size_ref = size; - } - - Ok(collect_trace_chunks( - TraceCollection::TraceChunk(traces), - self.tracer_header_tags, - self.chunk_processor, - self.is_agentless, - false, - )?) - }, - TraceEncoding::V05 => { - let (traces, size) = match msgpack_decoder::v05::from_bytes(self.data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; - - if let Some(size_ref) = self.size { - *size_ref = size; - } - - Ok(collect_trace_chunks( - TraceCollection::TraceChunk(traces), - self.tracer_header_tags, - self.chunk_processor, - self.is_agentless, - true, - )?) - }, - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). 
- #[allow(clippy::unimplemented)] - _ => unimplemented!("Encodings other than TraceEncoding::V04 and TraceEncoding::V05 not implemented yet."), - } - } + Ok(( + collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?, + size, + )) } #[cfg(test)] @@ -435,8 +252,6 @@ mod tests { use std::collections::HashMap; use tinybytes::BytesString; - const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level"; - fn create_dummy_collection_v07() -> TracerPayloadCollection { TracerPayloadCollection::V07(vec![pb::TracerPayload { container_id: "".to_string(), @@ -588,23 +403,14 @@ mod tests { .expect("Failed to serialize test span."); let data = tinybytes::Bytes::from(data); - let tracer_header_tags = &TracerHeaderTags::default(); - - let result: anyhow::Result = TracerPayloadParams::new( - data, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(); + let result = decode_to_trace_chunks(data, TraceEncoding::V04); assert!(result.is_ok()); - let collection = result.unwrap(); - assert_eq!(2, collection.size()); + let (chunks, _) = result.unwrap(); + assert_eq!(2, chunks.size()); - if let TracerPayloadCollection::V04(traces) = collection { + if let TraceChunks::V04(traces) = chunks { assert_eq!(expected_serialized_span_data1, traces[0]); assert_eq!(expected_serialized_span_data2, traces[1]); } else { @@ -618,20 +424,11 @@ mod tests { let empty_data = vec![0x90]; let data = tinybytes::Bytes::from(empty_data); - let tracer_header_tags = &TracerHeaderTags::default(); - - let result: anyhow::Result = TracerPayloadParams::new( - data, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(); + let result = decode_to_trace_chunks(data, TraceEncoding::V04); assert!(result.is_ok()); - let collection = result.unwrap(); + let (collection, _) = result.unwrap(); assert_eq!(0, collection.size()); } @@ -641,144 +438,17 @@ mod tests { let expected = vec![dummy_trace.clone()]; let payload = 
rmp_serde::to_vec_named(&expected).unwrap(); let payload = tinybytes::Bytes::from(payload); - let tracer_header_tags = &TracerHeaderTags::default(); - let result: anyhow::Result = TracerPayloadParams::new( - payload, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(); + let result = decode_to_trace_chunks(payload, TraceEncoding::V04); assert!(result.is_ok()); - let collection = result.unwrap(); + let (collection, _size) = result.unwrap(); assert_eq!(1, collection.size()); - if let TracerPayloadCollection::V04(traces) = collection { + if let TraceChunks::V04(traces) = collection { assert_eq!(dummy_trace, traces[0]); } else { panic!("Invalid collection type returned for try_into"); } } - - #[test] - fn test_drop_chunks() { - let chunk_with_priority = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 1.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_with_null_priority = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_without_priority = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_with_error = vec![ - SpanBytes { - span_id: 1, - error: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_with_a_single_span = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - 
(TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]), - ..Default::default() - }, - ]; - let chunk_with_analyzed_span = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]), - ..Default::default() - }, - ]; - - let chunks_and_expected_sampled_spans = vec![ - (chunk_with_priority, 2), - (chunk_with_null_priority, 0), - (chunk_without_priority, 2), - (chunk_with_error, 2), - (chunk_with_a_single_span, 1), - (chunk_with_analyzed_span, 1), - ]; - - for (chunk, expected_count) in chunks_and_expected_sampled_spans.into_iter() { - let mut collection = TraceCollection::TraceChunk(vec![chunk]); - collection.drop_chunks(); - - let traces = match collection { - TraceCollection::TraceChunk(t) => t, - _ => panic!("Collection must contain TraceChunkSpan"), - }; - - if expected_count == 0 { - assert!(traces.is_empty()); - } else { - assert_eq!(traces[0].len(), expected_count); - } - } - } } diff --git a/trace-utils/tests/test_send_data.rs b/trace-utils/tests/test_send_data.rs index 2209c90e70..777ed36586 100644 --- a/trace-utils/tests/test_send_data.rs +++ b/trace-utils/tests/test_send_data.rs @@ -7,9 +7,7 @@ mod tracing_integration_tests { use datadog_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent; use datadog_trace_utils::test_utils::{create_test_json_span, create_test_no_alloc_span}; use datadog_trace_utils::trace_utils::TracerHeaderTags; - use datadog_trace_utils::tracer_payload::{ - DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadParams, - }; + use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; #[cfg(target_os = "linux")] use 
ddcommon::connector::uds::socket_path_to_uri; use ddcommon::Endpoint; @@ -103,19 +101,17 @@ mod tracing_integration_tests { let data = get_v04_trace_snapshot_test_payload("test_send_data_v04_snapshot"); - let payload_collection = TracerPayloadParams::new( - data, - &header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + let (payload_collection, _size) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); test_agent.start_session(snapshot_name, None).await; - let data = SendData::new(300, payload_collection, header_tags, &endpoint); + let data = SendData::new( + 300, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let _result = data.send().await; @@ -197,19 +193,17 @@ mod tracing_integration_tests { let data = tinybytes::Bytes::from(encoded_data); - let payload_collection = TracerPayloadParams::new( - data, - &header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + let (payload_collection, _) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); test_agent.start_session(snapshot_name, None).await; - let data = SendData::new(300, payload_collection, header_tags, &endpoint); + let data = SendData::new( + 300, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let _result = data.send().await; @@ -237,19 +231,16 @@ mod tracing_integration_tests { let empty_data = vec![0x90]; let data = tinybytes::Bytes::from(empty_data); - let tracer_header_tags = &TracerHeaderTags::default(); - - let payload_collection = TracerPayloadParams::new( - data, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - 
TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); - let data = SendData::new(0, payload_collection, header_tags, &endpoint); + let (payload_collection, _) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + + let data = SendData::new( + 0, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let result = data.send().await; @@ -319,19 +310,17 @@ mod tracing_integration_tests { let data = get_v04_trace_snapshot_test_payload("test_send_data_v04_snapshot_uds"); - let payload_collection = TracerPayloadParams::new( - data, - &header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + let (payload_collection, size) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); test_agent.start_session(snapshot_name, None).await; - let data = SendData::new(300, payload_collection, header_tags, &endpoint); + let data = SendData::new( + size, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let _result = data.send().await; From 8f9c6d961f631c10c78439c35a5105c787a47f3d Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 15 Apr 2025 15:36:50 +0200 Subject: [PATCH 2/3] Remove v0.7 trace encoding --- trace-utils/src/tracer_payload.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index ee55b69ffd..9b97085c76 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -19,8 +19,6 @@ pub enum TraceEncoding { V04, /// v054 encoding (TracerPayloadV04). V05, - /// v0.7 encoding (TracerPayload). 
- V07, } #[derive(Debug, Clone)] @@ -228,11 +226,6 @@ pub fn decode_to_trace_chunks( let (data, size) = match encoding_type { TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - #[allow(clippy::unimplemented)] - _ => unimplemented!( - "Encodings other than TraceEncoding::V04 and TraceEncoding::V05 not implemented yet." - ), } .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; From 5db3bf2337928510960684204425a9b117a686f3 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 15 Apr 2025 16:33:32 +0200 Subject: [PATCH 3/3] Make TraceChunk generic on the span text for easier migration --- data-pipeline/src/trace_exporter/mod.rs | 7 +++--- trace-utils/src/span/mod.rs | 11 +++++++-- trace-utils/src/span/v05/dict.rs | 32 +++++++++++++------------ trace-utils/src/span/v05/mod.rs | 11 +++++---- trace-utils/src/test_utils/mod.rs | 5 ++-- trace-utils/src/trace_utils.rs | 10 ++++---- trace-utils/src/tracer_payload.rs | 16 ++++++------- 7 files changed, 51 insertions(+), 41 deletions(-) diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index c8bb94a8cd..bf2138f7e4 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -624,10 +624,9 @@ impl TraceExporter { unreachable!("Codepath invalid for proxy mode",) } }; - let payload: tracer_payload::TraceChunks = - trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?; + let payload = trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) + })?; let chunks = payload.size(); let endpoint = Endpoint { diff --git a/trace-utils/src/span/mod.rs 
b/trace-utils/src/span/mod.rs index 3e8f144a53..0a629be1de 100644 --- a/trace-utils/src/span/mod.rs +++ b/trace-utils/src/span/mod.rs @@ -12,6 +12,9 @@ use std::fmt; use std::hash::Hash; use std::str::FromStr; use tinybytes::{Bytes, BytesString}; +use v05::dict::SharedDict; + +use crate::tracer_payload::TraceChunks; #[derive(Debug, PartialEq)] pub enum SpanKey { @@ -60,9 +63,9 @@ impl FromStr for SpanKey { /// Trait representing the requirements for a type to be used as a Span "string" type. /// Note: Borrow is not required by the derived traits, but allows to access HashMap elements /// from a static str and check if the string is empty. -pub trait SpanText: Eq + Hash + Borrow + Serialize {} +pub trait SpanText: Eq + Hash + Borrow + Serialize + Default + Clone {} /// Implement the SpanText trait for any type which satisfies the sub traits. -impl + Serialize> SpanText for T {} +impl + Serialize + Default + Clone> SpanText for T {} /// Checks if the `value` represents an empty string. Used to skip serializing empty strings /// with serde. @@ -259,6 +262,10 @@ pub type SpanEventSlice<'a> = SpanEvent<&'a str>; pub type AttributeAnyValueSlice<'a> = AttributeAnyValue<&'a str>; pub type AttributeArrayValueSlice<'a> = AttributeArrayValue<&'a str>; +pub type TraceChunksBytes = TraceChunks; + +pub type SharedDictBytes = SharedDict; + impl SpanSlice<'_> { /// Converts a borrowed `SpanSlice` into an owned `SpanBytes`, by resolving all internal /// references into slices of the provided `Bytes` buffer. 
Returns `None` if any slice is diff --git a/trace-utils/src/span/v05/dict.rs b/trace-utils/src/span/v05/dict.rs index 120345a7c0..d4c6265f5d 100644 --- a/trace-utils/src/span/v05/dict.rs +++ b/trace-utils/src/span/v05/dict.rs @@ -2,58 +2,60 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::HashMap; -use tinybytes::{Bytes, BytesString}; + +use crate::span::SpanText; /// This struct represents the shared dictionary used for interning all the strings belonging to a /// v05 trace chunk. -pub struct SharedDict { +pub struct SharedDict { /// Map strings with their index (O(1) retrieval complexity). - string_map: HashMap, + string_map: HashMap, /// Since the collection needs to be ordered an additional vector to keep the insertion order. - dict: Vec, + dict: Vec, } -impl SharedDict { +impl SharedDict { /// Gets the index of the interned string. If the string is not part of the dictionary it is /// added and its corresponding index returned. /// /// # Arguments: /// /// * `str`: string to look up in the dictionary. - pub fn get_or_insert(&mut self, str: &BytesString) -> Result { - if let Some(index) = self.string_map.get(str) { + pub fn get_or_insert(&mut self, s: &T) -> Result { + if let Some(index) = self.string_map.get(s.borrow()) { (*index).try_into() } else { let index = self.dict.len(); - self.dict.push(str.clone()); - self.string_map.insert(str.clone(), index); + self.dict.push(s.clone()); + self.string_map.insert(s.clone(), index); index.try_into() } } /// Returns the dictionary. This method consumes the structure. 
- pub fn dict(mut self) -> Vec { + pub fn dict(mut self) -> Vec { std::mem::take(&mut self.dict) } } -impl Default for SharedDict { +impl Default for SharedDict { fn default() -> Self { - let empty_str = unsafe { BytesString::from_bytes_unchecked(Bytes::from_static(b"")) }; Self { - string_map: HashMap::from([(empty_str.clone(), 0)]), - dict: vec![empty_str.clone()], + string_map: HashMap::from([(T::default(), 0)]), + dict: vec![T::default()], } } } #[cfg(test)] mod tests { + use tinybytes::{Bytes, BytesString}; + use super::*; #[test] fn default_test() { - let dict = SharedDict::default(); + let dict: SharedDict = SharedDict::default(); assert_eq!(dict.string_map.len(), 1); assert_eq!(dict.dict.len(), 1); diff --git a/trace-utils/src/span/v05/mod.rs b/trace-utils/src/span/v05/mod.rs index 042bc4c354..61a179dfde 100644 --- a/trace-utils/src/span/v05/mod.rs +++ b/trace-utils/src/span/v05/mod.rs @@ -3,8 +3,7 @@ pub mod dict; -use crate::span::v05::dict::SharedDict; -use crate::span::SpanBytes; +use crate::span::{v05::dict::SharedDict, SpanText}; use anyhow::Result; use serde::Serialize; use std::collections::HashMap; @@ -29,7 +28,10 @@ pub struct Span { pub r#type: u32, } -pub fn from_span_bytes(span: &SpanBytes, dict: &mut SharedDict) -> Result { +pub fn from_span( + span: &crate::span::Span, + dict: &mut SharedDict, +) -> Result { Ok(Span { service: dict.get_or_insert(&span.service)?, name: dict.get_or_insert(&span.name)?, @@ -61,6 +63,7 @@ pub fn from_span_bytes(span: &SpanBytes, dict: &mut SharedDict) -> Result #[cfg(test)] mod tests { use super::*; + use crate::span::SpanBytes; use tinybytes::BytesString; #[test] @@ -87,7 +90,7 @@ mod tests { }; let mut dict = SharedDict::default(); - let v05_span = from_span_bytes(&span, &mut dict).unwrap(); + let v05_span = from_span(&span, &mut dict).unwrap(); let dict = dict.dict(); diff --git a/trace-utils/src/test_utils/mod.rs b/trace-utils/src/test_utils/mod.rs index d52f63fb9c..e7a2f9ea6c 100644 --- 
a/trace-utils/src/test_utils/mod.rs +++ b/trace-utils/src/test_utils/mod.rs @@ -15,9 +15,8 @@ use std::collections::HashMap; use std::time::Duration; use crate::send_data::SendData; -use crate::span::v05; -use crate::span::v05::dict::SharedDict; use crate::span::SpanBytes; +use crate::span::{v05, SharedDictBytes}; use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use datadog_trace_protobuf::pb; @@ -230,7 +229,7 @@ pub fn create_test_v05_span( parent_id: u64, start: i64, is_top_level: bool, - dict: &mut SharedDict, + dict: &mut SharedDictBytes, metrics: Option>, ) -> v05::Span { let mut meta = HashMap::from([ diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index 0083f4fa88..ec7684e3ea 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -3,8 +3,8 @@ pub use crate::send_data::send_data_result::SendDataResult; pub use crate::send_data::SendData; -use crate::span::v05; use crate::span::v05::dict::SharedDict; +use crate::span::{v05, SpanText}; pub use crate::tracer_header_tags::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use crate::tracer_payload::{self, TraceChunks}; @@ -597,10 +597,10 @@ macro_rules! 
parse_root_span_tags { } } -pub fn collect_trace_chunks( - traces: Vec>, +pub fn collect_trace_chunks( + traces: Vec>>, use_v05_format: bool, -) -> anyhow::Result { +) -> anyhow::Result> { if use_v05_format { let mut shared_dict = SharedDict::default(); let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); @@ -608,7 +608,7 @@ pub fn collect_trace_chunks( let v05_trace = trace.iter().try_fold( Vec::with_capacity(trace.len()), |mut acc, span| -> anyhow::Result> { - acc.push(v05::from_span_bytes(span, &mut shared_dict)?); + acc.push(v05::from_span(span, &mut shared_dict)?); Ok(acc) }, )?; diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 9b97085c76..7bf6b91335 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -1,13 +1,13 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{v05, SpanBytes}; +use crate::span::{v05, Span, SpanBytes, SpanText}; use crate::trace_utils::collect_trace_chunks; use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; use datadog_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; -use tinybytes; +use tinybytes::{self, BytesString}; pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; @@ -22,14 +22,14 @@ pub enum TraceEncoding { } #[derive(Debug, Clone)] -pub enum TraceChunks { +pub enum TraceChunks { /// Collection of TraceChunkSpan. - V04(Vec>), + V04(Vec>>), /// Collection of TraceChunkSpan with de-duplicated strings. 
- V05((Vec, Vec>)), + V05((Vec, Vec>)), } -impl TraceChunks { +impl TraceChunks { pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection { match self { TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), @@ -38,7 +38,7 @@ impl TraceChunks { } } -impl TraceChunks { +impl TraceChunks { /// Returns the number of traces in the chunk pub fn size(&self) -> usize { match self { @@ -222,7 +222,7 @@ impl TraceChunkProcessor for DefaultTraceChunkProcessor { pub fn decode_to_trace_chunks( data: tinybytes::Bytes, encoding_type: TraceEncoding, -) -> Result<(TraceChunks, usize), anyhow::Error> { +) -> Result<(TraceChunks, usize), anyhow::Error> { let (data, size) = match encoding_type { TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),