Skip to content

Commit d44d248

Browse files
rochdevtlhunter
authored andcommitted
move hyper client to its own crate along with tokio/hyper
1 parent 7fbed50 commit d44d248

File tree

12 files changed

+82
-54
lines changed

12 files changed

+82
-54
lines changed

collector/Cargo.lock

Lines changed: 10 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

collector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"common",
4+
"hyper-client",
45
"server"
56
]
67

collector/common/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,5 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
async-trait = "0.1.64"
109
hashbrown = "0.13.2"
11-
hyper = { version = "0.14.24", features = ["full"] }
1210
rmp = "0.8.11"
13-
tokio = { version = "1.25.0", features = ["full"] }

collector/common/src/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// TODO: Support streaming with a writer instead of slice.
2+
pub trait Client {
3+
fn request(&self, data: Vec<u8>);
4+
}

collector/common/src/exporting.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
use crate::tracing::{Traces};
2-
use async_trait::async_trait;
1+
use crate::tracing::Traces;
32

43
pub mod agent;
54

6-
#[async_trait]
75
pub trait Exporter {
8-
async fn export(&self, traces: Traces);
6+
fn export(&self, traces: Traces);
97
}

collector/common/src/exporting/agent.rs

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
1+
use crate::client::Client;
12
use crate::tracing::{Trace, Traces};
23
use super::Exporter;
3-
use async_trait::async_trait;
4-
use hyper::{Body, Client, Request};
5-
use hyper::client::HttpConnector;
64
use rmp::encode;
75
use rmp::encode::ByteBuf;
86
use hashbrown::HashMap;
97

108
pub struct AgentExporter {
11-
client: Client<HttpConnector>
9+
client: Box<dyn Client + Send + Sync>
1210
}
1311

14-
#[async_trait]
1512
impl Exporter for AgentExporter {
16-
async fn export(&self, traces: Traces) {
13+
fn export(&self, traces: Traces) {
1714
let mut wr = ByteBuf::new();
1815
let trace_count = traces.len();
1916

@@ -23,33 +20,18 @@ impl Exporter for AgentExporter {
2320
self.encode_traces(&mut wr, traces);
2421

2522
let data: Vec<u8> = wr.as_vec().to_vec();
26-
let req = Request::builder()
27-
.method(hyper::Method::PUT)
28-
.uri("http://localhost:8126/v0.4/traces")
29-
.header("Content-Type", "application/msgpack")
30-
.header("X-Datadog-Trace-Count", trace_count.to_string())
31-
// .header("Datadog-Meta-Tracer-Version", "")
32-
// .header("Datadog-Meta-Lang", "")
33-
// .header("Datadog-Meta-Lang-Version", "")
34-
// .header("Datadog-Meta-Lang-Interpreter", "")
35-
.body(Body::from(data))
36-
.unwrap();
37-
38-
self.client.request(req).await.unwrap();
39-
}
40-
}
41-
}
4223

43-
impl Default for AgentExporter {
44-
fn default() -> Self {
45-
Self::new()
24+
// TODO: Get the response somehow (with a channel?)
25+
// TODO: Make client reusable between requests (with a channel?)
26+
self.client.request(data);
27+
}
4628
}
4729
}
4830

4931
impl AgentExporter {
50-
pub fn new() -> Self {
32+
pub fn new(client: Box<dyn Client + Send + Sync>) -> Self {
5133
Self {
52-
client: Client::new()
34+
client
5335
}
5436
}
5537

collector/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod client;
12
pub mod exporting;
23
pub mod msgpack;
34
pub mod processing;

collector/common/src/processing.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ impl Processor {
4343
}
4444
}
4545

46-
pub async fn flush(&mut self) {
46+
pub fn flush(&mut self) {
4747
let finished_traces: HashMap<u64, Trace> = self.traces
4848
.drain_filter(|_, v| v.started == v.finished)
4949
.collect();
5050

51-
self.exporter.export(finished_traces).await;
51+
self.exporter.export(finished_traces);
5252
}
5353

5454
fn process_event<R: Read>(&mut self, strings: &mut Vec<String>, mut rd: R) {

collector/hyper-client/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "hyper-client"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
common = { path = "../common" }
10+
hyper = { version = "0.14.24", features = ["full"] }
11+
tokio = { version = "1.25.0", features = ["full"] }

collector/hyper-client/src/lib.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use common::client::Client;
2+
3+
pub struct HyperClient {}
4+
5+
impl HyperClient {
6+
pub fn new() -> Self {
7+
Self {}
8+
}
9+
}
10+
11+
impl Default for HyperClient {
12+
fn default() -> Self {
13+
Self::new()
14+
}
15+
}
16+
17+
impl Client for HyperClient {
18+
fn request(&self, data: Vec<u8>) {
19+
// TODO: configuration options
20+
let req = hyper::Request::builder()
21+
.method(hyper::Method::PUT)
22+
.uri("http://localhost:8126/v0.5/traces")
23+
.header("Content-Type", "application/msgpack")
24+
// .header("X-Datadog-Trace-Count", trace_count.to_string())
25+
// .header("Datadog-Meta-Tracer-Version", "")
26+
// .header("Datadog-Meta-Lang", "")
27+
// .header("Datadog-Meta-Lang-Version", "")
28+
// .header("Datadog-Meta-Lang-Interpreter", "")
29+
.body(hyper::Body::from(data))
30+
.unwrap();
31+
32+
tokio::spawn(async move {
33+
hyper::Client::new().request(req).await.unwrap();
34+
});
35+
}
36+
}

0 commit comments

Comments
 (0)