Skip to content

Commit f0ec0a8

Browse files
rochdevtlhunter
authored andcommitted
split collector as a multi-module crate
1 parent 1d76ec0 commit f0ec0a8

File tree

9 files changed

+503
-396
lines changed

9 files changed

+503
-396
lines changed

collector/Cargo.lock

Lines changed: 41 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

collector/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "dd-trace-collector"
2+
name = "ddcollector"
33
version = "0.1.0"
44
edition = "2021"
55

@@ -13,6 +13,8 @@ strip = true
1313
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1414

1515
[dependencies]
16+
async-trait = "0.1.64"
17+
hashbrown = "0.13.2"
1618
hyper = { version = "0.14.24", features = ["full"] }
1719
rmp = "0.8.11"
1820
tokio = { version = "1.25.0", features = ["full"] }

collector/src/exporting.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use crate::tracing::{Traces};
2+
use async_trait::async_trait;
3+
4+
pub mod agent;
5+
6+
#[async_trait]
7+
pub trait Exporter {
8+
async fn export(&self, traces: Traces);
9+
}

collector/src/exporting/agent.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
use crate::tracing::{Trace, Traces};
2+
use super::Exporter;
3+
use async_trait::async_trait;
4+
use rmp::encode;
5+
use rmp::encode::ByteBuf;
6+
use hashbrown::HashMap;
7+
8+
pub struct AgentExporter {}
9+
10+
#[async_trait]
11+
impl Exporter for AgentExporter {
12+
async fn export(&self, traces: Traces) {
13+
let mut wr = ByteBuf::new();
14+
let trace_count = traces.len();
15+
16+
if trace_count > 0 {
17+
// println!("{:#?}", traces);
18+
19+
self.encode_traces(&mut wr, traces);
20+
21+
let client = hyper::Client::new();
22+
let data: Vec<u8> = wr.as_vec().to_vec();
23+
let req = hyper::Request::builder()
24+
.method(hyper::Method::PUT)
25+
.uri("http://localhost:8126/v0.4/traces")
26+
.header("Content-Type", "application/msgpack")
27+
.header("X-Datadog-Trace-Count", trace_count.to_string())
28+
// .header("Datadog-Meta-Tracer-Version", "")
29+
// .header("Datadog-Meta-Lang", "")
30+
// .header("Datadog-Meta-Lang-Version", "")
31+
// .header("Datadog-Meta-Lang-Interpreter", "")
32+
.body(hyper::Body::from(data))
33+
.unwrap();
34+
35+
client.request(req).await.unwrap();
36+
}
37+
}
38+
}
39+
40+
impl Default for AgentExporter {
41+
fn default() -> Self {
42+
Self::new()
43+
}
44+
}
45+
46+
impl AgentExporter {
47+
pub fn new() -> Self {
48+
Self {}
49+
}
50+
51+
fn encode_traces(&self, wr: &mut ByteBuf, traces: Traces) {
52+
encode::write_array_len(wr, traces.len() as u32).unwrap();
53+
54+
for trace in traces.values() {
55+
self.encode_trace(wr, trace);
56+
}
57+
}
58+
59+
fn encode_trace(&self, wr: &mut ByteBuf, trace: &Trace) {
60+
encode::write_array_len(wr, trace.spans.len() as u32).unwrap();
61+
62+
for span in trace.spans.values() {
63+
match &span.span_type {
64+
Some(span_type) => {
65+
encode::write_map_len(wr, 12).unwrap();
66+
encode::write_str(wr, "type").unwrap();
67+
encode::write_str(wr, span_type.as_str()).unwrap();
68+
},
69+
None => {
70+
encode::write_map_len(wr, 11).unwrap();
71+
}
72+
}
73+
74+
encode::write_str(wr, "trace_id").unwrap();
75+
encode::write_uint(wr, span.trace_id).unwrap();
76+
encode::write_str(wr, "span_id").unwrap();
77+
encode::write_uint(wr, span.span_id).unwrap();
78+
encode::write_str(wr, "parent_id").unwrap();
79+
encode::write_uint(wr, span.parent_id).unwrap();
80+
encode::write_str(wr, "name").unwrap();
81+
encode::write_str(wr, span.name.as_str()).unwrap();
82+
encode::write_str(wr, "resource").unwrap();
83+
encode::write_str(wr, span.resource.as_str()).unwrap();
84+
encode::write_str(wr, "service").unwrap();
85+
encode::write_str(wr, span.service.as_str()).unwrap();
86+
encode::write_str(wr, "error").unwrap();
87+
encode::write_uint(wr, span.error).unwrap();
88+
encode::write_str(wr, "start").unwrap();
89+
encode::write_uint(wr, span.start).unwrap();
90+
encode::write_str(wr, "duration").unwrap();
91+
encode::write_uint(wr, span.duration + 1).unwrap();
92+
93+
self.encode_meta(wr, &span.meta);
94+
self.encode_metrics(wr, &span.metrics);
95+
}
96+
}
97+
98+
fn encode_meta(&self, wr: &mut ByteBuf, map: &HashMap<String, String>) {
99+
encode::write_str(wr, "meta").unwrap();
100+
encode::write_map_len(wr, map.len() as u32).unwrap();
101+
102+
for (k, v) in map {
103+
encode::write_str(wr, k.as_str()).unwrap();
104+
encode::write_str(wr, v.as_str()).unwrap();
105+
}
106+
}
107+
108+
fn encode_metrics(&self, wr: &mut ByteBuf, map: &HashMap<String, f64>) {
109+
encode::write_str(wr, "metrics").unwrap();
110+
encode::write_map_len(wr, map.len() as u32).unwrap();
111+
112+
for (k, v) in map {
113+
encode::write_str(wr, k.as_str()).unwrap();
114+
encode::write_f64(wr, *v).unwrap();
115+
}
116+
}
117+
}

collector/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
pub mod exporting;
2+
pub mod msgpack;
3+
pub mod processing;
4+
pub mod tracing;

0 commit comments

Comments
 (0)