Skip to content

Commit 6e123c8

Browse files
authored
[datadog] Add DatadogPropagator (#440)
1 parent 9068dd2 commit 6e123c8

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
use opentelemetry::{
2+
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
3+
trace::{
4+
SpanContext, SpanId, TraceContextExt, TraceId, TraceState, TRACE_FLAG_DEFERRED,
5+
TRACE_FLAG_NOT_SAMPLED, TRACE_FLAG_SAMPLED,
6+
},
7+
Context,
8+
};
9+
10+
const DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
11+
const DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
12+
const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
13+
14+
lazy_static::lazy_static! {
15+
static ref DATADOG_HEADER_FIELDS: [String; 3] = [
16+
DATADOG_TRACE_ID_HEADER.to_string(),
17+
DATADOG_PARENT_ID_HEADER.to_string(),
18+
DATADOG_SAMPLING_PRIORITY_HEADER.to_string(),
19+
];
20+
}
21+
22+
enum SamplingPriority {
23+
UserReject = -1,
24+
AutoReject = 0,
25+
AutoKeep = 1,
26+
UserKeep = 2,
27+
}
28+
29+
#[derive(Debug)]
30+
enum ExtractError {
31+
TraceId,
32+
SpanId,
33+
SamplingPriority,
34+
}
35+
36+
/// Extracts and injects `SpanContext`s into `Extractor`s or `Injector`s using Datadog's header format.
37+
///
38+
/// The Datadog header format does not have an explicit spec, but can be divined from the client libraries,
39+
/// such as [dd-trace-go]
40+
///
41+
/// ## Example
42+
///
43+
/// ```
44+
/// use opentelemetry::global;
45+
/// use opentelemetry_contrib::trace::propagator::DatadogPropagator;
46+
///
47+
/// global::set_text_map_propagator(DatadogPropagator::default());
48+
/// ```
49+
///
50+
/// [dd-trace-go]: https://github.com/DataDog/dd-trace-go/blob/v1.28.0/ddtrace/tracer/textmap.go#L293
51+
#[derive(Clone, Debug, Default)]
52+
pub struct DatadogPropagator {
53+
_private: (),
54+
}
55+
56+
impl DatadogPropagator {
57+
/// Creates a new `DatadogPropagator`.
58+
pub fn new() -> Self {
59+
DatadogPropagator::default()
60+
}
61+
62+
fn extract_trace_id(&self, trace_id: &str) -> Result<TraceId, ExtractError> {
63+
u64::from_str_radix(trace_id, 10)
64+
.map(|id| TraceId::from_u128(id as u128))
65+
.map_err(|_| ExtractError::TraceId)
66+
}
67+
68+
fn extract_span_id(&self, span_id: &str) -> Result<SpanId, ExtractError> {
69+
u64::from_str_radix(span_id, 10)
70+
.map(SpanId::from_u64)
71+
.map_err(|_| ExtractError::SpanId)
72+
}
73+
74+
fn extract_sampling_priority(
75+
&self,
76+
sampling_priority: &str,
77+
) -> Result<SamplingPriority, ExtractError> {
78+
let i = i32::from_str_radix(sampling_priority, 10)
79+
.map_err(|_| ExtractError::SamplingPriority)?;
80+
81+
match i {
82+
-1 => Ok(SamplingPriority::UserReject),
83+
0 => Ok(SamplingPriority::AutoReject),
84+
1 => Ok(SamplingPriority::AutoKeep),
85+
2 => Ok(SamplingPriority::UserKeep),
86+
_ => Err(ExtractError::SamplingPriority),
87+
}
88+
}
89+
90+
fn extract_span_context(&self, extractor: &dyn Extractor) -> Result<SpanContext, ExtractError> {
91+
let trace_id =
92+
self.extract_trace_id(extractor.get(DATADOG_TRACE_ID_HEADER).unwrap_or(""))?;
93+
// If we have a trace_id but can't get the parent span, we default it to invalid instead of completely erroring
94+
// out so that the rest of the spans aren't completely lost
95+
let span_id = self
96+
.extract_span_id(extractor.get(DATADOG_PARENT_ID_HEADER).unwrap_or(""))
97+
.unwrap_or_else(|_| SpanId::invalid());
98+
let sampling_priority = self.extract_sampling_priority(
99+
extractor
100+
.get(DATADOG_SAMPLING_PRIORITY_HEADER)
101+
.unwrap_or(""),
102+
);
103+
let sampled = match sampling_priority {
104+
Ok(SamplingPriority::UserReject) | Ok(SamplingPriority::AutoReject) => {
105+
TRACE_FLAG_NOT_SAMPLED
106+
}
107+
Ok(SamplingPriority::UserKeep) | Ok(SamplingPriority::AutoKeep) => TRACE_FLAG_SAMPLED,
108+
// Treat the sampling as DEFERRED instead of erroring on extracting the span context
109+
Err(_) => TRACE_FLAG_DEFERRED,
110+
};
111+
112+
let trace_state = TraceState::default();
113+
114+
Ok(SpanContext::new(
115+
trace_id,
116+
span_id,
117+
sampled,
118+
true,
119+
trace_state,
120+
))
121+
}
122+
}
123+
124+
impl TextMapPropagator for DatadogPropagator {
125+
fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
126+
let span_context = cx.span().span_context();
127+
if span_context.is_valid() {
128+
injector.set(
129+
DATADOG_TRACE_ID_HEADER,
130+
(span_context.trace_id().to_u128() as u64).to_string(),
131+
);
132+
injector.set(
133+
DATADOG_PARENT_ID_HEADER,
134+
span_context.span_id().to_u64().to_string(),
135+
);
136+
137+
if !span_context.is_deferred() {
138+
let sampling_priority = if span_context.is_sampled() {
139+
SamplingPriority::AutoKeep
140+
} else {
141+
SamplingPriority::AutoReject
142+
};
143+
144+
injector.set(
145+
DATADOG_SAMPLING_PRIORITY_HEADER,
146+
(sampling_priority as i32).to_string(),
147+
);
148+
}
149+
}
150+
}
151+
152+
fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
153+
let extracted = self
154+
.extract_span_context(extractor)
155+
.unwrap_or_else(|_| SpanContext::empty_context());
156+
157+
cx.with_remote_span_context(extracted)
158+
}
159+
160+
fn fields(&self) -> FieldIter<'_> {
161+
FieldIter::new(DATADOG_HEADER_FIELDS.as_ref())
162+
}
163+
}
164+
165+
#[cfg(test)]
166+
mod tests {
167+
use super::*;
168+
use opentelemetry::testing::trace::TestSpan;
169+
use opentelemetry::trace::TraceState;
170+
use std::collections::HashMap;
171+
172+
#[rustfmt::skip]
173+
fn extract_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> {
174+
vec![
175+
(vec![], SpanContext::empty_context()),
176+
(vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()),
177+
(vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()),
178+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::invalid(), TRACE_FLAG_DEFERRED, true, TraceState::default())),
179+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
180+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_NOT_SAMPLED, true, TraceState::default())),
181+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_SAMPLED, true, TraceState::default())),
182+
]
183+
}
184+
185+
#[rustfmt::skip]
186+
fn inject_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> {
187+
vec![
188+
(vec![], SpanContext::empty_context()),
189+
(vec![], SpanContext::new(TraceId::from_hex("garbage"), SpanId::invalid(), TRACE_FLAG_DEFERRED, true, TraceState::default())),
190+
(vec![], SpanContext::new(TraceId::from_hex("1234"), SpanId::invalid(), TRACE_FLAG_DEFERRED, true, TraceState::default())),
191+
(vec![], SpanContext::new(TraceId::from_hex("1234"), SpanId::invalid(), TRACE_FLAG_SAMPLED, true, TraceState::default())),
192+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
193+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_NOT_SAMPLED, true, TraceState::default())),
194+
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_SAMPLED, true, TraceState::default())),
195+
]
196+
}
197+
198+
#[test]
199+
fn test_extract() {
200+
for (header_list, expected) in extract_test_data() {
201+
let map: HashMap<String, String> = header_list
202+
.into_iter()
203+
.map(|(k, v)| (k.to_string(), v.to_string()))
204+
.collect();
205+
206+
let propagator = DatadogPropagator::default();
207+
let context = propagator.extract(&map);
208+
assert_eq!(context.remote_span_context(), Some(&expected));
209+
}
210+
}
211+
212+
#[test]
213+
fn test_extract_empty() {
214+
let map: HashMap<String, String> = HashMap::new();
215+
let propagator = DatadogPropagator::default();
216+
let context = propagator.extract(&map);
217+
assert_eq!(
218+
context.remote_span_context(),
219+
Some(&SpanContext::empty_context())
220+
)
221+
}
222+
223+
#[test]
224+
fn test_inject() {
225+
let propagator = DatadogPropagator::default();
226+
for (header_values, span_context) in inject_test_data() {
227+
let mut injector: HashMap<String, String> = HashMap::new();
228+
propagator.inject_context(
229+
&Context::current_with_span(TestSpan(span_context)),
230+
&mut injector,
231+
);
232+
233+
if header_values.is_empty() {
234+
assert!(injector.is_empty());
235+
} else {
236+
for (k, v) in header_values.into_iter() {
237+
let injected_value: Option<&String> = injector.get(k);
238+
assert_eq!(injected_value, Some(&v.to_string()));
239+
injector.remove(k);
240+
}
241+
assert!(injector.is_empty());
242+
}
243+
}
244+
}
245+
}

opentelemetry-contrib/src/trace/propagator/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
#[cfg(feature = "aws-xray")]
1313
mod aws;
1414
pub mod binary;
15+
#[cfg(feature = "datadog")]
16+
mod datadog;
1517

1618
#[cfg(feature = "aws-xray")]
1719
pub use aws::XrayPropagator;
20+
#[cfg(feature = "datadog")]
21+
pub use datadog::DatadogPropagator;

0 commit comments

Comments
 (0)