Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 34 additions & 63 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError};
use datadog_trace_utils::span::SpanBytes;
use datadog_trace_utils::trace_utils::{self, TracerHeaderTags};
use datadog_trace_utils::tracer_payload::{self, TraceCollection};
use datadog_trace_utils::tracer_payload;
use ddcommon::header::{
APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR,
};
Expand Down Expand Up @@ -230,11 +231,11 @@ impl TraceExporter {
match self.input_format {
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_bytes(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Ok((traces, _)) => self.send_deser_ser(traces),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_bytes(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Ok((traces, _)) => self.send_deser_ser(traces),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
}
Expand Down Expand Up @@ -559,7 +560,7 @@ impl TraceExporter {
/// Add all spans from the given iterator into the stats concentrator
/// # Panics
/// Will panic if another thread panicked while holding the lock on `stats_concentrator`
fn add_spans_to_stats(&self, collection: &TraceCollection) {
fn add_spans_to_stats(&self, traces: &[Vec<SpanBytes>]) {
if let StatsComputationStatus::Enabled {
stats_concentrator,
cancellation_token: _,
Expand All @@ -569,25 +570,19 @@ impl TraceExporter {
#[allow(clippy::unwrap_used)]
let mut stats_concentrator = stats_concentrator.lock().unwrap();

match collection {
TraceCollection::TraceChunk(traces) => {
let spans = traces.iter().flat_map(|trace| trace.iter());
for span in spans {
stats_concentrator.add_span(span);
}
}
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
TraceCollection::V07(_) => unreachable!(),
let spans = traces.iter().flat_map(|trace| trace.iter());
for span in spans {
stats_concentrator.add_span(span);
}
}
}

fn send_deser_ser(
&self,
mut collection: TraceCollection,
mut traces: Vec<Vec<SpanBytes>>,
) -> Result<String, TraceExporterError> {
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, collection.len() as i64),
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64),
None,
);

Expand All @@ -596,12 +591,17 @@ impl TraceExporter {
// Stats computation
if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() {
if !self.client_computed_top_level {
collection.set_top_level_spans();
for chunk in traces.iter_mut() {
datadog_trace_utils::span::trace_utils::compute_top_level_span(chunk);
}
}
self.add_spans_to_stats(&collection);
self.add_spans_to_stats(&traces);
// Once stats have been computed we can drop all chunks that are not going to be
// sampled by the agent
let (dropped_p0_traces, dropped_p0_spans) = collection.drop_chunks();
let datadog_trace_utils::span::trace_utils::DroppedP0Stats {
dropped_p0_traces,
dropped_p0_spans,
} = datadog_trace_utils::span::trace_utils::drop_chunks(&mut traces);

// Update the headers to indicate that stats have been computed and forward dropped
// traces counts
Expand All @@ -611,49 +611,22 @@ impl TraceExporter {
header_tags.dropped_p0_spans = dropped_p0_spans;
}

let payload = match self.input_format {
TraceExporterInputFormat::V04 => match self.output_format {
TraceExporterOutputFormat::V04 => trace_utils::collect_trace_chunks(
collection,
&header_tags,
&mut tracer_payload::DefaultTraceChunkProcessor,
self.endpoint.api_key.is_some(),
false,
)
.map_err(|e| {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
})?,
TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks(
collection,
&header_tags,
&mut tracer_payload::DefaultTraceChunkProcessor,
self.endpoint.api_key.is_some(),
true,
)
.map_err(|e| {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
})?,
},
TraceExporterInputFormat::V05 => match self.output_format {
TraceExporterOutputFormat::V05 => trace_utils::collect_trace_chunks(
collection,
&header_tags,
&mut tracer_payload::DefaultTraceChunkProcessor,
self.endpoint.api_key.is_some(),
true,
)
.map_err(|e| {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
})?,
let use_v05_format = match (self.input_format, self.output_format) {
(TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04) => false,
(TraceExporterInputFormat::V04, TraceExporterOutputFormat::V05)
| (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V05) => true,
(TraceExporterInputFormat::V05, TraceExporterOutputFormat::V04) => {
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
_ => unreachable!(
"Conversion from v05 to {:?} not implemented",
self.output_format
),
},
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
_ => unreachable!("Input format not implemented"),
unreachable!("Conversion from v05 to v04 not implemented")
}
(TraceExporterInputFormat::Proxy, _) => {
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
unreachable!("Codepath invalid for proxy mode",)
}
};
let payload = trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
})?;

let chunks = payload.size();
let endpoint = Endpoint {
Expand All @@ -667,14 +640,12 @@ impl TraceExporter {

let strategy = RetryStrategy::default();
let mp_payload = match &payload {
tracer_payload::TracerPayloadCollection::V04(p) => {
tracer_payload::TraceChunks::V04(p) => {
rmp_serde::to_vec_named(p).map_err(TraceExporterError::Serialization)?
}
tracer_payload::TracerPayloadCollection::V05(p) => {
tracer_payload::TraceChunks::V05(p) => {
rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)?
}
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
_ => unreachable!("Serialization for v07 not implemented"),
};

let payload_len = mp_payload.len();
Expand Down
23 changes: 9 additions & 14 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use datadog_ipc::tarpc;
use datadog_ipc::tarpc::context::Context;
use datadog_ipc::transport::Transport;
use datadog_trace_utils::trace_utils::SendData;
use datadog_trace_utils::tracer_payload;
use datadog_trace_utils::tracer_payload::decode_to_trace_chunks;
use datadog_trace_utils::tracer_payload::TraceEncoding;
use ddcommon::{Endpoint, MutexExt};
use ddtelemetry::worker::{
Expand Down Expand Up @@ -275,20 +275,15 @@ impl SidecarServer {
headers
);

let mut size = 0;
let mut processor = tracer_payload::DefaultTraceChunkProcessor;
let mut payload_params = tracer_payload::TracerPayloadParams::new(
data,
&headers,
&mut processor,
target.api_key.is_some(),
TraceEncoding::V04,
);
payload_params.measure_size(&mut size);
match payload_params.try_into() {
Ok(payload) => {
match decode_to_trace_chunks(data, TraceEncoding::V04) {
Ok((payload, size)) => {
trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}");
let data = SendData::new(size, payload, headers, target);
let data = SendData::new(
size,
payload.into_tracer_payload_collection(),
headers,
target,
);
self.trace_flusher.enqueue(data);
}
Err(e) => {
Expand Down
7 changes: 3 additions & 4 deletions trace-mini-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use datadog_trace_obfuscation::obfuscate::obfuscate_span;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils::{self};
use datadog_trace_utils::trace_utils::{EnvironmentType, SendData};
use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TraceCollection};
use datadog_trace_utils::tracer_payload::TraceChunkProcessor;

use crate::{
config::Config,
Expand Down Expand Up @@ -96,15 +96,14 @@ impl TraceProcessor for ServerlessTraceProcessor {
}
};

let payload = match trace_utils::collect_trace_chunks(
TraceCollection::V07(traces),
let payload = match trace_utils::collect_pb_trace_chunks(
traces,
&tracer_header_tags,
&mut ChunkProcessor {
config: config.clone(),
mini_agent_metadata: mini_agent_metadata.clone(),
},
true, // In mini agent, we always send agentless
false,
) {
Ok(res) => res,
Err(err) => {
Expand Down
18 changes: 3 additions & 15 deletions trace-utils/benches/deserialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use criterion::{black_box, criterion_group, Criterion};
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
use datadog_trace_utils::tracer_payload::{
DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams,
};
use datadog_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding};
use serde_json::{json, Value};

fn generate_spans(num_spans: usize, trace_id: u64) -> Vec<Value> {
Expand Down Expand Up @@ -65,24 +62,15 @@ pub fn deserialize_msgpack_to_internal(c: &mut Criterion) {
let data = rmp_serde::to_vec(&generate_trace_chunks(20, 2_075))
.expect("Failed to serialize test spans.");
let data_as_bytes = tinybytes::Bytes::copy_from_slice(&data);
let tracer_header_tags = &TracerHeaderTags::default();

c.bench_function(
"benching deserializing traces from msgpack to their internal representation ",
|b| {
b.iter_batched(
|| data_as_bytes.clone(),
|data_as_bytes| {
let result: anyhow::Result<TracerPayloadCollection> = black_box(
TracerPayloadParams::new(
data_as_bytes,
tracer_header_tags,
&mut DefaultTraceChunkProcessor,
false,
TraceEncoding::V04,
)
.try_into(),
);
let result =
black_box(decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04));
assert!(result.is_ok());
// Return the result to avoid measuring the deallocation time
result
Expand Down
6 changes: 4 additions & 2 deletions trace-utils/src/msgpack_decoder/v04/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use crate::span::{SpanBytes, SpanSlice};
///
/// # Returns
///
/// * `Ok(Vec<TracerPayloadV04>)` - A vector of decoded `TracerPayloadV04` objects if successful.
/// * `Ok((Vec<TracerPayloadV04>, usize))` - A vector of decoded `Vec<SpanSlice>` objects if
/// successful, and the number of bytes in the slice consumed by the decoder.
/// * `Err(DecodeError)` - An error if the decoding process fails.
///
/// # Errors
Expand Down Expand Up @@ -76,7 +77,8 @@ pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec<Vec<SpanBytes>>, usize)
///
/// # Returns
///
/// * `Ok(Vec<TracerPayloadV04>)` - A vector of decoded `Vec<SpanSlice>` objects if successful.
/// * `Ok((Vec<TracerPayloadV04>, usize))` - A vector of decoded `Vec<SpanSlice>` objects if
/// successful, and the number of bytes in the slice consumed by the decoder.
/// * `Err(DecodeError)` - An error if the decoding process fails.
///
/// # Errors
Expand Down
11 changes: 9 additions & 2 deletions trace-utils/src/span/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use std::fmt;
use std::hash::Hash;
use std::str::FromStr;
use tinybytes::{Bytes, BytesString};
use v05::dict::SharedDict;

use crate::tracer_payload::TraceChunks;

#[derive(Debug, PartialEq)]
pub enum SpanKey {
Expand Down Expand Up @@ -60,9 +63,9 @@ impl FromStr for SpanKey {
/// Trait representing the requirements for a type to be used as a Span "string" type.
/// Note: Borrow<str> is not required by the derived traits, but allows to access HashMap elements
/// from a static str and check if the string is empty.
pub trait SpanText: Eq + Hash + Borrow<str> + Serialize {}
pub trait SpanText: Eq + Hash + Borrow<str> + Serialize + Default + Clone {}
/// Implement the SpanText trait for any type which satisfies the sub traits.
impl<T: Eq + Hash + Borrow<str> + Serialize> SpanText for T {}
impl<T: Eq + Hash + Borrow<str> + Serialize + Default + Clone> SpanText for T {}

/// Checks if the `value` represents an empty string. Used to skip serializing empty strings
/// with serde.
Expand Down Expand Up @@ -259,6 +262,10 @@ pub type SpanEventSlice<'a> = SpanEvent<&'a str>;
pub type AttributeAnyValueSlice<'a> = AttributeAnyValue<&'a str>;
pub type AttributeArrayValueSlice<'a> = AttributeArrayValue<&'a str>;

pub type TraceChunksBytes = TraceChunks<BytesString>;

pub type SharedDictBytes = SharedDict<BytesString>;

impl SpanSlice<'_> {
/// Converts a borrowed `SpanSlice` into an owned `SpanBytes`, by resolving all internal
/// references into slices of the provided `Bytes` buffer. Returns `None` if any slice is
Expand Down
Loading
Loading