Skip to content

Commit 9068dd2

Browse files
authored
[datadog] group exported spans by trace_id (#443)
While not technically required, not doing this causes issues with the datadog-trace-agent rate limits and leads to dropped spans when the number of traces exceeds the agent-configured trace rate limit. This also sets the X-Datadog-Trace-Count header, which the agent uses to enforce this rate limit quickly and to report to the exporter that the rate limit was exceeded: https://github.com/DataDog/datadog-agent/blob/master/pkg/trace/api/api.go#L442
1 parent f06f38b commit 9068dd2

File tree

5 files changed

+167
-120
lines changed

5 files changed

+167
-120
lines changed

opentelemetry-contrib/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ rustdoc-args = ["--cfg", "docsrs"]
2222
default = []
2323
base64_format = ["base64", "binary_propagator"]
2424
binary_propagator = []
25-
datadog = ["indexmap", "rmp", "async-trait", "thiserror", "opentelemetry-http"]
25+
datadog = ["indexmap", "rmp", "async-trait", "thiserror", "opentelemetry-http", "itertools"]
2626
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
2727
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
2828
surf-client = ["surf", "opentelemetry-http/surf"]
@@ -40,6 +40,7 @@ surf = { version = "2.0", optional = true }
4040
http = "0.2"
4141
base64 = { version = "0.13", optional = true }
4242
thiserror = { version = "1.0", optional = true }
43+
itertools = { version = "0.10", optional = true }
4344

4445
[dev-dependencies]
4546
base64 = "0.13"

opentelemetry-contrib/src/trace/exporter/datadog/mod.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ pub use model::Error;
127127

128128
use async_trait::async_trait;
129129
use http::{Method, Request, Uri};
130+
use itertools::Itertools;
130131
use opentelemetry::sdk::export::trace;
131132
use opentelemetry::sdk::export::trace::SpanData;
132133
use opentelemetry::trace::TraceError;
@@ -139,6 +140,9 @@ const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
139140
/// Default service name if no service is configured.
140141
const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry";
141142

143+
/// Header name used to inform the Datadog agent of the number of traces in the payload
144+
const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";
145+
142146
/// Datadog span exporter
143147
#[derive(Debug)]
144148
pub struct DatadogExporter {
@@ -270,15 +274,27 @@ impl DatadogPipelineBuilder {
270274
}
271275
}
272276

277+
fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
278+
spans
279+
.into_iter()
280+
.into_group_map_by(|span_data| span_data.span_context.trace_id())
281+
.into_iter()
282+
.map(|(_, trace)| trace)
283+
.collect()
284+
}
285+
273286
#[async_trait]
274287
impl trace::SpanExporter for DatadogExporter {
275288
/// Export spans to datadog-agent
276289
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
277-
let data = self.version.encode(&self.service_name, batch)?;
290+
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
291+
let trace_count = traces.len();
292+
let data = self.version.encode(&self.service_name, traces)?;
278293
let req = Request::builder()
279294
.method(Method::POST)
280295
.uri(self.request_url.clone())
281296
.header(http::header::CONTENT_TYPE, self.version.content_type())
297+
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
282298
.body(data)
283299
.map_err::<Error, _>(Into::into)?;
284300
self.client.send(req).await
@@ -289,3 +305,27 @@ impl trace::SpanExporter for DatadogExporter {
289305
#[must_use]
290306
#[derive(Debug)]
291307
pub struct Uninstall(global::TracerProviderGuard);
308+
309+
#[cfg(test)]
310+
mod tests {
311+
use super::*;
312+
313+
use crate::trace::exporter::datadog::model::tests::get_span;
314+
315+
#[test]
316+
fn test_out_of_order_group() -> Result<(), Box<dyn std::error::Error>> {
317+
let batch = vec![get_span(1, 1, 1), get_span(2, 2, 2), get_span(1, 1, 3)];
318+
let expected = vec![
319+
vec![get_span(1, 1, 1), get_span(1, 1, 3)],
320+
vec![get_span(2, 2, 2)],
321+
];
322+
323+
let mut traces = group_into_traces(batch);
324+
// We need to sort the output in order to compare, but this is not required by the Datadog agent
325+
traces.sort_by_key(|t| t[0].span_context.trace_id().to_u128());
326+
327+
assert_eq!(traces, expected);
328+
329+
Ok(())
330+
}
331+
}

opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ impl ApiVersion {
6262
pub(crate) fn encode(
6363
self,
6464
service_name: &str,
65-
spans: Vec<trace::SpanData>,
65+
traces: Vec<Vec<trace::SpanData>>,
6666
) -> Result<Vec<u8>, Error> {
6767
match self {
68-
Self::Version03 => v03::encode(service_name, spans),
69-
Self::Version05 => v05::encode(service_name, spans),
68+
Self::Version03 => v03::encode(service_name, traces),
69+
Self::Version05 => v05::encode(service_name, traces),
7070
}
7171
}
7272
}
7373

7474
#[cfg(test)]
75-
mod tests {
75+
pub(crate) mod tests {
7676
use super::*;
7777
use opentelemetry::sdk;
7878
use opentelemetry::sdk::InstrumentationLibrary;
@@ -83,11 +83,11 @@ mod tests {
8383
use std::sync::Arc;
8484
use std::time::{Duration, SystemTime};
8585

86-
fn get_spans() -> Vec<trace::SpanData> {
87-
let parent_span_id = 1;
88-
let trace_id = 7;
89-
let span_id = 99;
86+
fn get_traces() -> Vec<Vec<trace::SpanData>> {
87+
vec![vec![get_span(7, 1, 99)]]
88+
}
9089

90+
pub(crate) fn get_span(trace_id: u128, parent_span_id: u64, span_id: u64) -> trace::SpanData {
9191
let span_context = SpanContext::new(
9292
TraceId::from_u128(trace_id),
9393
SpanId::from_u64(span_id),
@@ -106,7 +106,7 @@ mod tests {
106106
let message_events = sdk::trace::EvictedQueue::new(capacity);
107107
let links = sdk::trace::EvictedQueue::new(capacity);
108108

109-
let span_data = trace::SpanData {
109+
trace::SpanData {
110110
span_context,
111111
parent_span_id: SpanId::from_u64(parent_span_id),
112112
span_kind: SpanKind::Client,
@@ -120,15 +120,13 @@ mod tests {
120120
status_message: String::new(),
121121
resource: Arc::new(sdk::Resource::default()),
122122
instrumentation_lib: InstrumentationLibrary::new("component", None),
123-
};
124-
125-
vec![span_data]
123+
}
126124
}
127125

128126
#[test]
129127
fn test_encode_v03() -> Result<(), Box<dyn std::error::Error>> {
130-
let spans = get_spans();
131-
let encoded = base64::encode(ApiVersion::Version03.encode("service_name", spans)?);
128+
let traces = get_traces();
129+
let encoded = base64::encode(ApiVersion::Version03.encode("service_name", traces)?);
132130

133131
assert_eq!(encoded.as_str(), "kZGLpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAAAAAAABpXN0YXJ00wAAAAAAAAAAqGR1cmF0aW9u0wAAAAA7msoApWVycm9y0gAAAAGkbWV0YYGpc3Bhbi50eXBlo3dlYg==");
134132

@@ -137,8 +135,8 @@ mod tests {
137135

138136
#[test]
139137
fn test_encode_v05() -> Result<(), Box<dyn std::error::Error>> {
140-
let spans = get_spans();
141-
let encoded = base64::encode(ApiVersion::Version05.encode("service_name", spans)?);
138+
let traces = get_traces();
139+
let encoded = base64::encode(ApiVersion::Version05.encode("service_name", traces)?);
142140

143141
assert_eq!(encoded.as_str(), "kpWsc2VydmljZV9uYW1lo3dlYqljb21wb25lbnSocmVzb3VyY2Wpc3Bhbi50eXBlkZGczgAAAADOAAAAAs4AAAADzwAAAAAAAAAHzwAAAAAAAABjzwAAAAAAAAAB0wAAAAAAAAAA0wAAAAA7msoA0gAAAAGBzgAAAATOAAAAAYDOAAAAAQ==");
144142

opentelemetry-contrib/src/trace/exporter/datadog/model/v03.rs

Lines changed: 65 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,68 +3,72 @@ use opentelemetry::sdk::export::trace;
33
use opentelemetry::{Key, Value};
44
use std::time::SystemTime;
55

6-
pub(crate) fn encode(service_name: &str, spans: Vec<trace::SpanData>) -> Result<Vec<u8>, Error> {
6+
pub(crate) fn encode(
7+
service_name: &str,
8+
traces: Vec<Vec<trace::SpanData>>,
9+
) -> Result<Vec<u8>, Error> {
710
let mut encoded = Vec::new();
8-
rmp::encode::write_array_len(&mut encoded, spans.len() as u32)?;
9-
10-
for span in spans.into_iter() {
11-
// API supports but doesn't mandate grouping spans with the same trace ID
12-
rmp::encode::write_array_len(&mut encoded, 1)?;
13-
14-
// Safe until the year 2262 when Datadog will need to change their API
15-
let start = span
16-
.start_time
17-
.duration_since(SystemTime::UNIX_EPOCH)
18-
.unwrap()
19-
.as_nanos() as i64;
20-
21-
let duration = span
22-
.end_time
23-
.duration_since(span.start_time)
24-
.map(|x| x.as_nanos() as i64)
25-
.unwrap_or(0);
26-
27-
if let Some(Value::String(s)) = span.attributes.get(&Key::new("span.type")) {
28-
rmp::encode::write_map_len(&mut encoded, 11)?;
29-
rmp::encode::write_str(&mut encoded, "type")?;
30-
rmp::encode::write_str(&mut encoded, s.as_ref())?;
31-
} else {
32-
rmp::encode::write_map_len(&mut encoded, 10)?;
33-
}
34-
35-
// Datadog span name is OpenTelemetry component name - see module docs for more information
36-
rmp::encode::write_str(&mut encoded, "service")?;
37-
rmp::encode::write_str(&mut encoded, service_name)?;
38-
39-
rmp::encode::write_str(&mut encoded, "name")?;
40-
rmp::encode::write_str(&mut encoded, span.instrumentation_lib.name)?;
41-
42-
rmp::encode::write_str(&mut encoded, "resource")?;
43-
rmp::encode::write_str(&mut encoded, &span.name)?;
44-
45-
rmp::encode::write_str(&mut encoded, "trace_id")?;
46-
rmp::encode::write_u64(&mut encoded, span.span_context.trace_id().to_u128() as u64)?;
47-
48-
rmp::encode::write_str(&mut encoded, "span_id")?;
49-
rmp::encode::write_u64(&mut encoded, span.span_context.span_id().to_u64())?;
50-
51-
rmp::encode::write_str(&mut encoded, "parent_id")?;
52-
rmp::encode::write_u64(&mut encoded, span.parent_span_id.to_u64())?;
53-
54-
rmp::encode::write_str(&mut encoded, "start")?;
55-
rmp::encode::write_i64(&mut encoded, start)?;
56-
57-
rmp::encode::write_str(&mut encoded, "duration")?;
58-
rmp::encode::write_i64(&mut encoded, duration)?;
59-
60-
rmp::encode::write_str(&mut encoded, "error")?;
61-
rmp::encode::write_i32(&mut encoded, span.status_code as i32)?;
62-
63-
rmp::encode::write_str(&mut encoded, "meta")?;
64-
rmp::encode::write_map_len(&mut encoded, span.attributes.len() as u32)?;
65-
for (key, value) in span.attributes.iter() {
66-
rmp::encode::write_str(&mut encoded, key.as_str())?;
67-
rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?;
11+
rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?;
12+
13+
for trace in traces.into_iter() {
14+
rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?;
15+
16+
for span in trace.into_iter() {
17+
// Safe until the year 2262 when Datadog will need to change their API
18+
let start = span
19+
.start_time
20+
.duration_since(SystemTime::UNIX_EPOCH)
21+
.unwrap()
22+
.as_nanos() as i64;
23+
24+
let duration = span
25+
.end_time
26+
.duration_since(span.start_time)
27+
.map(|x| x.as_nanos() as i64)
28+
.unwrap_or(0);
29+
30+
if let Some(Value::String(s)) = span.attributes.get(&Key::new("span.type")) {
31+
rmp::encode::write_map_len(&mut encoded, 11)?;
32+
rmp::encode::write_str(&mut encoded, "type")?;
33+
rmp::encode::write_str(&mut encoded, s.as_ref())?;
34+
} else {
35+
rmp::encode::write_map_len(&mut encoded, 10)?;
36+
}
37+
38+
// Datadog span name is OpenTelemetry component name - see module docs for more information
39+
rmp::encode::write_str(&mut encoded, "service")?;
40+
rmp::encode::write_str(&mut encoded, service_name)?;
41+
42+
rmp::encode::write_str(&mut encoded, "name")?;
43+
rmp::encode::write_str(&mut encoded, span.instrumentation_lib.name)?;
44+
45+
rmp::encode::write_str(&mut encoded, "resource")?;
46+
rmp::encode::write_str(&mut encoded, &span.name)?;
47+
48+
rmp::encode::write_str(&mut encoded, "trace_id")?;
49+
rmp::encode::write_u64(&mut encoded, span.span_context.trace_id().to_u128() as u64)?;
50+
51+
rmp::encode::write_str(&mut encoded, "span_id")?;
52+
rmp::encode::write_u64(&mut encoded, span.span_context.span_id().to_u64())?;
53+
54+
rmp::encode::write_str(&mut encoded, "parent_id")?;
55+
rmp::encode::write_u64(&mut encoded, span.parent_span_id.to_u64())?;
56+
57+
rmp::encode::write_str(&mut encoded, "start")?;
58+
rmp::encode::write_i64(&mut encoded, start)?;
59+
60+
rmp::encode::write_str(&mut encoded, "duration")?;
61+
rmp::encode::write_i64(&mut encoded, duration)?;
62+
63+
rmp::encode::write_str(&mut encoded, "error")?;
64+
rmp::encode::write_i32(&mut encoded, span.status_code as i32)?;
65+
66+
rmp::encode::write_str(&mut encoded, "meta")?;
67+
rmp::encode::write_map_len(&mut encoded, span.attributes.len() as u32)?;
68+
for (key, value) in span.attributes.iter() {
69+
rmp::encode::write_str(&mut encoded, key.as_str())?;
70+
rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?;
71+
}
6872
}
6973
}
7074

0 commit comments

Comments
 (0)