@@ -13,8 +13,9 @@ use arc_swap::{ArcSwap, ArcSwapOption};
1313use bytes:: Bytes ;
1414use datadog_trace_utils:: msgpack_decoder:: { self , decode:: error:: DecodeError } ;
1515use datadog_trace_utils:: send_with_retry:: { send_with_retry, RetryStrategy , SendWithRetryError } ;
16+ use datadog_trace_utils:: span:: SpanBytes ;
1617use datadog_trace_utils:: trace_utils:: { self , TracerHeaderTags } ;
17- use datadog_trace_utils:: tracer_payload:: { self , TraceCollection } ;
18+ use datadog_trace_utils:: tracer_payload:: { self } ;
1819use ddcommon:: header:: {
1920 APPLICATION_MSGPACK_STR , DATADOG_SEND_REAL_HTTP_STATUS_STR , DATADOG_TRACE_COUNT_STR ,
2021} ;
@@ -230,11 +231,11 @@ impl TraceExporter {
230231 match self . input_format {
231232 TraceExporterInputFormat :: Proxy => self . send_proxy ( data. as_ref ( ) , trace_count) ,
232233 TraceExporterInputFormat :: V04 => match msgpack_decoder:: v04:: from_bytes ( data) {
233- Ok ( ( traces, _) ) => self . send_deser_ser ( TraceCollection :: TraceChunk ( traces) ) ,
234+ Ok ( ( traces, _) ) => self . send_deser_ser ( traces) ,
234235 Err ( e) => Err ( TraceExporterError :: Deserialization ( e) ) ,
235236 } ,
236237 TraceExporterInputFormat :: V05 => match msgpack_decoder:: v05:: from_bytes ( data) {
237- Ok ( ( traces, _) ) => self . send_deser_ser ( TraceCollection :: TraceChunk ( traces) ) ,
238+ Ok ( ( traces, _) ) => self . send_deser_ser ( traces) ,
238239 Err ( e) => Err ( TraceExporterError :: Deserialization ( e) ) ,
239240 } ,
240241 }
@@ -559,7 +560,7 @@ impl TraceExporter {
559560 /// Add all spans from the given iterator into the stats concentrator
560561 /// # Panic
561562 /// Will panic if another thread panicked will holding the lock on `stats_concentrator`
562- fn add_spans_to_stats ( & self , collection : & TraceCollection ) {
563+ fn add_spans_to_stats ( & self , traces : & [ Vec < SpanBytes > ] ) {
563564 if let StatsComputationStatus :: Enabled {
564565 stats_concentrator,
565566 cancellation_token : _,
@@ -569,25 +570,19 @@ impl TraceExporter {
569570 #[ allow( clippy:: unwrap_used) ]
570571 let mut stats_concentrator = stats_concentrator. lock ( ) . unwrap ( ) ;
571572
572- match collection {
573- TraceCollection :: TraceChunk ( traces) => {
574- let spans = traces. iter ( ) . flat_map ( |trace| trace. iter ( ) ) ;
575- for span in spans {
576- stats_concentrator. add_span ( span) ;
577- }
578- }
579- // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
580- TraceCollection :: V07 ( _) => unreachable ! ( ) ,
573+ let spans = traces. iter ( ) . flat_map ( |trace| trace. iter ( ) ) ;
574+ for span in spans {
575+ stats_concentrator. add_span ( span) ;
581576 }
582577 }
583578 }
584579
585580 fn send_deser_ser (
586581 & self ,
587- mut collection : TraceCollection ,
582+ mut traces : Vec < Vec < SpanBytes > > ,
588583 ) -> Result < String , TraceExporterError > {
589584 self . emit_metric (
590- HealthMetric :: Count ( health_metrics:: STAT_DESER_TRACES , collection . len ( ) as i64 ) ,
585+ HealthMetric :: Count ( health_metrics:: STAT_DESER_TRACES , traces . len ( ) as i64 ) ,
591586 None ,
592587 ) ;
593588
@@ -596,12 +591,17 @@ impl TraceExporter {
596591 // Stats computation
597592 if let StatsComputationStatus :: Enabled { .. } = & * * self . client_side_stats . load ( ) {
598593 if !self . client_computed_top_level {
599- collection. set_top_level_spans ( ) ;
594+ for chunk in traces. iter_mut ( ) {
595+ datadog_trace_utils:: span:: trace_utils:: compute_top_level_span ( chunk) ;
596+ }
600597 }
601- self . add_spans_to_stats ( & collection ) ;
598+ self . add_spans_to_stats ( & traces ) ;
602599 // Once stats have been computed we can drop all chunks that are not going to be
603600 // sampled by the agent
604- let ( dropped_p0_traces, dropped_p0_spans) = collection. drop_chunks ( ) ;
601+ let datadog_trace_utils:: span:: trace_utils:: DroppedP0Stats {
602+ dropped_p0_traces,
603+ dropped_p0_spans,
604+ } = datadog_trace_utils:: span:: trace_utils:: drop_chunks ( & mut traces) ;
605605
606606 // Update the headers to indicate that stats have been computed and forward dropped
607607 // traces counts
@@ -611,49 +611,23 @@ impl TraceExporter {
611611 header_tags. dropped_p0_spans = dropped_p0_spans;
612612 }
613613
614- let payload = match self . input_format {
615- TraceExporterInputFormat :: V04 => match self . output_format {
616- TraceExporterOutputFormat :: V04 => trace_utils:: collect_trace_chunks (
617- collection,
618- & header_tags,
619- & mut tracer_payload:: DefaultTraceChunkProcessor ,
620- self . endpoint . api_key . is_some ( ) ,
621- false ,
622- )
623- . map_err ( |e| {
624- TraceExporterError :: Deserialization ( DecodeError :: InvalidFormat ( e. to_string ( ) ) )
625- } ) ?,
626- TraceExporterOutputFormat :: V05 => trace_utils:: collect_trace_chunks (
627- collection,
628- & header_tags,
629- & mut tracer_payload:: DefaultTraceChunkProcessor ,
630- self . endpoint . api_key . is_some ( ) ,
631- true ,
632- )
633- . map_err ( |e| {
634- TraceExporterError :: Deserialization ( DecodeError :: InvalidFormat ( e. to_string ( ) ) )
635- } ) ?,
636- } ,
637- TraceExporterInputFormat :: V05 => match self . output_format {
638- TraceExporterOutputFormat :: V05 => trace_utils:: collect_trace_chunks (
639- collection,
640- & header_tags,
641- & mut tracer_payload:: DefaultTraceChunkProcessor ,
642- self . endpoint . api_key . is_some ( ) ,
643- true ,
644- )
645- . map_err ( |e| {
646- TraceExporterError :: Deserialization ( DecodeError :: InvalidFormat ( e. to_string ( ) ) )
647- } ) ?,
614+ let use_v05_format = match ( self . input_format , self . output_format ) {
615+ ( TraceExporterInputFormat :: V04 , TraceExporterOutputFormat :: V04 ) => false ,
616+ ( TraceExporterInputFormat :: V04 , TraceExporterOutputFormat :: V05 )
617+ | ( TraceExporterInputFormat :: V05 , TraceExporterOutputFormat :: V05 ) => true ,
618+ ( TraceExporterInputFormat :: V05 , TraceExporterOutputFormat :: V04 ) => {
648619 // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
649- _ => unreachable ! (
650- "Conversion from v05 to {:?} not implemented" ,
651- self . output_format
652- ) ,
653- } ,
654- // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
655- _ => unreachable ! ( "Input format not implemented" ) ,
620+ unreachable ! ( "Conversion from v05 to v04 not implemented" )
621+ }
622+ ( TraceExporterInputFormat :: Proxy , _) => {
623+ // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
624+ unreachable ! ( "Codepath invalid for proxy mode" , )
625+ }
656626 } ;
627+ let payload: tracer_payload:: TraceChunks =
628+ trace_utils:: collect_trace_chunks ( traces, use_v05_format) . map_err ( |e| {
629+ TraceExporterError :: Deserialization ( DecodeError :: InvalidFormat ( e. to_string ( ) ) )
630+ } ) ?;
657631
658632 let chunks = payload. size ( ) ;
659633 let endpoint = Endpoint {
@@ -667,14 +641,12 @@ impl TraceExporter {
667641
668642 let strategy = RetryStrategy :: default ( ) ;
669643 let mp_payload = match & payload {
670- tracer_payload:: TracerPayloadCollection :: V04 ( p) => {
644+ tracer_payload:: TraceChunks :: V04 ( p) => {
671645 rmp_serde:: to_vec_named ( p) . map_err ( TraceExporterError :: Serialization ) ?
672646 }
673- tracer_payload:: TracerPayloadCollection :: V05 ( p) => {
647+ tracer_payload:: TraceChunks :: V05 ( p) => {
674648 rmp_serde:: to_vec ( p) . map_err ( TraceExporterError :: Serialization ) ?
675649 }
676- // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
677- _ => unreachable ! ( "Serialization for v07 not implemented" ) ,
678650 } ;
679651
680652 let payload_len = mp_payload. len ( ) ;
0 commit comments