Skip to content

Commit 2cae10b

Browse files
dvdplminsipx
andauthored
[http server] Batch requests (#292)
* WIP * Implement draft of batch requests * fmt * cleanup * Explain why we don't use an untagged enum * Avoid allocating a Vec for single requets * Add comment * Add a benchmark for batch requests * Add more tests, noting where we diverge from the spec Fix empty batch case, i.e. `[]` * Obey the fmt * Update benches/bench.rs Co-authored-by: Andrew Plaza <aplaza@liquidthink.net> * Update http-server/src/server.rs Co-authored-by: Andrew Plaza <aplaza@liquidthink.net> * Add link to issue * Explain why we're closing the receiving end of the channel. * Limit logging of requests and response to 1kb Add more comments Factor out batch response collection * Wrap comment * tweak log line * Benchmark batch request over different batch sizes * fmt Co-authored-by: Andrew Plaza <aplaza@liquidthink.net>
1 parent 11ac030 commit 2cae10b

File tree

4 files changed

+195
-24
lines changed

4 files changed

+195
-24
lines changed

benches/bench.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio::runtime::Runtime as TokioRuntime;
1313

1414
mod helpers;
1515

16-
criterion_group!(benches, http_requests, websocket_requests, jsonrpsee_types_v2);
16+
criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2);
1717
criterion_main!(benches);
1818

1919
fn v2_serialize<'a>(req: JsonRpcCallSer<'a>) -> String {
@@ -47,6 +47,13 @@ pub fn http_requests(crit: &mut Criterion) {
4747
run_concurrent_round_trip(&rt, crit, client.clone(), "http_concurrent_round_trip");
4848
}
4949

50+
pub fn batched_http_requests(crit: &mut Criterion) {
51+
let rt = TokioRuntime::new().unwrap();
52+
let url = rt.block_on(helpers::http_server());
53+
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
54+
run_round_trip_with_batch(&rt, crit, client.clone(), "http batch requests");
55+
}
56+
5057
pub fn websocket_requests(crit: &mut Criterion) {
5158
let rt = TokioRuntime::new().unwrap();
5259
let url = rt.block_on(helpers::ws_server());
@@ -66,6 +73,19 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Clie
6673
});
6774
}
6875

76+
/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
77+
fn run_round_trip_with_batch(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
78+
let mut group = crit.benchmark_group(name);
79+
for batch_size in [2, 5, 10, 50, 100usize].iter() {
80+
let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size];
81+
group.throughput(Throughput::Elements(*batch_size as u64));
82+
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
83+
b.iter(|| rt.block_on(async { client.batch_request::<String>(batch.clone()).await.unwrap() }))
84+
});
85+
}
86+
group.finish();
87+
}
88+
6989
fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
7090
rt: &TokioRuntime,
7191
crit: &mut Criterion,

http-server/src/server.rs

Lines changed: 86 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,15 @@ use hyper::{
3939
use jsonrpsee_types::error::{Error, GenericTransportError};
4040
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
4141
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
42-
use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error};
42+
use jsonrpsee_utils::{
43+
hyper_helpers::read_response_to_body,
44+
server::{send_error, RpcSender},
45+
};
4346
use serde::Serialize;
47+
use serde_json::value::RawValue;
4448
use socket2::{Domain, Socket, Type};
4549
use std::{
50+
cmp,
4651
net::{SocketAddr, TcpListener},
4752
sync::Arc,
4853
};
@@ -153,6 +158,30 @@ impl Server {
153158
Ok::<_, HyperError>(service_fn(move |request| {
154159
let methods = methods.clone();
155160
let access_control = access_control.clone();
161+
162+
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
163+
// the params from the request. The result of the computation is sent back over the `tx` channel and
164+
// the result(s) are collected into a `String` and sent back over the wire.
165+
let execute =
166+
move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| {
167+
if let Some(method) = methods.get(method_name) {
168+
let params = RpcParams::new(params.map(|params| params.get()));
169+
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
170+
if let Err(err) = (method)(id, params, &tx, 0) {
171+
log::error!(
172+
"execution of method call '{}' failed: {:?}, request id={:?}",
173+
method_name,
174+
err,
175+
id
176+
);
177+
}
178+
} else {
179+
send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into());
180+
}
181+
};
182+
183+
// Run some validation on the http request, then read the body and try to deserialize it into one of
184+
// two cases: a single RPC request or a batch of RPC requests.
156185
async move {
157186
if let Err(e) = access_control_is_valid(&access_control, &request) {
158187
return Ok::<_, HyperError>(e);
@@ -175,31 +204,48 @@ impl Server {
175204

176205
// NOTE(niklasad1): it's a channel because it's needed for batch requests.
177206
let (tx, mut rx) = mpsc::unbounded();
207+
// Is this a single request or a batch (or error)?
208+
let mut single = true;
178209

179-
match serde_json::from_slice::<JsonRpcRequest>(&body) {
180-
Ok(req) => {
181-
log::debug!("recv: {:?}", req);
182-
let params = RpcParams::new(req.params.map(|params| params.get()));
183-
if let Some(method) = methods.get(&*req.method) {
184-
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
185-
if let Err(err) = (method)(req.id, params, &tx, 0) {
186-
log::error!("method_call: {} failed: {:?}", req.method, err);
187-
}
188-
} else {
189-
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
210+
// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be
211+
// used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged
212+
// enum here and have to try each case individually: first the single request case, then the
213+
// batch case and lastly the error. For the worst case – unparseable input – we make three calls
214+
// to [`serde_json::from_slice`] which is pretty annoying.
215+
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
216+
if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) =
217+
serde_json::from_slice::<JsonRpcRequest>(&body)
218+
{
219+
execute(id, &tx, &method_name, params);
220+
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
221+
if !batch.is_empty() {
222+
single = false;
223+
for JsonRpcRequest { id, method: method_name, params, .. } in batch {
224+
execute(id, &tx, &method_name, params);
190225
}
226+
} else {
227+
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
191228
}
192-
Err(_e) => {
193-
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) {
194-
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
195-
Err(_) => (None, JsonRpcErrorCode::ParseError),
196-
};
197-
send_error(id, &tx, code.into());
198-
}
229+
} else {
230+
log::error!(
231+
"[service_fn], Cannot parse request body={:?}",
232+
String::from_utf8_lossy(&body[..cmp::min(body.len(), 1024)])
233+
);
234+
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) {
235+
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
236+
Err(_) => (None, JsonRpcErrorCode::ParseError),
237+
};
238+
send_error(id, &tx, code.into());
239+
}
240+
// Closes the receiving half of a channel without dropping it. This prevents any further
241+
// messages from being sent on the channel.
242+
rx.close();
243+
let response = if single {
244+
rx.next().await.expect("Sender is still alive managed by us above; qed")
245+
} else {
246+
collect_batch_responses(rx).await
199247
};
200-
201-
let response = rx.next().await.expect("Sender is still alive managed by us above; qed");
202-
log::debug!("send: {:?}", response);
248+
log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
203249
Ok::<_, HyperError>(response::ok_response(response))
204250
}
205251
}))
@@ -211,6 +257,24 @@ impl Server {
211257
}
212258
}
213259

260+
// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in
261+
// `[`/`]`.
262+
async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String {
263+
let mut buf = String::with_capacity(2048);
264+
buf.push('[');
265+
let mut buf = rx
266+
.fold(buf, |mut acc, response| async {
267+
acc = [acc, response].concat();
268+
acc.push(',');
269+
acc
270+
})
271+
.await;
272+
// Remove trailing comma
273+
buf.pop();
274+
buf.push(']');
275+
buf
276+
}
277+
214278
// Checks to that access control of the received request is the same as configured.
215279
fn access_control_is_valid(
216280
access_control: &AccessControl,

http-server/src/tests.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ async fn single_method_call_works() {
3737
}
3838
}
3939

40+
#[tokio::test]
41+
async fn invalid_single_method_call() {
42+
let _ = env_logger::try_init();
43+
let addr = server().await;
44+
let uri = to_http_uri(addr);
45+
46+
let req = r#"{"jsonrpc":"2.0","method":1, "params": "bar"}"#;
47+
let response = http_request(req.into(), uri.clone()).await.unwrap();
48+
assert_eq!(response.status, StatusCode::OK);
49+
assert_eq!(response.body, invalid_request(Id::Null));
50+
}
51+
4052
#[tokio::test]
4153
async fn single_method_call_with_params() {
4254
let addr = server().await;
@@ -50,6 +62,81 @@ async fn single_method_call_with_params() {
5062
assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1)));
5163
}
5264

65+
#[tokio::test]
66+
async fn valid_batched_method_calls() {
67+
let _ = env_logger::try_init();
68+
69+
let addr = server().await;
70+
let uri = to_http_uri(addr);
71+
72+
let req = r#"[
73+
{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1},
74+
{"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2},
75+
{"jsonrpc":"2.0","method":"say_hello","id":3},
76+
{"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4}
77+
]"#;
78+
let response = http_request(req.into(), uri).await.unwrap();
79+
assert_eq!(response.status, StatusCode::OK);
80+
assert_eq!(
81+
response.body,
82+
r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"#
83+
);
84+
}
85+
86+
#[tokio::test]
87+
async fn batched_notifications() {
88+
let _ = env_logger::try_init();
89+
90+
let addr = server().await;
91+
let uri = to_http_uri(addr);
92+
93+
let req = r#"[
94+
{"jsonrpc": "2.0", "method": "notif", "params": [1,2,4]},
95+
{"jsonrpc": "2.0", "method": "notif", "params": [7]}
96+
]"#;
97+
let response = http_request(req.into(), uri).await.unwrap();
98+
assert_eq!(response.status, StatusCode::OK);
99+
// Note: this is *not* according to spec. Response should be the empty string, `""`.
100+
assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"#);
101+
}
102+
103+
#[tokio::test]
104+
async fn invalid_batched_method_calls() {
105+
let _ = env_logger::try_init();
106+
107+
let addr = server().await;
108+
let uri = to_http_uri(addr);
109+
110+
// batch with no requests
111+
let req = r#"[]"#;
112+
let response = http_request(req.into(), uri.clone()).await.unwrap();
113+
assert_eq!(response.status, StatusCode::OK);
114+
assert_eq!(response.body, invalid_request(Id::Null));
115+
116+
// batch with invalid request
117+
let req = r#"[123]"#;
118+
let response = http_request(req.into(), uri.clone()).await.unwrap();
119+
assert_eq!(response.status, StatusCode::OK);
120+
// Note: according to the spec the `id` should be `null` here, not 123.
121+
assert_eq!(response.body, invalid_request(Id::Num(123)));
122+
123+
// batch with invalid request
124+
let req = r#"[1, 2, 3]"#;
125+
let response = http_request(req.into(), uri.clone()).await.unwrap();
126+
assert_eq!(response.status, StatusCode::OK);
127+
// Note: according to the spec this should return an array of three `Invalid Request`s
128+
assert_eq!(response.body, parse_error(Id::Null));
129+
130+
// invalid JSON in batch
131+
let req = r#"[
132+
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
133+
{"jsonrpc": "2.0", "method"
134+
]"#;
135+
let response = http_request(req.into(), uri.clone()).await.unwrap();
136+
assert_eq!(response.status, StatusCode::OK);
137+
assert_eq!(response.body, parse_error(Id::Null));
138+
}
139+
53140
#[tokio::test]
54141
async fn should_return_method_not_found() {
55142
let addr = server().await;

types/src/v2/params.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl<'a> RpcParams<'a> {
102102
/// If your type implement `Into<JsonValue>` call that favor of `serde_json::to:value` to
103103
/// construct the parameters. Because `serde_json::to_value` serializes the type which
104104
/// allocates whereas `Into<JsonValue>` doesn't in most cases.
105-
#[derive(Serialize, Debug)]
105+
#[derive(Serialize, Debug, Clone)]
106106
#[serde(untagged)]
107107
pub enum JsonRpcParams<'a> {
108108
/// No params.

0 commit comments

Comments
 (0)