|
7 | 7 | // option. This file may not be copied, modified, or distributed
|
8 | 8 | // except according to those terms.
|
9 | 9 |
|
| 10 | +use bytes::buf::BufExt; |
10 | 11 | use parking_lot::Mutex;
|
11 | 12 | use std::cell::RefCell;
|
12 | 13 | use std::collections::HashMap;
|
13 | 14 | use std::convert::TryInto;
|
14 | 15 | use std::fmt;
|
15 | 16 | use std::fs;
|
| 17 | +use std::io::Read; |
16 | 18 | use std::net::SocketAddr;
|
17 | 19 | use std::path::Path;
|
18 | 20 | use std::str;
|
@@ -783,9 +785,96 @@ fn get_self_profile_data(
|
783 | 785 | Ok(profile)
|
784 | 786 | }
|
785 | 787 |
|
| 788 | +pub async fn handle_self_profile_raw_download( |
| 789 | + body: self_profile_raw::Request, |
| 790 | + data: &InputData, |
| 791 | +) -> Response { |
| 792 | + let res = handle_self_profile_raw(body, data, false).await; |
| 793 | + let url = match res { |
| 794 | + Ok(v) => v.url, |
| 795 | + Err(e) => { |
| 796 | + let mut resp = Response::new(e.into()); |
| 797 | + *resp.status_mut() = StatusCode::BAD_REQUEST; |
| 798 | + return resp; |
| 799 | + } |
| 800 | + }; |
| 801 | + log::trace!("downloading {}", url); |
| 802 | + |
| 803 | + let resp = match reqwest::get(&url).await { |
| 804 | + Ok(r) => r, |
| 805 | + Err(e) => { |
| 806 | + let mut resp = Response::new(format!("{:?}", e).into()); |
| 807 | + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; |
| 808 | + return resp; |
| 809 | + } |
| 810 | + }; |
| 811 | + |
| 812 | + if !resp.status().is_success() { |
| 813 | + let mut resp = |
| 814 | + Response::new(format!("upstream status {:?} is not successful", resp.status()).into()); |
| 815 | + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; |
| 816 | + return resp; |
| 817 | + } |
| 818 | + |
| 819 | + let (sender, body) = hyper::Body::channel(); |
| 820 | + let mut server_resp = Response::new(body); |
| 821 | + let mut header = vec![]; |
| 822 | + ContentType::octet_stream().encode(&mut header); |
| 823 | + server_resp |
| 824 | + .headers_mut() |
| 825 | + .insert(hyper::header::CONTENT_TYPE, header.pop().unwrap()); |
| 826 | + server_resp.headers_mut().insert( |
| 827 | + hyper::header::CONTENT_DISPOSITION, |
| 828 | + hyper::header::HeaderValue::from_maybe_shared(format!( |
| 829 | + "attachment; filename=\"self-profile.tar\"" |
| 830 | + )) |
| 831 | + .expect("valid header"), |
| 832 | + ); |
| 833 | + *server_resp.status_mut() = StatusCode::OK; |
| 834 | + tokio::spawn(tarball(resp, sender)); |
| 835 | + server_resp |
| 836 | +} |
| 837 | + |
| 838 | +async fn tarball(resp: reqwest::Response, mut sender: hyper::body::Sender) { |
| 839 | + // Ideally, we would stream the response though the snappy decoding, but |
| 840 | + // snappy doesn't support that AFAICT -- we'd need it to implement AsyncRead |
| 841 | + // or correctly handle WouldBlock, and neither is true. |
| 842 | + let input = match resp.bytes().await { |
| 843 | + Ok(b) => b, |
| 844 | + Err(e) => { |
| 845 | + log::error!("failed to receive data: {:?}", e); |
| 846 | + sender.abort(); |
| 847 | + return; |
| 848 | + } |
| 849 | + }; |
| 850 | + let mut decoder = snap::read::FrameDecoder::new(input.reader()); |
| 851 | + let mut buffer = vec![0; 32 * 1024]; |
| 852 | + loop { |
| 853 | + match decoder.read(&mut buffer[..]) { |
| 854 | + Ok(0) => return, |
| 855 | + Ok(length) => { |
| 856 | + if let Err(e) = sender |
| 857 | + .send_data(bytes::Bytes::copy_from_slice(&buffer[..length])) |
| 858 | + .await |
| 859 | + { |
| 860 | + log::error!("failed to send data: {:?}", e); |
| 861 | + sender.abort(); |
| 862 | + return; |
| 863 | + } |
| 864 | + } |
| 865 | + Err(e) => { |
| 866 | + log::error!("failed to fill buffer: {:?}", e); |
| 867 | + sender.abort(); |
| 868 | + return; |
| 869 | + } |
| 870 | + } |
| 871 | + } |
| 872 | +} |
| 873 | + |
786 | 874 | pub async fn handle_self_profile_raw(
|
787 | 875 | body: self_profile_raw::Request,
|
788 | 876 | data: &InputData,
|
| 877 | + validate: bool, |
789 | 878 | ) -> ServerResult<self_profile_raw::Response> {
|
790 | 879 | log::info!("handle_self_profile_raw({:?})", body);
|
791 | 880 | let mut it = body.benchmark.rsplitn(2, '-');
|
@@ -835,16 +924,18 @@ pub async fn handle_self_profile_raw(
|
835 | 924 | cid
|
836 | 925 | );
|
837 | 926 |
|
838 |
| - let resp = reqwest::Client::new() |
839 |
| - .head(&url) |
840 |
| - .send() |
841 |
| - .await |
842 |
| - .map_err(|e| format!("fetching artifact: {:?}", e))?; |
843 |
| - if !resp.status().is_success() { |
844 |
| - return Err(format!( |
845 |
| - "Artifact did not resolve successfully: {:?} received", |
846 |
| - resp.status() |
847 |
| - )); |
| 927 | + if validate { |
| 928 | + let resp = reqwest::Client::new() |
| 929 | + .head(&url) |
| 930 | + .send() |
| 931 | + .await |
| 932 | + .map_err(|e| format!("fetching artifact: {:?}", e))?; |
| 933 | + if !resp.status().is_success() { |
| 934 | + return Err(format!( |
| 935 | + "Artifact did not resolve successfully: {:?} received", |
| 936 | + resp.status() |
| 937 | + )); |
| 938 | + } |
848 | 939 | }
|
849 | 940 |
|
850 | 941 | Ok(self_profile_raw::Response {
|
@@ -1195,6 +1286,61 @@ async fn serve_req(ctx: Arc<Server>, req: Request) -> Result<Response, ServerErr
|
1195 | 1286 | if req.uri().path() == "/perf/onpush" {
|
1196 | 1287 | return Ok(ctx.handle_push(req).await);
|
1197 | 1288 | }
|
| 1289 | + if req.uri().path() == "/perf/download-raw-self-profile" { |
| 1290 | + // FIXME: how should this look? |
| 1291 | + let url = match url::Url::parse(&format!("http://example.com{}", req.uri())) { |
| 1292 | + Ok(v) => v, |
| 1293 | + Err(e) => { |
| 1294 | + error!("failed to parse url {}: {:?}", req.uri(), e); |
| 1295 | + return Ok(http::Response::builder() |
| 1296 | + .header_typed(ContentType::text_utf8()) |
| 1297 | + .status(StatusCode::BAD_REQUEST) |
| 1298 | + .body(hyper::Body::from(format!( |
| 1299 | + "failed to parse url {}: {:?}", |
| 1300 | + req.uri(), |
| 1301 | + e |
| 1302 | + ))) |
| 1303 | + .unwrap()); |
| 1304 | + } |
| 1305 | + }; |
| 1306 | + let mut parts = url |
| 1307 | + .query_pairs() |
| 1308 | + .into_owned() |
| 1309 | + .collect::<HashMap<String, String>>(); |
| 1310 | + macro_rules! key_or_error { |
| 1311 | + ($ident:ident) => { |
| 1312 | + if let Some(v) = parts.remove(stringify!($ident)) { |
| 1313 | + v |
| 1314 | + } else { |
| 1315 | + error!( |
| 1316 | + "failed to deserialize request {}: missing {} in query string", |
| 1317 | + req.uri(), |
| 1318 | + stringify!($ident) |
| 1319 | + ); |
| 1320 | + return Ok(http::Response::builder() |
| 1321 | + .header_typed(ContentType::text_utf8()) |
| 1322 | + .status(StatusCode::BAD_REQUEST) |
| 1323 | + .body(hyper::Body::from(format!( |
| 1324 | + "failed to deserialize request {}: missing {} in query string", |
| 1325 | + req.uri(), |
| 1326 | + stringify!($ident) |
| 1327 | + ))) |
| 1328 | + .unwrap()); |
| 1329 | + } |
| 1330 | + }; |
| 1331 | + } |
| 1332 | + let data: Arc<InputData> = ctx.data.read().as_ref().unwrap().clone(); |
| 1333 | + return Ok(handle_self_profile_raw_download( |
| 1334 | + self_profile_raw::Request { |
| 1335 | + commit: key_or_error!(commit), |
| 1336 | + benchmark: key_or_error!(benchmark), |
| 1337 | + run_name: key_or_error!(run_name), |
| 1338 | + cid: None, |
| 1339 | + }, |
| 1340 | + &data, |
| 1341 | + ) |
| 1342 | + .await); |
| 1343 | + } |
1198 | 1344 |
|
1199 | 1345 | let (req, mut body_stream) = req.into_parts();
|
1200 | 1346 | let p = req.uri.path();
|
@@ -1277,7 +1423,7 @@ async fn serve_req(ctx: Arc<Server>, req: Request) -> Result<Response, ServerErr
|
1277 | 1423 | ))
|
1278 | 1424 | } else if p == "/perf/self-profile-raw" {
|
1279 | 1425 | Ok(to_response(
|
1280 |
| - handle_self_profile_raw(body!(parse_body(&body)), &data).await, |
| 1426 | + handle_self_profile_raw(body!(parse_body(&body)), &data, true).await, |
1281 | 1427 | ))
|
1282 | 1428 | } else {
|
1283 | 1429 | return Ok(http::Response::builder()
|
|
0 commit comments