Skip to content

Commit f6aae6b

Browse files
committed
refactor: split collect_chunks into two methods
This refactor is needed because of the shared logic in collect_trace_chunks and TracerPayloadParams. The way these structs are created makes replacing ByteString with the slice harder due to the shared lifetime. Furthermore, the enums encode two different codepaths, the SpanBytes and the pb spans, which never interact with each other. So having functions that handle both span bytes and pb spans is pure complexity overhead. This refactor also has the advantage of removing a bunch of panics and lines of code that were here because of the "fake" pb spans and trace exporter spans overlap
1 parent 1ee2ca2 commit f6aae6b

File tree

9 files changed

+423
-635
lines changed

9 files changed

+423
-635
lines changed

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 35 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ use arc_swap::{ArcSwap, ArcSwapOption};
1313
use bytes::Bytes;
1414
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
1515
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError};
16+
use datadog_trace_utils::span::SpanBytes;
1617
use datadog_trace_utils::trace_utils::{self, TracerHeaderTags};
17-
use datadog_trace_utils::tracer_payload::{self, TraceCollection};
18+
use datadog_trace_utils::tracer_payload;
1819
use ddcommon::header::{
1920
APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR,
2021
};
@@ -230,11 +231,11 @@ impl TraceExporter {
230231
match self.input_format {
231232
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
232233
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_bytes(data) {
233-
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
234+
Ok((traces, _)) => self.send_deser_ser(traces),
234235
Err(e) => Err(TraceExporterError::Deserialization(e)),
235236
},
236237
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_bytes(data) {
237-
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
238+
Ok((traces, _)) => self.send_deser_ser(traces),
238239
Err(e) => Err(TraceExporterError::Deserialization(e)),
239240
},
240241
}
@@ -559,7 +560,7 @@ impl TraceExporter {
559560
/// Add all spans from the given iterator into the stats concentrator
560561
/// # Panic
561562
/// Will panic if another thread panicked while holding the lock on `stats_concentrator`
562-
fn add_spans_to_stats(&self, collection: &TraceCollection) {
563+
fn add_spans_to_stats(&self, traces: &[Vec<SpanBytes>]) {
563564
if let StatsComputationStatus::Enabled {
564565
stats_concentrator,
565566
cancellation_token: _,
@@ -569,25 +570,19 @@ impl TraceExporter {
569570
#[allow(clippy::unwrap_used)]
570571
let mut stats_concentrator = stats_concentrator.lock().unwrap();
571572

572-
match collection {
573-
TraceCollection::TraceChunk(traces) => {
574-
let spans = traces.iter().flat_map(|trace| trace.iter());
575-
for span in spans {
576-
stats_concentrator.add_span(span);
577-
}
578-
}
579-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
580-
TraceCollection::V07(_) => unreachable!(),
573+
let spans = traces.iter().flat_map(|trace| trace.iter());
574+
for span in spans {
575+
stats_concentrator.add_span(span);
581576
}
582577
}
583578
}
584579

585580
fn send_deser_ser(
586581
&self,
587-
mut collection: TraceCollection,
582+
mut traces: Vec<Vec<SpanBytes>>,
588583
) -> Result<String, TraceExporterError> {
589584
self.emit_metric(
590-
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, collection.len() as i64),
585+
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64),
591586
None,
592587
);
593588

@@ -596,12 +591,17 @@ impl TraceExporter {
596591
// Stats computation
597592
if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() {
598593
if !self.client_computed_top_level {
599-
collection.set_top_level_spans();
594+
for chunk in traces.iter_mut() {
595+
datadog_trace_utils::span::trace_utils::compute_top_level_span(chunk);
596+
}
600597
}
601-
self.add_spans_to_stats(&collection);
598+
self.add_spans_to_stats(&traces);
602599
// Once stats have been computed we can drop all chunks that are not going to be
603600
// sampled by the agent
604-
let (dropped_p0_traces, dropped_p0_spans) = collection.drop_chunks();
601+
let datadog_trace_utils::span::trace_utils::DroppedP0Stats {
602+
dropped_p0_traces,
603+
dropped_p0_spans,
604+
} = datadog_trace_utils::span::trace_utils::drop_chunks(&mut traces);
605605

606606
// Update the headers to indicate that stats have been computed and forward dropped
607607
// traces counts
@@ -611,49 +611,23 @@ impl TraceExporter {
611611
header_tags.dropped_p0_spans = dropped_p0_spans;
612612
}
613613

614-
let payload = match self.input_format {
615-
TraceExporterInputFormat::V04 => match self.output_format {
616-
TraceExporterOutputFormat::V04 => trace_utils::collect_trace_chunks(
617-
collection,
618-
&header_tags,
619-
&mut tracer_payload::DefaultTraceChunkProcessor,
620-
self.endpoint.api_key.is_some(),
621-
false,
622-
)
623-
.map_err(|e| {
624-
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
625-
})?,
626-
TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks(
627-
collection,
628-
&header_tags,
629-
&mut tracer_payload::DefaultTraceChunkProcessor,
630-
self.endpoint.api_key.is_some(),
631-
true,
632-
)
633-
.map_err(|e| {
634-
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
635-
})?,
636-
},
637-
TraceExporterInputFormat::V05 => match self.output_format {
638-
TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks(
639-
collection,
640-
&header_tags,
641-
&mut tracer_payload::DefaultTraceChunkProcessor,
642-
self.endpoint.api_key.is_some(),
643-
true,
644-
)
645-
.map_err(|e| {
646-
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
647-
})?,
614+
let use_v05_format = match (self.input_format, self.output_format) {
615+
(TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04) => false,
616+
(TraceExporterInputFormat::V04, TraceExporterOutputFormat::V05)
617+
| (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V05) => true,
618+
(TraceExporterInputFormat::V05, TraceExporterOutputFormat::V04) => {
648619
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
649-
_ => unreachable!(
650-
"Conversion from v05 to {:?} not implemented",
651-
self.output_format
652-
),
653-
},
654-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
655-
_ => unreachable!("Input format not implemented"),
620+
unreachable!("Conversion from v05 to v04 not implemented")
621+
}
622+
(TraceExporterInputFormat::Proxy, _) => {
623+
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
624+
unreachable!("Codepath invalid for proxy mode",)
625+
}
656626
};
627+
let payload: tracer_payload::TraceChunks =
628+
trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
629+
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
630+
})?;
657631

658632
let chunks = payload.size();
659633
let endpoint = Endpoint {
@@ -667,14 +641,12 @@ impl TraceExporter {
667641

668642
let strategy = RetryStrategy::default();
669643
let mp_payload = match &payload {
670-
tracer_payload::TracerPayloadCollection::V04(p) => {
644+
tracer_payload::TraceChunks::V04(p) => {
671645
rmp_serde::to_vec_named(p).map_err(TraceExporterError::Serialization)?
672646
}
673-
tracer_payload::TracerPayloadCollection::V05(p) => {
647+
tracer_payload::TraceChunks::V05(p) => {
674648
rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)?
675649
}
676-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
677-
_ => unreachable!("Serialization for v07 not implemented"),
678650
};
679651

680652
let payload_len = mp_payload.len();

sidecar/src/service/sidecar_server.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use datadog_ipc::tarpc;
1616
use datadog_ipc::tarpc::context::Context;
1717
use datadog_ipc::transport::Transport;
1818
use datadog_trace_utils::trace_utils::SendData;
19-
use datadog_trace_utils::tracer_payload;
19+
use datadog_trace_utils::tracer_payload::decode_to_trace_chunks;
2020
use datadog_trace_utils::tracer_payload::TraceEncoding;
2121
use ddcommon::{Endpoint, MutexExt};
2222
use ddtelemetry::worker::{
@@ -275,20 +275,15 @@ impl SidecarServer {
275275
headers
276276
);
277277

278-
let mut size = 0;
279-
let mut processor = tracer_payload::DefaultTraceChunkProcessor;
280-
let mut payload_params = tracer_payload::TracerPayloadParams::new(
281-
data,
282-
&headers,
283-
&mut processor,
284-
target.api_key.is_some(),
285-
TraceEncoding::V04,
286-
);
287-
payload_params.measure_size(&mut size);
288-
match payload_params.try_into() {
289-
Ok(payload) => {
278+
match decode_to_trace_chunks(data, TraceEncoding::V04) {
279+
Ok((payload, size)) => {
290280
trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}");
291-
let data = SendData::new(size, payload, headers, target);
281+
let data = SendData::new(
282+
size,
283+
payload.into_tracer_payload_collection(),
284+
headers,
285+
target,
286+
);
292287
self.trace_flusher.enqueue(data);
293288
}
294289
Err(e) => {

trace-mini-agent/src/trace_processor.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use datadog_trace_obfuscation::obfuscate::obfuscate_span;
1313
use datadog_trace_protobuf::pb;
1414
use datadog_trace_utils::trace_utils::{self};
1515
use datadog_trace_utils::trace_utils::{EnvironmentType, SendData};
16-
use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TraceCollection};
16+
use datadog_trace_utils::tracer_payload::TraceChunkProcessor;
1717

1818
use crate::{
1919
config::Config,
@@ -96,15 +96,14 @@ impl TraceProcessor for ServerlessTraceProcessor {
9696
}
9797
};
9898

99-
let payload = match trace_utils::collect_trace_chunks(
100-
TraceCollection::V07(traces),
99+
let payload = match trace_utils::collect_pb_trace_chunks(
100+
traces,
101101
&tracer_header_tags,
102102
&mut ChunkProcessor {
103103
config: config.clone(),
104104
mini_agent_metadata: mini_agent_metadata.clone(),
105105
},
106106
true, // In mini agent, we always send agentless
107-
false,
108107
) {
109108
Ok(res) => res,
110109
Err(err) => {

trace-utils/benches/deserialization.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use criterion::{black_box, criterion_group, Criterion};
5-
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
6-
use datadog_trace_utils::tracer_payload::{
7-
DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams,
8-
};
5+
use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding};
96
use serde_json::{json, Value};
107

118
fn generate_spans(num_spans: usize, trace_id: u64) -> Vec<Value> {
@@ -65,24 +62,15 @@ pub fn deserialize_msgpack_to_internal(c: &mut Criterion) {
6562
let data = rmp_serde::to_vec(&generate_trace_chunks(20, 2_075))
6663
.expect("Failed to serialize test spans.");
6764
let data_as_bytes = tinybytes::Bytes::copy_from_slice(&data);
68-
let tracer_header_tags = &TracerHeaderTags::default();
6965

7066
c.bench_function(
7167
"benching deserializing traces from msgpack to their internal representation ",
7268
|b| {
7369
b.iter_batched(
7470
|| data_as_bytes.clone(),
7571
|data_as_bytes| {
76-
let result: anyhow::Result<TracerPayloadCollection> = black_box(
77-
TracerPayloadParams::new(
78-
data_as_bytes,
79-
tracer_header_tags,
80-
&mut DefaultTraceChunkProcessor,
81-
false,
82-
TraceEncoding::V04,
83-
)
84-
.try_into(),
85-
);
72+
let result =
73+
black_box(decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04));
8674
assert!(result.is_ok());
8775
// Return the result to avoid measuring the deallocation time
8876
result

trace-utils/src/msgpack_decoder/v04/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use crate::span::{SpanBytes, SpanSlice};
1717
///
1818
/// # Returns
1919
///
20-
/// * `Ok(Vec<TracerPayloadV04>)` - A vector of decoded `TracerPayloadV04` objects if successful.
20+
/// * `Ok(Vec<TracerPayloadV04>, usize)` - A vector of decoded `Vec<SpanSlice>` objects if
21+
/// successful, and the number of bytes in the slice used by the decoder.
2122
/// * `Err(DecodeError)` - An error if the decoding process fails.
2223
///
2324
/// # Errors
@@ -76,7 +77,8 @@ pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec<Vec<SpanBytes>>, usize)
7677
///
7778
/// # Returns
7879
///
79-
/// * `Ok(Vec<TracerPayloadV04>)` - A vector of decoded `Vec<SpanSlice>` objects if successful.
80+
/// * `Ok(Vec<TracerPayloadV04>, usize)` - A vector of decoded `Vec<SpanSlice>` objects if
81+
/// successful, and the number of bytes in the slice used by the decoder.
8082
/// * `Err(DecodeError)` - An error if the decoding process fails.
8183
///
8284
/// # Errors

0 commit comments

Comments
 (0)