Skip to content

Commit c7384aa

Browse files
rochdevtlhunter
authored andcommitted
migrate to api compatible with hyper 1.0
1 parent 92863d1 commit c7384aa

File tree

1 file changed

+42
-36
lines changed

1 file changed

+42
-36
lines changed

collector/src/main.rs

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
use hyper::{Body, Error, Method, Server, Version};
1+
use hyper::{Body, Method, StatusCode, Error};
22
use hyper::http::Response;
3-
use hyper::service::{make_service_fn, service_fn};
3+
use hyper::server::conn::Http;
4+
use hyper::service::service_fn;
45
use rmp::encode;
56
use rmp::encode::ByteBuf;
67
use serde::Deserialize;
78
use serde_json::Value;
89
use std::collections::HashMap;
10+
use std::net::SocketAddr;
11+
use tokio::net::TcpListener;
912
use tokio::sync::mpsc;
1013
use tokio::sync::mpsc::{Receiver,Sender};
1114

@@ -46,7 +49,8 @@ struct Payload {
4649
}
4750

4851
// TODO: Decouple processing from transport.
49-
// TODO: Cleanup traces on connection close.
52+
// TODO: Stream the data somehow.
53+
// TODO: Make sure that traces are cleaned up on connection close.
5054
// TODO: Read MsgPack manually and copy bytes to span buffer directly.
5155
// TODO: Add support for more payload metadata (i.e. language).
5256
// TODO: Use string table.
@@ -62,8 +66,11 @@ struct Payload {
6266

6367
#[tokio::main]
6468
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
65-
let addr = ([127, 0, 0, 1], 8127).into();
66-
let make_svc = make_service_fn(|_conn| {
69+
let addr = SocketAddr::from(([127, 0, 0, 1], 8127));
70+
let listener = TcpListener::bind(addr).await?;
71+
72+
loop {
73+
let (stream, _) = listener.accept().await?;
6774
let (tx, mut rx): (Sender<Payload>, Receiver<Payload>) = mpsc::channel(100);
6875

6976
tokio::spawn(async move {
@@ -83,38 +90,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8390
}
8491
});
8592

86-
async move {
87-
Ok::<_, Error>(service_fn(move |mut req| {
88-
let tx = tx.clone();
89-
90-
async move {
91-
let method = req.method();
92-
let path = req.uri().path();
93-
let version = req.version();
94-
95-
if version == Version::HTTP_11 && method == Method::PUT && path == "/v0.1/events" {
96-
let bytes = hyper::body::to_bytes(req.body_mut()).await.unwrap();
97-
let data: Vec<u8> = bytes.try_into().unwrap();
98-
let payload: Payload = rmp_serde::from_slice(&data).unwrap();
99-
100-
tx.send(payload).await.unwrap();
101-
102-
Ok(Response::new(Body::from("")))
103-
} else {
104-
Err("Unsupported request") // TODO: not a 500
93+
tokio::spawn(async move {
94+
Http::new()
95+
.http1_only(true)
96+
.http1_keep_alive(true)
97+
.serve_connection(stream, service_fn(move |mut req| {
98+
let tx = tx.clone();
99+
100+
async move {
101+
let method = req.method();
102+
let path = req.uri().path();
103+
let body;
104+
105+
if method == Method::PUT && path == "/v0.1/events" {
106+
let bytes = hyper::body::to_bytes(req.body_mut()).await.unwrap();
107+
let data: Vec<u8> = bytes.try_into().unwrap();
108+
let payload: Payload = rmp_serde::from_slice(&data).unwrap();
109+
110+
tx.send(payload).await.unwrap();
111+
112+
body = Response::new(Body::from(""));
113+
} else {
114+
body = Response::builder().status(StatusCode::NOT_FOUND).body(Body::from("")).unwrap()
115+
}
116+
117+
Ok::<_, Error>(body)
105118
}
106-
}
107-
}))
108-
}
109-
});
110-
111-
let server = Server::bind(&addr).serve(make_svc);
112-
113-
println!("Listening on http://{}", addr);
114-
115-
server.await?;
116-
117-
Ok(())
119+
}))
120+
.await
121+
.unwrap();
122+
});
123+
}
118124
}
119125

120126
fn process_event(traces: &mut Traces, event: Value, metadata: &Metadata) {

0 commit comments

Comments
 (0)