Skip to content

Commit e08c14f

Browse files
authored
bench(bin/client): don't allocate upload payload upfront (#2200)
* bench(bin/client): don't allocate upload payload upfront When POSTing a large request to a server, don't allocate the entire request upfront, but instead, as is done in `neqo-bin/src/server/mod.rs`, iterate over a static buffer. Reuses the same logic from `neqo-bin/src/server/mod.rs`, i.e. `SendData`. See previous similar change on server side #2008. * Inline done()
1 parent 86d5796 commit e08c14f

File tree

6 files changed

+107
-130
lines changed

6 files changed

+107
-130
lines changed

neqo-bin/src/client/http3.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use neqo_transport::{
2828
use url::Url;
2929

3030
use super::{get_output_file, qlog_new, Args, CloseState, Res};
31-
use crate::STREAM_IO_BUFFER_SIZE;
31+
use crate::{send_data::SendData, STREAM_IO_BUFFER_SIZE};
3232

3333
pub struct Handler<'a> {
3434
#[allow(clippy::struct_field_names)]
@@ -312,9 +312,7 @@ impl StreamHandler for DownloadStreamHandler {
312312
}
313313

314314
struct UploadStreamHandler {
315-
data: Vec<u8>,
316-
offset: usize,
317-
chunk_size: usize,
315+
data: SendData,
318316
start: Instant,
319317
}
320318

@@ -344,21 +342,11 @@ impl StreamHandler for UploadStreamHandler {
344342
}
345343

346344
fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId) {
347-
while self.offset < self.data.len() {
348-
let end = self.offset + self.chunk_size.min(self.data.len() - self.offset);
349-
let chunk = &self.data[self.offset..end];
350-
match client.send_data(stream_id, chunk) {
351-
Ok(amount) => {
352-
if amount == 0 {
353-
break;
354-
}
355-
self.offset += amount;
356-
if self.offset == self.data.len() {
357-
client.stream_close_send(stream_id).unwrap();
358-
}
359-
}
360-
Err(_) => break,
361-
};
345+
let done = self
346+
.data
347+
.send(|chunk| client.send_data(stream_id, chunk).unwrap());
348+
if done {
349+
client.stream_close_send(stream_id).unwrap();
362350
}
363351
}
364352
}
@@ -416,9 +404,7 @@ impl UrlHandler<'_> {
416404
Box::new(DownloadStreamHandler { out_file })
417405
}
418406
"POST" => Box::new(UploadStreamHandler {
419-
data: vec![42; self.args.upload_size],
420-
offset: 0,
421-
chunk_size: STREAM_IO_BUFFER_SIZE,
407+
data: SendData::zeroes(self.args.upload_size),
422408
start: Instant::now(),
423409
}),
424410
_ => unimplemented!(),

neqo-bin/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use neqo_transport::{
2121
};
2222

2323
pub mod client;
24+
mod send_data;
2425
pub mod server;
2526
pub mod udp;
2627

neqo-bin/src/send_data.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4+
// option. This file may not be copied, modified, or distributed
5+
// except according to those terms.
6+
7+
use std::{borrow::Cow, cmp::min};
8+
9+
use crate::STREAM_IO_BUFFER_SIZE;
10+
11+
#[derive(Debug)]
12+
pub struct SendData {
13+
data: Cow<'static, [u8]>,
14+
offset: usize,
15+
remaining: usize,
16+
total: usize,
17+
}
18+
19+
impl From<&[u8]> for SendData {
20+
fn from(data: &[u8]) -> Self {
21+
Self::from(data.to_vec())
22+
}
23+
}
24+
25+
impl From<Vec<u8>> for SendData {
26+
fn from(data: Vec<u8>) -> Self {
27+
let remaining = data.len();
28+
Self {
29+
total: data.len(),
30+
data: Cow::Owned(data),
31+
offset: 0,
32+
remaining,
33+
}
34+
}
35+
}
36+
37+
impl From<&str> for SendData {
38+
fn from(data: &str) -> Self {
39+
Self::from(data.as_bytes())
40+
}
41+
}
42+
43+
impl SendData {
44+
pub const fn zeroes(total: usize) -> Self {
45+
const MESSAGE: &[u8] = &[0; STREAM_IO_BUFFER_SIZE];
46+
Self {
47+
data: Cow::Borrowed(MESSAGE),
48+
offset: 0,
49+
remaining: total,
50+
total,
51+
}
52+
}
53+
54+
fn slice(&self) -> &[u8] {
55+
let end = min(self.data.len(), self.offset + self.remaining);
56+
&self.data[self.offset..end]
57+
}
58+
59+
pub fn send(&mut self, mut f: impl FnMut(&[u8]) -> usize) -> bool {
60+
while self.remaining > 0 {
61+
match f(self.slice()) {
62+
0 => {
63+
return false;
64+
}
65+
sent => {
66+
self.remaining -= sent;
67+
self.offset = (self.offset + sent) % self.data.len();
68+
}
69+
}
70+
}
71+
72+
self.remaining == 0
73+
}
74+
75+
pub const fn len(&self) -> usize {
76+
self.total
77+
}
78+
}

neqo-bin/src/server/http09.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ use neqo_transport::{
1515
};
1616
use regex::Regex;
1717

18-
use super::{qns_read_response, Args, ResponseData};
19-
use crate::STREAM_IO_BUFFER_SIZE;
18+
use super::{qns_read_response, Args};
19+
use crate::{send_data::SendData, STREAM_IO_BUFFER_SIZE};
2020

2121
#[derive(Default)]
2222
struct HttpStreamState {
2323
writable: bool,
24-
data_to_send: Option<ResponseData>,
24+
data_to_send: Option<SendData>,
2525
}
2626

2727
pub struct HttpServer {
@@ -127,7 +127,7 @@ impl HttpServer {
127127
return;
128128
};
129129

130-
let resp: ResponseData = {
130+
let resp: SendData = {
131131
let path = path.as_str();
132132
qdebug!("Path = '{path}'");
133133
if self.is_qns_test {
@@ -140,7 +140,7 @@ impl HttpServer {
140140
}
141141
} else {
142142
let count = path.parse().unwrap();
143-
ResponseData::zeroes(count)
143+
SendData::zeroes(count)
144144
}
145145
};
146146

@@ -173,8 +173,8 @@ impl HttpServer {
173173

174174
stream_state.writable = true;
175175
if let Some(resp) = &mut stream_state.data_to_send {
176-
resp.send_h09(stream_id, conn);
177-
if resp.done() {
176+
let done = resp.send(|chunk| conn.borrow_mut().stream_send(stream_id, chunk).unwrap());
177+
if done {
178178
conn.borrow_mut().stream_close_send(stream_id).unwrap();
179179
self.write_state.remove(&stream_id);
180180
} else {

neqo-bin/src/server/http3.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ use neqo_http3::{
1919
};
2020
use neqo_transport::{server::ValidateAddress, ConnectionIdGenerator};
2121

22-
use super::{qns_read_response, Args, ResponseData};
22+
use super::{qns_read_response, Args};
23+
use crate::send_data::SendData;
2324

2425
pub struct HttpServer {
2526
server: Http3Server,
2627
/// Progress writing to each stream.
27-
remaining_data: HashMap<StreamId, ResponseData>,
28+
remaining_data: HashMap<StreamId, SendData>,
2829
posts: HashMap<Http3OrWebTransportStream, usize>,
2930
is_qns_test: bool,
3031
}
@@ -110,7 +111,7 @@ impl super::HttpServer for HttpServer {
110111

111112
let mut response = if self.is_qns_test {
112113
match qns_read_response(path.value()) {
113-
Ok(data) => ResponseData::from(data),
114+
Ok(data) => SendData::from(data),
114115
Err(e) => {
115116
qerror!("Failed to read {}: {e}", path.value());
116117
stream
@@ -123,19 +124,19 @@ impl super::HttpServer for HttpServer {
123124
} else if let Ok(count) =
124125
path.value().trim_matches(|p| p == '/').parse::<usize>()
125126
{
126-
ResponseData::zeroes(count)
127+
SendData::zeroes(count)
127128
} else {
128-
ResponseData::from(path.value())
129+
SendData::from(path.value())
129130
};
130131

131132
stream
132133
.send_headers(&[
133134
Header::new(":status", "200"),
134-
Header::new("content-length", response.remaining.to_string()),
135+
Header::new("content-length", response.len().to_string()),
135136
])
136137
.unwrap();
137-
response.send_h3(&stream);
138-
if response.done() {
138+
let done = response.send(|chunk| stream.send_data(chunk).unwrap());
139+
if done {
139140
stream.stream_close_send().unwrap();
140141
} else {
141142
self.remaining_data.insert(stream.stream_id(), response);
@@ -144,8 +145,8 @@ impl super::HttpServer for HttpServer {
144145
Http3ServerEvent::DataWritable { stream } => {
145146
if self.posts.get_mut(&stream).is_none() {
146147
if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) {
147-
remaining.send_h3(&stream);
148-
if remaining.done() {
148+
let done = remaining.send(|chunk| stream.send_data(chunk).unwrap());
149+
if done {
149150
self.remaining_data.remove(&stream.stream_id());
150151
stream.stream_close_send().unwrap();
151152
}

neqo-bin/src/server/mod.rs

Lines changed: 2 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
#![allow(clippy::future_not_send)]
88

99
use std::{
10-
borrow::Cow,
1110
cell::RefCell,
12-
cmp::min,
1311
fmt::{self, Display},
1412
fs, io,
1513
net::{SocketAddr, ToSocketAddrs},
@@ -30,11 +28,10 @@ use neqo_crypto::{
3028
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
3129
init_db, AntiReplay, Cipher,
3230
};
33-
use neqo_http3::{Http3OrWebTransportStream, StreamId};
34-
use neqo_transport::{server::ConnectionRef, Output, RandomConnectionIdGenerator, Version};
31+
use neqo_transport::{Output, RandomConnectionIdGenerator, Version};
3532
use tokio::time::Sleep;
3633

37-
use crate::{SharedArgs, STREAM_IO_BUFFER_SIZE};
34+
use crate::SharedArgs;
3835

3936
const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10);
4037

@@ -409,89 +406,3 @@ pub async fn server(mut args: Args) -> Res<()> {
409406
.run()
410407
.await
411408
}
412-
413-
#[derive(Debug)]
414-
struct ResponseData {
415-
data: Cow<'static, [u8]>,
416-
offset: usize,
417-
remaining: usize,
418-
}
419-
420-
impl From<&[u8]> for ResponseData {
421-
fn from(data: &[u8]) -> Self {
422-
Self::from(data.to_vec())
423-
}
424-
}
425-
426-
impl From<Vec<u8>> for ResponseData {
427-
fn from(data: Vec<u8>) -> Self {
428-
let remaining = data.len();
429-
Self {
430-
data: Cow::Owned(data),
431-
offset: 0,
432-
remaining,
433-
}
434-
}
435-
}
436-
437-
impl From<&str> for ResponseData {
438-
fn from(data: &str) -> Self {
439-
Self::from(data.as_bytes())
440-
}
441-
}
442-
443-
impl ResponseData {
444-
const fn zeroes(total: usize) -> Self {
445-
const MESSAGE: &[u8] = &[0; STREAM_IO_BUFFER_SIZE];
446-
Self {
447-
data: Cow::Borrowed(MESSAGE),
448-
offset: 0,
449-
remaining: total,
450-
}
451-
}
452-
453-
fn slice(&self) -> &[u8] {
454-
let end = min(self.data.len(), self.offset + self.remaining);
455-
&self.data[self.offset..end]
456-
}
457-
458-
fn send_h3(&mut self, stream: &Http3OrWebTransportStream) {
459-
while self.remaining > 0 {
460-
match stream.send_data(self.slice()) {
461-
Ok(0) => {
462-
return;
463-
}
464-
Ok(sent) => {
465-
self.remaining -= sent;
466-
self.offset = (self.offset + sent) % self.data.len();
467-
}
468-
Err(e) => {
469-
qwarn!("Error writing to stream {}: {:?}", stream, e);
470-
return;
471-
}
472-
}
473-
}
474-
}
475-
476-
fn send_h09(&mut self, stream_id: StreamId, conn: &ConnectionRef) {
477-
while self.remaining > 0 {
478-
match conn
479-
.borrow_mut()
480-
.stream_send(stream_id, self.slice())
481-
.unwrap()
482-
{
483-
0 => {
484-
return;
485-
}
486-
sent => {
487-
self.remaining -= sent;
488-
self.offset = (self.offset + sent) % self.data.len();
489-
}
490-
}
491-
}
492-
}
493-
494-
const fn done(&self) -> bool {
495-
self.remaining == 0
496-
}
497-
}

0 commit comments

Comments
 (0)