Skip to content

Commit 678c0f2

Browse files
authored
Switch from sending bytes to slice in TraceExporter (#1029)
1 parent 3dab0be commit 678c0f2

File tree

8 files changed

+179
-169
lines changed

8 files changed

+179
-169
lines changed

data-pipeline-ffi/src/trace_exporter.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
357357
None => return gen_error!(ErrorCode::InvalidArgument),
358358
};
359359

360-
// necessary that the trace be static for the life of the FFI function call as the caller
361-
// currently owns the memory.
362-
//APMSP-1621 - Properly fix this sharp-edge by allocating memory on the Rust side
363-
let static_trace: ByteSlice<'static> = std::mem::transmute(trace);
364-
match exporter.send(
365-
tinybytes::Bytes::from_static(static_trace.as_slice()),
366-
trace_count,
367-
) {
360+
match exporter.send(&trace, trace_count) {
368361
Ok(resp) => {
369362
if let Some(result) = response_out {
370363
result
@@ -381,7 +374,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
381374
mod tests {
382375
use super::*;
383376
use crate::error::ddog_trace_exporter_error_free;
384-
use datadog_trace_utils::span::SpanBytes;
377+
use datadog_trace_utils::span::SpanSlice;
385378
use httpmock::prelude::*;
386379
use httpmock::MockServer;
387380
use std::{borrow::Borrow, mem::MaybeUninit};
@@ -775,7 +768,7 @@ mod tests {
775768

776769
assert_eq!(ret, None);
777770

778-
let data = rmp_serde::to_vec_named::<Vec<Vec<SpanBytes>>>(&vec![vec![]]).unwrap();
771+
let data = rmp_serde::to_vec_named::<Vec<Vec<SpanSlice>>>(&vec![vec![]]).unwrap();
779772
let traces = ByteSlice::new(&data);
780773
ret = ddog_trace_exporter_send(
781774
Some(exporter.as_ref()),

data-pipeline/examples/send-traces-with-stats.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ fn main() {
5555
traces.push(trace);
5656
}
5757
let data = rmp_serde::to_vec_named(&traces).unwrap();
58-
let data_as_bytes = tinybytes::Bytes::from(data);
5958

60-
exporter.send(data_as_bytes, 100).unwrap();
59+
exporter.send(data.as_ref(), 100).unwrap();
6160
exporter.shutdown(None).unwrap();
6261
}

data-pipeline/src/span_concentrator/aggregation.rs

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
//! This includes the aggregation key to group spans together and the computation of stats from a
55
//! span.
66
use datadog_trace_protobuf::pb;
7-
use datadog_trace_utils::span::{trace_utils, SpanBytes};
7+
use datadog_trace_utils::span::trace_utils;
8+
use datadog_trace_utils::span::Span;
9+
use datadog_trace_utils::span::SpanText;
810
use std::borrow::Borrow;
911
use std::borrow::Cow;
1012
use std::collections::HashMap;
@@ -108,32 +110,35 @@ impl<'a> AggregationKey<'a> {
108110
///
109111
/// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the
110112
/// key.
111-
pub(super) fn from_span(span: &'a SpanBytes, peer_tag_keys: &'a [String]) -> Self {
113+
pub(super) fn from_span<T>(span: &'a Span<T>, peer_tag_keys: &'a [String]) -> Self
114+
where
115+
T: SpanText,
116+
{
112117
let span_kind = span
113118
.meta
114119
.get(TAG_SPANKIND)
115-
.map(|s| s.as_str())
120+
.map(|s| s.borrow())
116121
.unwrap_or_default();
117122
let peer_tags = if client_or_producer(span_kind) {
118123
get_peer_tags(span, peer_tag_keys)
119124
} else {
120125
vec![]
121126
};
122127
Self {
123-
resource_name: span.resource.as_str().into(),
124-
service_name: span.service.as_str().into(),
125-
operation_name: span.name.as_str().into(),
126-
span_type: span.r#type.as_str().into(),
128+
resource_name: span.resource.borrow().into(),
129+
service_name: span.service.borrow().into(),
130+
operation_name: span.name.borrow().into(),
131+
span_type: span.r#type.borrow().into(),
127132
span_kind: span_kind.into(),
128133
http_status_code: get_status_code(span),
129134
is_synthetics_request: span
130135
.meta
131136
.get(TAG_ORIGIN)
132-
.is_some_and(|origin| origin.as_str().starts_with(TAG_SYNTHETICS)),
137+
.is_some_and(|origin| origin.borrow().starts_with(TAG_SYNTHETICS)),
133138
is_trace_root: span.parent_id == 0,
134139
peer_tags: peer_tags
135140
.into_iter()
136-
.map(|(k, v)| (k.into(), v.into()))
141+
.map(|(k, v)| (k.into(), v.borrow().into()))
137142
.collect(),
138143
}
139144
}
@@ -183,30 +188,39 @@ impl From<pb::ClientGroupedStats> for AggregationKey<'static> {
183188
}
184189

185190
/// Return the status code of a span based on the metrics and meta tags.
186-
fn get_status_code(span: &SpanBytes) -> u32 {
191+
fn get_status_code<T>(span: &Span<T>) -> u32
192+
where
193+
T: SpanText,
194+
{
187195
if let Some(status_code) = span.metrics.get(TAG_STATUS_CODE) {
188196
*status_code as u32
189197
} else if let Some(status_code) = span.meta.get(TAG_STATUS_CODE) {
190-
status_code.as_str().parse().unwrap_or(0)
198+
status_code.borrow().parse().unwrap_or(0)
191199
} else {
192200
0
193201
}
194202
}
195203

196204
/// Return true if the span kind is "client" or "producer"
197-
fn client_or_producer(span_kind: &str) -> bool {
198-
matches!(span_kind.to_lowercase().as_str(), "client" | "producer")
205+
fn client_or_producer<T>(span_kind: T) -> bool
206+
where
207+
T: SpanText,
208+
{
209+
matches!(
210+
span_kind.borrow().to_lowercase().as_str(),
211+
"client" | "producer"
212+
)
199213
}
200214

201215
/// Parse the meta tags of a span and return a list of the peer tags based on the list of
202216
/// `peer_tag_keys`
203-
fn get_peer_tags<'k, 'v>(
204-
span: &'v SpanBytes,
205-
peer_tag_keys: &'k [String],
206-
) -> Vec<(&'k str, &'v str)> {
217+
fn get_peer_tags<'k, 'v, T>(span: &'v Span<T>, peer_tag_keys: &'k [String]) -> Vec<(&'k str, &'v T)>
218+
where
219+
T: SpanText,
220+
{
207221
peer_tag_keys
208222
.iter()
209-
.filter_map(|key| Some((key.as_str(), span.meta.get(key.as_str())?.as_str())))
223+
.filter_map(|key| Some((key.as_str(), span.meta.get(key.as_str())?)))
210224
.collect()
211225
}
212226

@@ -223,7 +237,10 @@ pub(super) struct GroupedStats {
223237

224238
impl GroupedStats {
225239
/// Update the stats of a GroupedStats by inserting a span.
226-
fn insert(&mut self, value: &SpanBytes) {
240+
fn insert<T>(&mut self, value: &Span<T>)
241+
where
242+
T: SpanText,
243+
{
227244
self.hits += 1;
228245
self.duration += value.duration as u64;
229246

@@ -258,7 +275,10 @@ impl StatsBucket {
258275

259276
/// Insert a value as stats in the group corresponding to the aggregation key, if it does
260277
/// not exist it creates it.
261-
pub(super) fn insert(&mut self, key: AggregationKey<'_>, value: &SpanBytes) {
278+
pub(super) fn insert<T>(&mut self, key: AggregationKey<'_>, value: &Span<T>)
279+
where
280+
T: SpanText,
281+
{
262282
if let Some(grouped_stats) = self.data.get_mut(&key as &dyn BorrowableAggregationKey) {
263283
grouped_stats.insert(value);
264284
} else {
@@ -320,6 +340,8 @@ fn encode_grouped_stats(key: AggregationKey, group: GroupedStats) -> pb::ClientG
320340

321341
#[cfg(test)]
322342
mod tests {
343+
use datadog_trace_utils::span::{SpanBytes, SpanSlice};
344+
323345
use super::*;
324346

325347
#[test]
@@ -527,19 +549,16 @@ mod tests {
527549
"db.system".to_string(),
528550
];
529551

530-
let test_cases_with_peer_tags: Vec<(SpanBytes, AggregationKey)> = vec![
552+
let test_cases_with_peer_tags: Vec<(SpanSlice, AggregationKey)> = vec![
531553
// Span with peer tags with peertags aggregation enabled
532554
(
533-
SpanBytes {
534-
service: "service".into(),
535-
name: "op".into(),
536-
resource: "res".into(),
555+
SpanSlice {
556+
service: "service",
557+
name: "op",
558+
resource: "res",
537559
span_id: 1,
538560
parent_id: 0,
539-
meta: HashMap::from([
540-
("span.kind".into(), "client".into()),
541-
("aws.s3.bucket".into(), "bucket-a".into()),
542-
]),
561+
meta: HashMap::from([("span.kind", "client"), ("aws.s3.bucket", "bucket-a")]),
543562
..Default::default()
544563
},
545564
AggregationKey {
@@ -554,17 +573,17 @@ mod tests {
554573
),
555574
// Span with multiple peer tags with peertags aggregation enabled
556575
(
557-
SpanBytes {
558-
service: "service".into(),
559-
name: "op".into(),
560-
resource: "res".into(),
576+
SpanSlice {
577+
service: "service",
578+
name: "op",
579+
resource: "res",
561580
span_id: 1,
562581
parent_id: 0,
563582
meta: HashMap::from([
564-
("span.kind".into(), "producer".into()),
565-
("aws.s3.bucket".into(), "bucket-a".into()),
566-
("db.instance".into(), "dynamo.test.us1".into()),
567-
("db.system".into(), "dynamodb".into()),
583+
("span.kind", "producer"),
584+
("aws.s3.bucket", "bucket-a"),
585+
("db.instance", "dynamo.test.us1"),
586+
("db.system", "dynamodb"),
568587
]),
569588
..Default::default()
570589
},
@@ -585,17 +604,17 @@ mod tests {
585604
// Span with multiple peer tags with peertags aggregation enabled and span kind is
586605
// server
587606
(
588-
SpanBytes {
589-
service: "service".into(),
590-
name: "op".into(),
591-
resource: "res".into(),
607+
SpanSlice {
608+
service: "service",
609+
name: "op",
610+
resource: "res",
592611
span_id: 1,
593612
parent_id: 0,
594613
meta: HashMap::from([
595-
("span.kind".into(), "server".into()),
596-
("aws.s3.bucket".into(), "bucket-a".into()),
597-
("db.instance".into(), "dynamo.test.us1".into()),
598-
("db.system".into(), "dynamodb".into()),
614+
("span.kind", "server"),
615+
("aws.s3.bucket", "bucket-a"),
616+
("db.instance", "dynamo.test.us1"),
617+
("db.system", "dynamodb"),
599618
]),
600619
..Default::default()
601620
},

data-pipeline/src/span_concentrator/mod.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::collections::HashMap;
55
use std::time::{self, Duration, SystemTime};
66

77
use datadog_trace_protobuf::pb;
8-
use datadog_trace_utils::span::{trace_utils, SpanBytes};
8+
use datadog_trace_utils::span::{trace_utils, Span, SpanText};
99

1010
use aggregation::{AggregationKey, StatsBucket};
1111

@@ -25,15 +25,21 @@ fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
2525
}
2626

2727
/// Return true if the span has a span.kind that is eligible for stats computation
28-
fn compute_stats_for_span_kind(span: &SpanBytes, span_kinds_stats_computed: &[String]) -> bool {
28+
fn compute_stats_for_span_kind<T>(span: &Span<T>, span_kinds_stats_computed: &[String]) -> bool
29+
where
30+
T: SpanText,
31+
{
2932
!span_kinds_stats_computed.is_empty()
3033
&& span.meta.get("span.kind").is_some_and(|span_kind| {
31-
span_kinds_stats_computed.contains(&span_kind.as_str().to_lowercase())
34+
span_kinds_stats_computed.contains(&span_kind.borrow().to_lowercase())
3235
})
3336
}
3437

3538
/// Return true if the span should be ignored for stats computation
36-
fn should_ignore_span(span: &SpanBytes, span_kinds_stats_computed: &[String]) -> bool {
39+
fn should_ignore_span<T>(span: &Span<T>, span_kinds_stats_computed: &[String]) -> bool
40+
where
41+
T: SpanText,
42+
{
3743
!(trace_utils::has_top_level(span)
3844
|| trace_utils::is_measured(span)
3945
|| compute_stats_for_span_kind(span, span_kinds_stats_computed))
@@ -113,7 +119,10 @@ impl SpanConcentrator {
113119

114120
/// Add a span into the concentrator, by computing stats if the span is elligible for stats
115121
/// computation.
116-
pub fn add_span(&mut self, span: &SpanBytes) {
122+
pub fn add_span<T>(&mut self, span: &Span<T>)
123+
where
124+
T: SpanText,
125+
{
117126
// If the span is elligible for stats computation
118127
if !should_ignore_span(span, self.span_kinds_stats_computed.as_slice()) {
119128
let mut bucket_timestamp =

0 commit comments

Comments
 (0)