-
Notifications
You must be signed in to change notification settings - Fork 199
[http server] Batch requests #292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f047879
ead0c4e
dd70b49
3b6e47d
3e7d3c9
d45be62
2db1028
a229deb
361e6cf
baf2d08
c106710
ee40258
8b6e213
ec84408
f5212a6
1fea05e
8debdfc
af873d0
78f3b25
e2cd294
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,10 +39,15 @@ use hyper::{ | |
use jsonrpsee_types::error::{Error, GenericTransportError, RpcError}; | ||
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; | ||
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; | ||
use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error}; | ||
use jsonrpsee_utils::{ | ||
hyper_helpers::read_response_to_body, | ||
server::{send_error, RpcSender}, | ||
}; | ||
use serde::Serialize; | ||
use serde_json::value::RawValue; | ||
use socket2::{Domain, Socket, Type}; | ||
use std::{ | ||
cmp, | ||
net::{SocketAddr, TcpListener}, | ||
sync::Arc, | ||
}; | ||
|
@@ -153,6 +158,30 @@ impl Server { | |
Ok::<_, HyperError>(service_fn(move |request| { | ||
let methods = methods.clone(); | ||
let access_control = access_control.clone(); | ||
|
||
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in | ||
// the params from the request. The result of the computation is sent back over the `tx` channel and | ||
// the result(s) are collected into a `String` and sent back over the wire. | ||
let execute = | ||
move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| { | ||
if let Some(method) = methods.get(method_name) { | ||
let params = RpcParams::new(params.map(|params| params.get())); | ||
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. | ||
if let Err(err) = (method)(id, params, &tx, 0) { | ||
log::error!( | ||
"execution of method call '{}' failed: {:?}, request id={:?}", | ||
method_name, | ||
err, | ||
id | ||
); | ||
} | ||
} else { | ||
send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into()); | ||
} | ||
}; | ||
|
||
// Run some validation on the http request, then read the body and try to deserialize it into one of | ||
// two cases: a single RPC request or a batch of RPC requests. | ||
async move { | ||
if let Err(e) = access_control_is_valid(&access_control, &request) { | ||
return Ok::<_, HyperError>(e); | ||
|
@@ -175,31 +204,48 @@ impl Server { | |
|
||
// NOTE(niklasad1): it's a channel because it's needed for batch requests. | ||
let (tx, mut rx) = mpsc::unbounded(); | ||
// Is this a single request or a batch (or error)? | ||
let mut single = true; | ||
|
||
match serde_json::from_slice::<JsonRpcRequest>(&body) { | ||
Ok(req) => { | ||
log::debug!("recv: {:?}", req); | ||
let params = RpcParams::new(req.params.map(|params| params.get())); | ||
if let Some(method) = methods.get(&*req.method) { | ||
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. | ||
if let Err(err) = (method)(req.id, params, &tx, 0) { | ||
log::error!("method_call: {} failed: {:?}", req.method, err); | ||
} | ||
} else { | ||
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into()); | ||
// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be | ||
dvdplm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged | ||
// enum here and have to try each case individually: first the single request case, then the | ||
// batch case and lastly the error. For the worst case – unparseable input – we make three calls | ||
// to [`serde_json::from_slice`] which is pretty annoying. | ||
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296). | ||
if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = | ||
serde_json::from_slice::<JsonRpcRequest>(&body) | ||
{ | ||
execute(id, &tx, &method_name, params); | ||
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) { | ||
if !batch.is_empty() { | ||
single = false; | ||
for JsonRpcRequest { id, method: method_name, params, .. } in batch { | ||
execute(id, &tx, &method_name, params); | ||
} | ||
} else { | ||
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into()); | ||
} | ||
Err(_e) => { | ||
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) { | ||
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), | ||
Err(_) => (None, JsonRpcErrorCode::ParseError), | ||
}; | ||
send_error(id, &tx, code.into()); | ||
} | ||
} else { | ||
log::error!( | ||
"[service_fn], Cannot parse request body={:?}", | ||
String::from_utf8_lossy(&body[..cmp::min(body.len(), 1024)]) | ||
); | ||
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) { | ||
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), | ||
Err(_) => (None, JsonRpcErrorCode::ParseError), | ||
}; | ||
send_error(id, &tx, code.into()); | ||
} | ||
// Closes the receiving half of a channel without dropping it. This prevents any further | ||
// messages from being sent on the channel. | ||
rx.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose this is performed to mark to that we are done sending any further messages?! I think it deserves a comment because it was not straightforward to me at least There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes exactly, I read this in the docs:
I don't know if it's necessary to close the channel in this case but thought maybe it's a "best practice" to do so and if we ever decide to execute batch requests on separate tasks (and threads) with deadlines it's good to make sure the channel can't be written to. I'll add a comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, I think so because otherwise you have to drop the sender to close the channel. Because we have control of over both the sender and receiver (the sender is just borrowed by the call closure) |
||
let response = if single { | ||
rx.next().await.expect("Sender is still alive managed by us above; qed") | ||
} else { | ||
collect_batch_responses(rx).await | ||
}; | ||
|
||
let response = rx.next().await.expect("Sender is still alive managed by us above; qed"); | ||
log::debug!("send: {:?}", response); | ||
log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]); | ||
Ok::<_, HyperError>(response::ok_response(response)) | ||
} | ||
})) | ||
|
@@ -211,6 +257,24 @@ impl Server { | |
} | ||
} | ||
|
||
// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in | ||
// `[`/`]`. | ||
async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String { | ||
let mut buf = String::with_capacity(2048); | ||
buf.push('['); | ||
let mut buf = rx | ||
.fold(buf, |mut acc, response| async { | ||
acc = [acc, response].concat(); | ||
acc.push(','); | ||
acc | ||
}) | ||
.await; | ||
// Remove trailing comma | ||
buf.pop(); | ||
buf.push(']'); | ||
buf | ||
} | ||
|
||
// Checks to that access control of the received request is the same as configured. | ||
fn access_control_is_valid( | ||
access_control: &AccessControl, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,18 @@ async fn single_method_call_works() { | |
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn invalid_single_method_call() { | ||
let _ = env_logger::try_init(); | ||
let addr = server().await; | ||
let uri = to_http_uri(addr); | ||
|
||
let req = r#"{"jsonrpc":"2.0","method":1, "params": "bar"}"#; | ||
let response = http_request(req.into(), uri.clone()).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
assert_eq!(response.body, invalid_request(Id::Null)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn single_method_call_with_params() { | ||
let addr = server().await; | ||
|
@@ -50,6 +62,81 @@ async fn single_method_call_with_params() { | |
assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1))); | ||
} | ||
|
||
#[tokio::test] | ||
async fn valid_batched_method_calls() { | ||
let _ = env_logger::try_init(); | ||
|
||
let addr = server().await; | ||
let uri = to_http_uri(addr); | ||
|
||
let req = r#"[ | ||
{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}, | ||
{"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2}, | ||
{"jsonrpc":"2.0","method":"say_hello","id":3}, | ||
{"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4} | ||
]"#; | ||
let response = http_request(req.into(), uri).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
assert_eq!( | ||
response.body, | ||
r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"# | ||
); | ||
} | ||
|
||
#[tokio::test] | ||
async fn batched_notifications() { | ||
let _ = env_logger::try_init(); | ||
|
||
let addr = server().await; | ||
let uri = to_http_uri(addr); | ||
|
||
let req = r#"[ | ||
{"jsonrpc": "2.0", "method": "notif", "params": [1,2,4]}, | ||
{"jsonrpc": "2.0", "method": "notif", "params": [7]} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this probably because |
||
]"#; | ||
let response = http_request(req.into(), uri).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
// Note: this is *not* according to spec. Response should be the empty string, `""`. | ||
assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"#); | ||
} | ||
|
||
#[tokio::test] | ||
async fn invalid_batched_method_calls() { | ||
let _ = env_logger::try_init(); | ||
|
||
let addr = server().await; | ||
let uri = to_http_uri(addr); | ||
|
||
// batch with no requests | ||
let req = r#"[]"#; | ||
let response = http_request(req.into(), uri.clone()).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
assert_eq!(response.body, invalid_request(Id::Null)); | ||
|
||
// batch with invalid request | ||
let req = r#"[123]"#; | ||
let response = http_request(req.into(), uri.clone()).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
// Note: according to the spec the `id` should be `null` here, not 123. | ||
assert_eq!(response.body, invalid_request(Id::Num(123))); | ||
|
||
// batch with invalid request | ||
let req = r#"[1, 2, 3]"#; | ||
let response = http_request(req.into(), uri.clone()).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
// Note: according to the spec this should return an array of three `Invalid Request`s | ||
assert_eq!(response.body, parse_error(Id::Null)); | ||
|
||
// invalid JSON in batch | ||
let req = r#"[ | ||
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, | ||
{"jsonrpc": "2.0", "method" | ||
]"#; | ||
let response = http_request(req.into(), uri.clone()).await.unwrap(); | ||
assert_eq!(response.status, StatusCode::OK); | ||
assert_eq!(response.body, parse_error(Id::Null)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn should_return_method_not_found() { | ||
let addr = server().await; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to make this a standalone function? the
start
fn is already fairly long, i think it would be good to move some code outThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know, and I agree, and I tried but couldn't make it work; I'd have to look up the
method
from themethods
hash map instart()
and pass it along so it didn't really shorten up the code much.Lately I've started to shift my views on when it's right to refactor for briefness. The rule of thumb is still "terse is good", but for some cases I've started to think it's alright to keep the code long when it's "the main loop", whatever that means for each specific case, i.e. the most important thing that a given program does. Splitting things up too much can force the reader to jump around more than is ideal and even if it's long it is sometimes more readable to keep it all in one place. I know this is hand wavey (and for this case here I wanted to do exactly what you suggest), but yeah, just wanted to share those thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's mainly because of the closure that is passed to hyper but yeah this changes made the code really quite hard to read, it wasn't great before either.
Maybe we split it the response handling to helper functions, basically you have to read the bytes here to take ownership over it which used to later to build a message to send to the background task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I found this more readable but it's still quite long ^^
(maybe because I wrote it lol)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
K, I tried another approach, moving
execute
to be a method onServer
so I can accessself.root
(i.e. the function pointers we call to run the request), but it doesn't work becauseself
is moved into the closure when callingmake_service_fn()
.On the client side it's a bit easier because we don't have to call anything. Or I'm too limited to see how to do it! Pointers welcome!
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could just add another parameter,
&Methods
(we'd have to import methods from jsonrpsee_util) to the execute function so execute becomes:and then call it like
execute(&methods, id, &tx, &method_name, params);
I guess the tradeoff is having a long function signature for
execute
, though.I think I agree with your analysis of
main_loops
though. Often it can be hard to make them shorter and would result in more confusing code than just keeping it all together. In this case I thinkexecute
fn is self-explanatory enough in that it just executes the right function for the rpc call.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@insipx I could have sworn I tried that; your version works too. I can go either way here, @niklasad1 thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW the performance of @insipx's version is identical.