diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index f5f5c6a8b7..bf2138f7e4 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -13,8 +13,9 @@ use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError}; use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError}; +use datadog_trace_utils::span::SpanBytes; use datadog_trace_utils::trace_utils::{self, TracerHeaderTags}; -use datadog_trace_utils::tracer_payload::{self, TraceCollection}; +use datadog_trace_utils::tracer_payload; use ddcommon::header::{ APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR, }; @@ -230,11 +231,11 @@ impl TraceExporter { match self.input_format { TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count), TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_bytes(data) { - Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)), + Ok((traces, _)) => self.send_deser_ser(traces), Err(e) => Err(TraceExporterError::Deserialization(e)), }, TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_bytes(data) { - Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)), + Ok((traces, _)) => self.send_deser_ser(traces), Err(e) => Err(TraceExporterError::Deserialization(e)), }, } @@ -559,7 +560,7 @@ impl TraceExporter { /// Add all spans from the given iterator into the stats concentrator /// # Panic /// Will panic if another thread panicked while holding the lock on `stats_concentrator` - fn add_spans_to_stats(&self, collection: &TraceCollection) { + fn add_spans_to_stats(&self, traces: &[Vec<SpanBytes>]) { if let StatsComputationStatus::Enabled { stats_concentrator, cancellation_token: _, @@ -569,25 +570,19 @@ impl TraceExporter { #[allow(clippy::unwrap_used)] let mut stats_concentrator = stats_concentrator.lock().unwrap(); - match collection { - TraceCollection::TraceChunk(traces) => { - let spans = traces.iter().flat_map(|trace| trace.iter()); - for span in spans { - stats_concentrator.add_span(span); - } - } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - TraceCollection::V07(_) => unreachable!(), + let spans = traces.iter().flat_map(|trace| trace.iter()); + for span in spans { + stats_concentrator.add_span(span); } } } fn send_deser_ser( &self, - mut collection: TraceCollection, + mut traces: Vec<Vec<SpanBytes>>, ) -> Result { self.emit_metric( - HealthMetric::Count(health_metrics::STAT_DESER_TRACES, collection.len() as i64), + HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64), None, ); @@ -596,12 +591,17 @@ impl TraceExporter { // Stats computation if let StatsComputationStatus::Enabled { ..
} = &**self.client_side_stats.load() { if !self.client_computed_top_level { - collection.set_top_level_spans(); + for chunk in traces.iter_mut() { + datadog_trace_utils::span::trace_utils::compute_top_level_span(chunk); + } } - self.add_spans_to_stats(&collection); + self.add_spans_to_stats(&traces); // Once stats have been computed we can drop all chunks that are not going to be // sampled by the agent - let (dropped_p0_traces, dropped_p0_spans) = collection.drop_chunks(); + let datadog_trace_utils::span::trace_utils::DroppedP0Stats { + dropped_p0_traces, + dropped_p0_spans, + } = datadog_trace_utils::span::trace_utils::drop_chunks(&mut traces); // Update the headers to indicate that stats have been computed and forward dropped // traces counts @@ -611,49 +611,22 @@ impl TraceExporter { header_tags.dropped_p0_spans = dropped_p0_spans; } - let payload = match self.input_format { - TraceExporterInputFormat::V04 => match self.output_format { - TraceExporterOutputFormat::V04 => trace_utils::collect_trace_chunks( - collection, - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - self.endpoint.api_key.is_some(), - false, - ) - .map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?, - TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks( - collection, - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - self.endpoint.api_key.is_some(), - true, - ) - .map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?, - }, - TraceExporterInputFormat::V05 => match self.output_format { - TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks( - collection, - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - self.endpoint.api_key.is_some(), - true, - ) - .map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - })?, + let use_v05_format = match (self.input_format, self.output_format) { + (TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04) => false, + (TraceExporterInputFormat::V04, TraceExporterOutputFormat::V05) + | (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V05) => true, + (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V04) => { // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - _ => unreachable!( - "Conversion from v05 to {:?} not implemented", - self.output_format - ), - }, - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - _ => unreachable!("Input format not implemented"), + unreachable!("Conversion from v05 to v04 not implemented") + } + (TraceExporterInputFormat::Proxy, _) => { + // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). + unreachable!("Codepath invalid for proxy mode",) + } }; + let payload = trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) + })?; let chunks = payload.size(); let endpoint = Endpoint { @@ -667,14 +640,12 @@ impl TraceExporter { let strategy = RetryStrategy::default(); let mp_payload = match &payload { - tracer_payload::TracerPayloadCollection::V04(p) => { + tracer_payload::TraceChunks::V04(p) => { rmp_serde::to_vec_named(p).map_err(TraceExporterError::Serialization)? } - tracer_payload::TracerPayloadCollection::V05(p) => { + tracer_payload::TraceChunks::V05(p) => { rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)? 
} - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - _ => unreachable!("Serialization for v07 not implemented"), }; let payload_len = mp_payload.len(); diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 988aa7f1ab..39426be09d 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -16,7 +16,7 @@ use datadog_ipc::tarpc; use datadog_ipc::tarpc::context::Context; use datadog_ipc::transport::Transport; use datadog_trace_utils::trace_utils::SendData; -use datadog_trace_utils::tracer_payload; +use datadog_trace_utils::tracer_payload::decode_to_trace_chunks; use datadog_trace_utils::tracer_payload::TraceEncoding; use ddcommon::{Endpoint, MutexExt}; use ddtelemetry::worker::{ @@ -275,20 +275,15 @@ impl SidecarServer { headers ); - let mut size = 0; - let mut processor = tracer_payload::DefaultTraceChunkProcessor; - let mut payload_params = tracer_payload::TracerPayloadParams::new( - data, - &headers, - &mut processor, - target.api_key.is_some(), - TraceEncoding::V04, - ); - payload_params.measure_size(&mut size); - match payload_params.try_into() { - Ok(payload) => { + match decode_to_trace_chunks(data, TraceEncoding::V04) { + Ok((payload, size)) => { trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}"); - let data = SendData::new(size, payload, headers, target); + let data = SendData::new( + size, + payload.into_tracer_payload_collection(), + headers, + target, + ); self.trace_flusher.enqueue(data); } Err(e) => { diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 232be1e1f0..24a8c86ba4 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -13,7 +13,7 @@ use datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use datadog_trace_utils::trace_utils::{EnvironmentType, SendData}; -use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TraceCollection}; +use datadog_trace_utils::tracer_payload::TraceChunkProcessor; use crate::{ config::Config, @@ -96,15 +96,14 @@ impl TraceProcessor for ServerlessTraceProcessor { } }; - let payload = match trace_utils::collect_trace_chunks( - TraceCollection::V07(traces), + let payload = match trace_utils::collect_pb_trace_chunks( + traces, &tracer_header_tags, &mut ChunkProcessor { config: config.clone(), mini_agent_metadata: mini_agent_metadata.clone(), }, true, // In mini agent, we always send agentless - false, ) { Ok(res) => res, Err(err) => { diff --git a/trace-utils/benches/deserialization.rs b/trace-utils/benches/deserialization.rs index 91477d95d7..85d9ef5578 100644 --- a/trace-utils/benches/deserialization.rs +++ b/trace-utils/benches/deserialization.rs @@ -2,10 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use criterion::{black_box, criterion_group, Criterion}; -use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; -use datadog_trace_utils::tracer_payload::{ - DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams, -}; +use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; use serde_json::{json, Value}; fn generate_spans(num_spans: usize, trace_id: u64) -> Vec { @@ -65,7 +62,6 @@ pub fn deserialize_msgpack_to_internal(c: &mut Criterion) { let data = rmp_serde::to_vec(&generate_trace_chunks(20, 2_075)) .expect("Failed to serialize test spans."); let data_as_bytes = 
tinybytes::Bytes::copy_from_slice(&data); - let tracer_header_tags = &TracerHeaderTags::default(); c.bench_function( "benching deserializing traces from msgpack to their internal representation ", |b| { b.iter_batched( || data_as_bytes.clone(), |data_as_bytes| { - let result: anyhow::Result<TracerPayloadCollection> = black_box( - TracerPayloadParams::new( - data_as_bytes, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(), - ); + let result = + black_box(decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04)); assert!(result.is_ok()); // Return the result to avoid measuring the deallocation time result diff --git a/trace-utils/src/msgpack_decoder/v04/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs index 5687385b2c..4567c28ea4 100644 --- a/trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -17,7 +17,8 @@ use crate::span::{SpanBytes, SpanSlice}; /// /// # Returns /// -/// * `Ok(Vec)` - A vector of decoded `TracerPayloadV04` objects if successful. +/// * `Ok((Vec, usize))` - A vector of decoded trace chunks if successful, and the number of +/// bytes consumed from the slice by the decoder. /// * `Err(DecodeError)` - An error if the decoding process fails. /// /// # Errors /// @@ -76,7 +77,8 @@ pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize) /// /// # Returns /// -/// * `Ok(Vec)` - A vector of decoded `Vec` objects if successful. +/// * `Ok((Vec, usize))` - A vector of decoded trace chunks if successful, and the number of +/// bytes consumed from the slice by the decoder. /// * `Err(DecodeError)` - An error if the decoding process fails. /// /// # Errors diff --git a/trace-utils/src/span/mod.rs b/trace-utils/src/span/mod.rs index 3e8f144a53..0a629be1de 100644 --- a/trace-utils/src/span/mod.rs +++ b/trace-utils/src/span/mod.rs @@ -12,6 +12,9 @@ use std::fmt; use std::hash::Hash; use std::str::FromStr; use tinybytes::{Bytes, BytesString}; +use v05::dict::SharedDict; + +use crate::tracer_payload::TraceChunks; #[derive(Debug, PartialEq)] pub enum SpanKey { @@ -60,9 +63,9 @@ impl FromStr for SpanKey { /// Trait representing the requirements for a type to be used as a Span "string" type. /// Note: Borrow is not required by the derived traits, but allows to access HashMap elements /// from a static str and check if the string is empty. -pub trait SpanText: Eq + Hash + Borrow<str> + Serialize {} +pub trait SpanText: Eq + Hash + Borrow<str> + Serialize + Default + Clone {} /// Implement the SpanText trait for any type which satisfies the sub traits. -impl<T: Eq + Hash + Borrow<str> + Serialize> SpanText for T {} +impl<T: Eq + Hash + Borrow<str> + Serialize + Default + Clone> SpanText for T {} /// Checks if the `value` represents an empty string. Used to skip serializing empty strings /// with serde. @@ -259,6 +262,10 @@ pub type SpanEventSlice<'a> = SpanEvent<&'a str>; pub type AttributeAnyValueSlice<'a> = AttributeAnyValue<&'a str>; pub type AttributeArrayValueSlice<'a> = AttributeArrayValue<&'a str>; +pub type TraceChunksBytes = TraceChunks<BytesString>; + +pub type SharedDictBytes = SharedDict<BytesString>; + impl SpanSlice<'_> { /// Converts a borrowed `SpanSlice` into an owned `SpanBytes`, by resolving all internal /// references into slices of the provided `Bytes` buffer.
Returns `None` if any slice is diff --git a/trace-utils/src/span/trace_utils.rs b/trace-utils/src/span/trace_utils.rs index 801c6f0b2a..3f534e3186 100644 --- a/trace-utils/src/span/trace_utils.rs +++ b/trace-utils/src/span/trace_utils.rs @@ -83,6 +83,76 @@ pub fn is_partial_snapshot(span: &Span) -> bool { .is_some_and(|v| *v >= 0.0) } +pub struct DroppedP0Stats { + pub dropped_p0_traces: usize, + pub dropped_p0_spans: usize, +} + +// Keys used for sampling +const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1"; +const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism"; +const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; + +/// Remove spans and chunks from a collection of traces, keeping only the ones that may be sampled +/// by the agent. +/// +/// # Returns +/// +/// A `DroppedP0Stats` struct containing the number of dropped p0 traces and the number of dropped +/// p0 spans. +pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedP0Stats +where + T: SpanText + Default, +{ + let mut dropped_p0_traces = 0; + let mut dropped_p0_spans = 0; + + traces.retain_mut(|chunk| { + // List of spans to keep even if the chunk is dropped + let mut sampled_indexes = Vec::new(); + for (index, span) in chunk.iter().enumerate() { + // ErrorSampler + if span.error == 1 { + // We send chunks containing an error + return true; + } + // PrioritySampler and NoPrioritySampler + let priority = span.metrics.get(SAMPLING_PRIORITY_KEY); + if has_top_level(span) && (priority.is_none() || priority.is_some_and(|p| *p > 0.0)) { + // We send chunks with positive priority or no priority + return true; + } + // SingleSpanSampler and AnalyzedSpansSampler + else if span + .metrics + .get(SAMPLING_SINGLE_SPAN_MECHANISM) + .is_some_and(|m| *m == 8.0) + || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY) + { + // We send spans sampled by single-span sampling or analyzed spans + sampled_indexes.push(index); + } + } + dropped_p0_spans += chunk.len() - sampled_indexes.len(); + if sampled_indexes.is_empty() { + // If no spans were sampled we can drop the whole chunk + dropped_p0_traces += 1; + return false; + } + let sampled_spans = sampled_indexes + .iter() + .map(|i| std::mem::take(&mut chunk[*i])) + .collect(); + *chunk = sampled_spans; + true + }); + + DroppedP0Stats { + dropped_p0_traces, + dropped_p0_spans, + } +} + #[cfg(test)] mod tests { use super::*; @@ -176,4 +246,118 @@ mod tests { .collect(); assert_eq!(spans_marked_as_top_level, [1, 4, 5]) } + + #[test] + fn test_drop_chunks() { + let chunk_with_priority = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 1.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_null_priority = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_without_priority = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_error = vec![ + SpanBytes { + span_id: 1, + error: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + 
SpanBytes { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_a_single_span = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]), + ..Default::default() + }, + ]; + let chunk_with_analyzed_span = vec![ + SpanBytes { + span_id: 1, + metrics: HashMap::from([ + (SAMPLING_PRIORITY_KEY.into(), 0.0), + (TRACER_TOP_LEVEL_KEY.into(), 1.0), + ]), + ..Default::default() + }, + SpanBytes { + span_id: 2, + parent_id: 1, + metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]), + ..Default::default() + }, + ]; + + let chunks_and_expected_sampled_spans = vec![ + (chunk_with_priority, 2), + (chunk_with_null_priority, 0), + (chunk_without_priority, 2), + (chunk_with_error, 2), + (chunk_with_a_single_span, 1), + (chunk_with_analyzed_span, 1), + ]; + + for (chunk, expected_count) in chunks_and_expected_sampled_spans.into_iter() { + let mut traces = vec![chunk]; + drop_chunks(&mut traces); + + if expected_count == 0 { + assert!(traces.is_empty()); + } else { + assert_eq!(traces[0].len(), expected_count); + } + } + } } diff --git a/trace-utils/src/span/v05/dict.rs b/trace-utils/src/span/v05/dict.rs index 120345a7c0..d4c6265f5d 100644 --- a/trace-utils/src/span/v05/dict.rs +++ b/trace-utils/src/span/v05/dict.rs @@ -2,58 +2,60 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::HashMap; -use tinybytes::{Bytes, BytesString}; + +use crate::span::SpanText; /// This struct represents the shared dictionary used for interning all the strings belonging to a /// v05 trace chunk. -pub struct SharedDict { +pub struct SharedDict<T: SpanText> { /// Map strings with their index (O(1) retrieval complexity). - string_map: HashMap<BytesString, usize>, + string_map: HashMap<T, usize>, /// Since the collection needs to be ordered an additional vector to keep the insertion order. - dict: Vec<BytesString>, + dict: Vec<T>, } -impl SharedDict { +impl<T: SpanText> SharedDict<T> { /// Gets the index of the interned string. If the string is not part of the dictionary it is /// added and its corresponding index returned. /// /// # Arguments: /// /// * `str`: string to look up in the dictionary. - pub fn get_or_insert(&mut self, str: &BytesString) -> Result { - if let Some(index) = self.string_map.get(str) { + pub fn get_or_insert(&mut self, s: &T) -> Result { + if let Some(index) = self.string_map.get(s.borrow()) { (*index).try_into() } else { let index = self.dict.len(); - self.dict.push(str.clone()); - self.string_map.insert(str.clone(), index); + self.dict.push(s.clone()); + self.string_map.insert(s.clone(), index); index.try_into() } } /// Returns the dictionary. This method consumes the structure.
- pub fn dict(mut self) -> Vec<BytesString> { + pub fn dict(mut self) -> Vec<T> { std::mem::take(&mut self.dict) } } -impl Default for SharedDict { +impl<T: SpanText> Default for SharedDict<T> { fn default() -> Self { - let empty_str = unsafe { BytesString::from_bytes_unchecked(Bytes::from_static(b"")) }; Self { - string_map: HashMap::from([(empty_str.clone(), 0)]), - dict: vec![empty_str.clone()], + string_map: HashMap::from([(T::default(), 0)]), + dict: vec![T::default()], } } } #[cfg(test)] mod tests { + use tinybytes::{Bytes, BytesString}; + use super::*; #[test] fn default_test() { - let dict = SharedDict::default(); + let dict: SharedDict<BytesString> = SharedDict::default(); assert_eq!(dict.string_map.len(), 1); assert_eq!(dict.dict.len(), 1); diff --git a/trace-utils/src/span/v05/mod.rs b/trace-utils/src/span/v05/mod.rs index 042bc4c354..61a179dfde 100644 --- a/trace-utils/src/span/v05/mod.rs +++ b/trace-utils/src/span/v05/mod.rs @@ -3,8 +3,7 @@ pub mod dict; -use crate::span::v05::dict::SharedDict; -use crate::span::SpanBytes; +use crate::span::{v05::dict::SharedDict, SpanText}; use anyhow::Result; use serde::Serialize; use std::collections::HashMap; @@ -29,7 +28,10 @@ pub struct Span { pub r#type: u32, } -pub fn from_span_bytes(span: &SpanBytes, dict: &mut SharedDict) -> Result<Span> { +pub fn from_span<T: SpanText>( + span: &crate::span::Span<T>, + dict: &mut SharedDict<T>, +) -> Result<Span> { Ok(Span { service: dict.get_or_insert(&span.service)?, name: dict.get_or_insert(&span.name)?, @@ -61,6 +63,7 @@ pub fn from_span_bytes(span: &SpanBytes, dict: &mut SharedDict) -> Result #[cfg(test)] mod tests { use super::*; + use crate::span::SpanBytes; use tinybytes::BytesString; #[test] @@ -87,7 +90,7 @@ mod tests { }; let mut dict = SharedDict::default(); - let v05_span = from_span_bytes(&span, &mut dict).unwrap(); + let v05_span = from_span(&span, &mut dict).unwrap(); let dict = dict.dict(); diff --git a/trace-utils/src/test_utils/mod.rs b/trace-utils/src/test_utils/mod.rs index d52f63fb9c..e7a2f9ea6c 100644 --- a/trace-utils/src/test_utils/mod.rs +++ b/trace-utils/src/test_utils/mod.rs @@ -15,9 +15,8 @@ use std::collections::HashMap; use std::time::Duration; use crate::send_data::SendData; -use crate::span::v05; -use crate::span::v05::dict::SharedDict; use crate::span::SpanBytes; +use crate::span::{v05, SharedDictBytes}; use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use datadog_trace_protobuf::pb; @@ -230,7 +229,7 @@ pub fn create_test_v05_span( parent_id: u64, start: i64, is_top_level: bool, - dict: &mut SharedDict, + dict: &mut SharedDictBytes, metrics: Option>, ) -> v05::Span { let mut meta = HashMap::from([ diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index af5ee44d53..ec7684e3ea 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -3,11 +3,11 @@ pub use crate::send_data::send_data_result::SendDataResult; pub use crate::send_data::SendData; -use crate::span::v05; use crate::span::v05::dict::SharedDict; +use crate::span::{v05, SpanText}; pub use crate::tracer_header_tags::TracerHeaderTags; -use crate::tracer_payload; -use crate::tracer_payload::{TraceCollection, TracerPayloadCollection}; +use crate::tracer_payload::TracerPayloadCollection; +use crate::tracer_payload::{self, TraceChunks}; use anyhow::anyhow; use bytes::buf::Reader; use datadog_trace_normalization::normalizer; @@ -597,100 +597,96 @@ macro_rules! 
parse_root_span_tags { } } -pub fn collect_trace_chunks( - traces: TraceCollection, +pub fn collect_trace_chunks( + traces: Vec>>, + use_v05_format: bool, +) -> anyhow::Result> { + if use_v05_format { + let mut shared_dict = SharedDict::default(); + let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); + for trace in traces { + let v05_trace = trace.iter().try_fold( + Vec::with_capacity(trace.len()), + |mut acc, span| -> anyhow::Result> { + acc.push(v05::from_span(span, &mut shared_dict)?); + Ok(acc) + }, + )?; + + v05_traces.push(v05_trace); + } + Ok(TraceChunks::V05((shared_dict.dict(), v05_traces))) + } else { + Ok(TraceChunks::V04(traces)) + } +} + +pub fn collect_pb_trace_chunks( + mut traces: Vec>, tracer_header_tags: &TracerHeaderTags, process_chunk: &mut T, is_agentless: bool, - use_v05_format: bool, ) -> anyhow::Result { - match traces { - TraceCollection::TraceChunk(traces) => { - if use_v05_format { - let mut shared_dict = SharedDict::default(); - let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); - for trace in traces { - let v05_trace = trace.iter().try_fold( - Vec::with_capacity(trace.len()), - |mut acc, span| -> anyhow::Result> { - acc.push(v05::from_span_bytes(span, &mut shared_dict)?); - Ok(acc) - }, - )?; - - v05_traces.push(v05_trace); - } - Ok(TracerPayloadCollection::V05(( - shared_dict.dict(), - v05_traces, - ))) - } else { - Ok(TracerPayloadCollection::V04(traces)) - } - } - TraceCollection::V07(mut traces) => { - let mut trace_chunks: Vec = Vec::new(); + let mut trace_chunks: Vec = Vec::new(); - // We'll skip setting the global metadata and rely on the agent to unpack these - let mut gathered_root_span_tags = !is_agentless; - let mut root_span_tags = RootSpanTags::default(); + // We'll skip setting the global metadata and rely on the agent to unpack these + let mut gathered_root_span_tags = !is_agentless; + let mut root_span_tags = RootSpanTags::default(); - for trace in traces.iter_mut() { - if is_agentless { - if let Err(e) = normalizer::normalize_trace(trace) { - error!("Error normalizing trace: {e}"); - } - } + for trace in traces.iter_mut() { + if is_agentless { + if let Err(e) = normalizer::normalize_trace(trace) { + error!("Error normalizing trace: {e}"); + } + } - let mut chunk = construct_trace_chunk(trace.to_vec()); + let mut chunk = construct_trace_chunk(trace.to_vec()); - let root_span_index = match get_root_span_index(trace) { - Ok(res) => res, - Err(e) => { - error!("Error getting the root span index of a trace, skipping. {e}"); - continue; - } - }; + let root_span_index = match get_root_span_index(trace) { + Ok(res) => res, + Err(e) => { + error!("Error getting the root span index of a trace, skipping. 
{e}"); + continue; + } + }; - if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { - error!("Error normalizing trace chunk: {e}"); - } + if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { + error!("Error normalizing trace chunk: {e}"); + } - for span in chunk.spans.iter_mut() { - // TODO: obfuscate & truncate spans - if tracer_header_tags.client_computed_top_level { - update_tracer_top_level(span); - } - } + for span in chunk.spans.iter_mut() { + // TODO: obfuscate & truncate spans + if tracer_header_tags.client_computed_top_level { + update_tracer_top_level(span); + } + } - if !tracer_header_tags.client_computed_top_level { - compute_top_level_span(&mut chunk.spans); - } + if !tracer_header_tags.client_computed_top_level { + compute_top_level_span(&mut chunk.spans); + } - process_chunk.process(&mut chunk, root_span_index); + process_chunk.process(&mut chunk, root_span_index); - trace_chunks.push(chunk); + trace_chunks.push(chunk); - if !gathered_root_span_tags { - gathered_root_span_tags = true; - let meta_map = &trace[root_span_index].meta; - parse_root_span_tags!( - meta_map, - { - "env" => root_span_tags.env, - "version" => root_span_tags.app_version, - "_dd.hostname" => root_span_tags.hostname, - "runtime-id" => root_span_tags.runtime_id, - } - ); + if !gathered_root_span_tags { + gathered_root_span_tags = true; + let meta_map = &trace[root_span_index].meta; + parse_root_span_tags!( + meta_map, + { + "env" => root_span_tags.env, + "version" => root_span_tags.app_version, + "_dd.hostname" => root_span_tags.hostname, + "runtime-id" => root_span_tags.runtime_id, } - } - - Ok(TracerPayloadCollection::V07(vec![ - construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags), - ])) + ); } } + + Ok(TracerPayloadCollection::V07(vec![ + construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags), + ])) } /// Returns true if a span should be measured (i.e., it should get trace metrics calculated). @@ -1126,17 +1122,10 @@ mod tests { fn test_collect_trace_chunks_v05() { let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)]; - let collection = collect_trace_chunks( - TraceCollection::TraceChunk(vec![chunk]), - &TracerHeaderTags::default(), - &mut tracer_payload::DefaultTraceChunkProcessor, - false, - true, - ) - .unwrap(); + let collection = collect_trace_chunks(vec![chunk], true).unwrap(); let (dict, traces) = match collection { - TracerPayloadCollection::V05(payload) => payload, + TraceChunks::V05(payload) => payload, _ => panic!("Unexpected type"), }; diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 8274d07c81..7bf6b91335 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -1,26 +1,16 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{trace_utils, v05, SpanBytes}; -use crate::{ - msgpack_decoder, - trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}, -}; +use crate::span::{v05, Span, SpanBytes, SpanText}; +use crate::trace_utils::collect_trace_chunks; +use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; use datadog_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; -use tinybytes; +use tinybytes::{self, BytesString}; -type DroppedP0Traces = usize; -type DroppedP0Spans = usize; pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; -pub type DroppedP0Stats = (DroppedP0Traces, DroppedP0Spans); - -// Keys used for sampling -const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1"; -const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism"; -const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; #[derive(Debug, Clone)] /// Enumerates the different encoding types. @@ -29,113 +19,32 @@ pub enum TraceEncoding { V04, /// v05 encoding (TracerPayloadV05). V05, - /// v0.7 encoding (TracerPayload). - V07, } -/// A collection of traces before they are turned into TraceChunks. -pub enum TraceCollection { - /// Collection of traces using protobuf representation of trace chunks. Used for V07 - /// implementation. - V07(Vec<Vec<pb::Span>>), - /// Collection of traces using the SpanBytes representation of trace chunks. This - /// representation allows the use of ByteString for representing Strings in order to avoid - /// unnecessary cloning during the processing of the chunks. Used for V04 and V05 - /// implementations. - TraceChunk(Vec<Vec<SpanBytes>>), +#[derive(Debug, Clone)] +pub enum TraceChunks<T: SpanText> { + /// Collection of v04 trace chunks. + V04(Vec<Vec<Span<T>>>), + /// Collection of v05 trace chunks with de-duplicated strings. + V05((Vec<T>, Vec<Vec<v05::Span>>)), } -impl TraceCollection { - pub fn len(&self) -> usize { +impl TraceChunks<BytesString> { + pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection { match self { - TraceCollection::V07(traces) => traces.len(), - TraceCollection::TraceChunk(traces) => traces.len(), - } - } - - pub fn is_empty(&self) -> bool { - match self { - TraceCollection::V07(traces) => traces.is_empty(), - TraceCollection::TraceChunk(traces) => traces.is_empty(), - } - } - - pub fn set_top_level_spans(&mut self) { - match self { - TraceCollection::TraceChunk(traces) => { - for chunk in traces.iter_mut() { - trace_utils::compute_top_level_span(chunk); - } - } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - #[allow(clippy::unimplemented)] - TraceCollection::V07(_) => { - unimplemented!("set_top_level_spans not implemented for v07") - } + TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), + TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces), } } +} - /// Remove spans and chunks from a TraceCollection only keeping the ones that may be sampled by - /// the agent. - /// - /// # Returns - /// - /// A tuple containing the dropped p0 stats, the first value correspond the amount of traces - /// dropped and the latter to the spans dropped. 
- pub fn drop_chunks(&mut self) -> DroppedP0Stats { - let mut dropped_p0_traces = 0; - let mut dropped_p0_spans = 0; - +impl<T: SpanText> TraceChunks<T> { + /// Returns the number of traces in the chunk. + pub fn size(&self) -> usize { match self { - TraceCollection::TraceChunk(traces) => { - traces.retain_mut(|chunk| { - // List of spans to keep even if the chunk is dropped - let mut sampled_indexes = Vec::new(); - for (index, span) in chunk.iter().enumerate() { - // ErrorSampler - if span.error == 1 { - // We send chunks containing an error - return true; - } - // PrioritySampler and NoPrioritySampler - let priority = span.metrics.get(SAMPLING_PRIORITY_KEY); - if trace_utils::has_top_level(span) - && (priority.is_none() || priority.is_some_and(|p| *p > 0.0)) - { - // We send chunks with positive priority or no priority - return true; - } - // SingleSpanSampler and AnalyzedSpansSampler - else if span - .metrics - .get(SAMPLING_SINGLE_SPAN_MECHANISM) - .is_some_and(|m| *m == 8.0) - || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY) - { - // We send spans sampled by single-span sampling or analyzed spans - sampled_indexes.push(index); - } - } - dropped_p0_spans += chunk.len() - sampled_indexes.len(); - if sampled_indexes.is_empty() { - // If no spans were sampled we can drop the whole chunk - dropped_p0_traces += 1; - return false; - } - let sampled_spans = sampled_indexes - .iter() - .map(|i| std::mem::take(&mut chunk[*i])) - .collect(); - *chunk = sampled_spans; - true - }); - } - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - #[allow(clippy::unimplemented)] - TraceCollection::V07(_) => unimplemented!("drop_chunks not implemented for v07"), + TraceChunks::V04(traces) => traces.len(), + TraceChunks::V05((_, traces)) => traces.len(), } - - (dropped_p0_traces, dropped_p0_spans) } } @@ -278,151 +187,52 @@ impl TraceChunkProcessor for DefaultTraceChunkProcessor { // Default implementation does nothing. } } -/// Represents the parameters required to collect trace chunks from msgpack data. + +/// This method processes the msgpack data contained within `data` based on +/// the specified `encoding_type`, converting it into a collection of tracer payloads. +/// +/// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are +/// supported. +/// +/// # Returns +/// +/// A `Result` containing either the successfully converted `TraceChunks` and the number of bytes +/// consumed from the data, or an error if the conversion fails. Possible errors include issues +/// with deserializing the msgpack data or if the data does not conform to the expected format. +/// +/// # Examples +/// +/// ```rust +/// use datadog_trace_protobuf::pb; +/// use datadog_trace_utils::trace_utils::TracerHeaderTags; +/// use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; +/// use std::convert::TryInto; +/// use tinybytes; +/// // This will likely be a &[u8] slice in practice. +/// let data: Vec<u8> = Vec::new(); +/// let data_as_bytes = tinybytes::Bytes::from(data); +/// let result = decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04) +/// .map(|(chunks, _size)| chunks.into_tracer_payload_collection()); /// -/// This struct encapsulates all the necessary parameters for converting msgpack data into -/// a `TracerPayloadCollection`. It is designed to work with the `TryInto` trait to facilitate -/// the conversion process, handling different encoding types and ensuring that all required -/// data is available for the conversion. 
-pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> { - /// A tinybytes::Bytes slice containing the serialized msgpack data. +/// match result { +/// Ok(collection) => println!("Successfully converted to TracerPayloadCollection."), +/// Err(e) => println!("Failed to convert: {:?}", e), +/// } +/// ``` +pub fn decode_to_trace_chunks( data: tinybytes::Bytes, - /// Reference to `TracerHeaderTags` containing metadata for the trace. - tracer_header_tags: &'a TracerHeaderTags<'a>, - /// Amount of data consumed from buffer - size: Option<&'a mut usize>, - /// A mutable reference to an implementation of `TraceChunkProcessor` that processes each - /// `TraceChunk` after it is constructed but before it is added to the TracerPayloadCollection. - /// TraceChunks are only available for v07 traces. - chunk_processor: &'a mut T, - /// A boolean indicating whether the agent is running in an agentless mode. This is used to - /// determine what protocol trace chunks should be serialized into when being sent. - is_agentless: bool, - /// The encoding type of the trace data, determining how the data should be deserialized and - /// processed. encoding_type: TraceEncoding, -} - -impl<'a, T: TraceChunkProcessor + 'a> TracerPayloadParams<'a, T> { - pub fn new( - data: tinybytes::Bytes, - tracer_header_tags: &'a TracerHeaderTags, - chunk_processor: &'a mut T, - is_agentless: bool, - encoding_type: TraceEncoding, - ) -> TracerPayloadParams<'a, T> { - TracerPayloadParams { - data, - tracer_header_tags, - size: None, - chunk_processor, - is_agentless, - encoding_type, - } - } - - pub fn measure_size(&mut self, size: &'a mut usize) { - self.size = Some(size); +) -> Result<(TraceChunks, usize), anyhow::Error> { + let (data, size) = match encoding_type { + TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), + TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), } -} -// TODO: APMSP-1282 - Implement TryInto for other encoding types. Supporting TraceChunkProcessor but -// not supporting v07 is a bit pointless for now. -impl<'a, T: TraceChunkProcessor + 'a> TryInto - for TracerPayloadParams<'a, T> -{ - type Error = anyhow::Error; - /// Attempts to convert `TracerPayloadParams` into a `TracerPayloadCollection`. - /// - /// This method processes the msgpack data contained within `TracerPayloadParams` based on - /// the specified `encoding_type`, converting it into a collection of tracer payloads. - /// The conversion process involves deserializing the msgpack data, applying any necessary - /// processing through `process_chunk`, and assembling the resulting data into - /// a `TracerPayloadCollection`. - /// - /// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are - /// supported. - /// - /// # Returns - /// - /// A `Result` containing either the successfully converted `TracerPayloadCollection` or - /// an error if the conversion fails. Possible errors include issues with deserializing the - /// msgpack data or if the data does not conform to the expected format. - /// - /// # Examples - /// - /// ```rust - /// use datadog_trace_protobuf::pb; - /// use datadog_trace_utils::trace_utils::TracerHeaderTags; - /// use datadog_trace_utils::tracer_payload::{ - /// DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams, - /// }; - /// use std::convert::TryInto; - /// use tinybytes; - /// // This will likely be a &[u8] slice in practice. 
- /// let data: Vec = Vec::new(); - /// let data_as_bytes = tinybytes::Bytes::from(data); - /// let tracer_header_tags = &TracerHeaderTags::default(); - /// let result: Result = TracerPayloadParams::new( - /// data_as_bytes, - /// tracer_header_tags, - /// &mut DefaultTraceChunkProcessor, - /// false, - /// TraceEncoding::V04, - /// ) - /// .try_into(); - /// - /// match result { - /// Ok(collection) => println!("Successfully converted to TracerPayloadCollection."), - /// Err(e) => println!("Failed to convert: {:?}", e), - /// } - /// ``` - fn try_into(self) -> Result { - match self.encoding_type { - TraceEncoding::V04 => { - let (traces, size) = match msgpack_decoder::v04::from_bytes(self.data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; - - if let Some(size_ref) = self.size { - *size_ref = size; - } - - Ok(collect_trace_chunks( - TraceCollection::TraceChunk(traces), - self.tracer_header_tags, - self.chunk_processor, - self.is_agentless, - false, - )?) - }, - TraceEncoding::V05 => { - let (traces, size) = match msgpack_decoder::v05::from_bytes(self.data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; - - if let Some(size_ref) = self.size { - *size_ref = size; - } + .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; - Ok(collect_trace_chunks( - TraceCollection::TraceChunk(traces), - self.tracer_header_tags, - self.chunk_processor, - self.is_agentless, - true, - )?) - }, - // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). - #[allow(clippy::unimplemented)] - _ => unimplemented!("Encodings other than TraceEncoding::V04 and TraceEncoding::V05 not implemented yet."), - } - } + Ok(( + collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?, + size, + )) } #[cfg(test)] @@ -435,8 +245,6 @@ mod tests { use std::collections::HashMap; use tinybytes::BytesString; - const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level"; - fn create_dummy_collection_v07() -> TracerPayloadCollection { TracerPayloadCollection::V07(vec![pb::TracerPayload { container_id: "".to_string(), @@ -588,23 +396,14 @@ mod tests { .expect("Failed to serialize test span."); let data = tinybytes::Bytes::from(data); - let tracer_header_tags = &TracerHeaderTags::default(); - - let result: anyhow::Result = TracerPayloadParams::new( - data, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(); + let result = decode_to_trace_chunks(data, TraceEncoding::V04); assert!(result.is_ok()); - let collection = result.unwrap(); - assert_eq!(2, collection.size()); + let (chunks, _) = result.unwrap(); + assert_eq!(2, chunks.size()); - if let TracerPayloadCollection::V04(traces) = collection { + if let TraceChunks::V04(traces) = chunks { assert_eq!(expected_serialized_span_data1, traces[0]); assert_eq!(expected_serialized_span_data2, traces[1]); } else { @@ -618,20 +417,11 @@ mod tests { let empty_data = vec![0x90]; let data = tinybytes::Bytes::from(empty_data); - let tracer_header_tags = &TracerHeaderTags::default(); - - let result: anyhow::Result = TracerPayloadParams::new( - data, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(); + let result = decode_to_trace_chunks(data, TraceEncoding::V04); assert!(result.is_ok()); - let collection = result.unwrap(); + let (collection, _) = result.unwrap(); assert_eq!(0, collection.size()); } @@ 
-641,144 +431,17 @@ mod tests { let expected = vec![dummy_trace.clone()]; let payload = rmp_serde::to_vec_named(&expected).unwrap(); let payload = tinybytes::Bytes::from(payload); - let tracer_header_tags = &TracerHeaderTags::default(); - let result: anyhow::Result = TracerPayloadParams::new( - payload, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into(); + let result = decode_to_trace_chunks(payload, TraceEncoding::V04); assert!(result.is_ok()); - let collection = result.unwrap(); + let (collection, _size) = result.unwrap(); assert_eq!(1, collection.size()); - if let TracerPayloadCollection::V04(traces) = collection { + if let TraceChunks::V04(traces) = collection { assert_eq!(dummy_trace, traces[0]); } else { panic!("Invalid collection type returned for try_into"); } } - - #[test] - fn test_drop_chunks() { - let chunk_with_priority = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 1.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_with_null_priority = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_without_priority = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_with_error = vec![ - SpanBytes { - span_id: 1, - error: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - ..Default::default() - }, - ]; - let chunk_with_a_single_span = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]), - ..Default::default() - }, - ]; - let chunk_with_analyzed_span = vec![ - SpanBytes { - span_id: 1, - metrics: HashMap::from([ - (SAMPLING_PRIORITY_KEY.into(), 0.0), - (TRACER_TOP_LEVEL_KEY.into(), 1.0), - ]), - ..Default::default() - }, - SpanBytes { - span_id: 2, - parent_id: 1, - metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]), - ..Default::default() - }, - ]; - - let chunks_and_expected_sampled_spans = vec![ - (chunk_with_priority, 2), - (chunk_with_null_priority, 0), - (chunk_without_priority, 2), - (chunk_with_error, 2), - (chunk_with_a_single_span, 1), - (chunk_with_analyzed_span, 1), - ]; - - for (chunk, expected_count) in chunks_and_expected_sampled_spans.into_iter() { - let mut collection = TraceCollection::TraceChunk(vec![chunk]); - collection.drop_chunks(); - - let traces = match collection { - TraceCollection::TraceChunk(t) => t, - _ => panic!("Collection must contain TraceChunkSpan"), - }; - - if expected_count == 0 { - assert!(traces.is_empty()); - } else { - assert_eq!(traces[0].len(), expected_count); - } - } - } } diff --git a/trace-utils/tests/test_send_data.rs b/trace-utils/tests/test_send_data.rs index 2209c90e70..777ed36586 100644 --- a/trace-utils/tests/test_send_data.rs +++ b/trace-utils/tests/test_send_data.rs @@ 
-7,9 +7,7 @@ mod tracing_integration_tests { use datadog_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent; use datadog_trace_utils::test_utils::{create_test_json_span, create_test_no_alloc_span}; use datadog_trace_utils::trace_utils::TracerHeaderTags; - use datadog_trace_utils::tracer_payload::{ - DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadParams, - }; + use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding}; #[cfg(target_os = "linux")] use ddcommon::connector::uds::socket_path_to_uri; use ddcommon::Endpoint; @@ -103,19 +101,17 @@ mod tracing_integration_tests { let data = get_v04_trace_snapshot_test_payload("test_send_data_v04_snapshot"); - let payload_collection = TracerPayloadParams::new( - data, - &header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + let (payload_collection, _size) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); test_agent.start_session(snapshot_name, None).await; - let data = SendData::new(300, payload_collection, header_tags, &endpoint); + let data = SendData::new( + 300, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let _result = data.send().await; @@ -197,19 +193,17 @@ mod tracing_integration_tests { let data = tinybytes::Bytes::from(encoded_data); - let payload_collection = TracerPayloadParams::new( - data, - &header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + let (payload_collection, _) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); test_agent.start_session(snapshot_name, None).await; - let data = SendData::new(300, payload_collection, header_tags, &endpoint); + let data = SendData::new( + 300, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let _result = data.send().await; @@ -237,19 +231,16 @@ mod tracing_integration_tests { let empty_data = vec![0x90]; let data = tinybytes::Bytes::from(empty_data); - let tracer_header_tags = &TracerHeaderTags::default(); - - let payload_collection = TracerPayloadParams::new( - data, - tracer_header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); - let data = SendData::new(0, payload_collection, header_tags, &endpoint); + let (payload_collection, _) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + + let data = SendData::new( + 0, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let result = data.send().await; @@ -319,19 +310,17 @@ mod tracing_integration_tests { let data = get_v04_trace_snapshot_test_payload("test_send_data_v04_snapshot_uds"); - let payload_collection = TracerPayloadParams::new( - data, - &header_tags, - &mut DefaultTraceChunkProcessor, - false, - TraceEncoding::V04, - ) - .try_into() - .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); + let (payload_collection, size) = decode_to_trace_chunks(data, TraceEncoding::V04) + .expect("unable to convert TracerPayloadParams to TracerPayloadCollection"); 
test_agent.start_session(snapshot_name, None).await; - let data = SendData::new(300, payload_collection, header_tags, &endpoint); + let data = SendData::new( + size, + payload_collection.into_tracer_payload_collection(), + header_tags, + &endpoint, + ); let _result = data.send().await;
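For reference (not part of the patch): a minimal sketch of how a caller migrates from the removed TracerPayloadParams/TryInto flow to the new API, mirroring the updated sidecar and integration-test call sites above. The helper function name and the surrounding wiring are illustrative assumptions, not code from this change.

```rust
use datadog_trace_utils::trace_utils::{SendData, TracerHeaderTags};
use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding};
use ddcommon::Endpoint;

/// Decode a msgpack v04 body and send it (illustrative helper, assumed context).
async fn decode_and_send(
    data: tinybytes::Bytes,
    header_tags: TracerHeaderTags<'_>,
    endpoint: &Endpoint,
) -> anyhow::Result<()> {
    // decode_to_trace_chunks replaces TracerPayloadParams::new(..).try_into() and also
    // returns the number of msgpack bytes consumed (previously exposed via measure_size).
    let (chunks, size) = decode_to_trace_chunks(data, TraceEncoding::V04)?;

    // Stay in the decoded TraceChunks representation until hand-off to SendData.
    let payload = chunks.into_tracer_payload_collection();
    let send_data = SendData::new(size, payload, header_tags, endpoint);
    let _result = send_data.send().await;
    Ok(())
}
```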