Skip to content

Commit 825cd03

Browse files
committed
Merge branch 'improve-error-handling'
2 parents 717229d + 8c8b03f commit 825cd03

File tree

2 files changed

+59
-16
lines changed

2 files changed

+59
-16
lines changed

http/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,12 @@ impl HttpTransport {
229229
Ok((mut core, request_tx, future)) => {
230230
tx.send(Ok(HttpTransport::new_internal(request_tx)))
231231
.unwrap();
232-
let _ = core.run(future);
232+
if let Err(_) = core.run(future) {
233+
error!("JSON-RPC processing thread had an error");
234+
}
235+
debug!("Standalone HttpTransport thread exiting");
233236
}
234237
}
235-
debug!("Standalone HttpTransport thread exiting");
236238
});
237239

238240
rx.recv().unwrap()
@@ -291,6 +293,7 @@ fn create_request_processing_future<CC: hyper::client::Connect>(
291293
client: Client<CC, hyper::Body>,
292294
) -> Box<Future<Item = (), Error = ()>> {
293295
let f = request_rx.for_each(move |(request, response_tx)| {
296+
trace!("Sending request to {}", request.uri());
294297
client
295298
.request(request)
296299
.from_err()
@@ -306,10 +309,10 @@ fn create_request_processing_future<CC: hyper::client::Connect>(
306309
})
307310
.map(|response_chunk| response_chunk.to_vec())
308311
.then(move |response_result| {
309-
response_tx.send(response_result).map_err(|_| {
312+
if let Err(_) = response_tx.send(response_result) {
310313
warn!("Unable to send response back to caller");
311-
()
312-
})
314+
}
315+
Ok(())
313316
})
314317
});
315318
Box::new(f) as Box<Future<Item = (), Error = ()>>

http/tests/localhost.rs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,45 +20,49 @@ extern crate jsonrpc_http_server;
2020
extern crate jsonrpc_macros;
2121

2222
use futures::Future;
23-
23+
use futures::future::Either;
2424
use jsonrpc_client_http::HttpTransport;
2525
use jsonrpc_core::{Error, IoHandler};
2626
use jsonrpc_http_server::ServerBuilder;
27+
use std::time::Duration;
28+
use tokio_core::reactor::{Core, Timeout};
2729

2830

2931
// Generate server API trait. Actual implementation at bottom of file.
3032
build_rpc_trait! {
3133
pub trait ServerApi {
3234
#[rpc(name = "to_upper")]
3335
fn to_upper(&self, String) -> Result<String, Error>;
36+
37+
#[rpc(name = "sleep")]
38+
fn sleep(&self, u64) -> Result<(), Error>;
3439
}
3540
}
3641

3742
// Generate client struct with same API as server.
38-
jsonrpc_client!(pub struct ToUpperClient {
43+
jsonrpc_client!(pub struct TestClient {
3944
pub fn to_upper(&mut self, string: &str) -> RpcRequest<String>;
45+
pub fn sleep(&mut self, time: u64) -> RpcRequest<()>;
4046
});
4147

4248

4349
#[test]
4450
fn localhost_ping_pong() {
45-
env_logger::init().unwrap();
51+
let _ = env_logger::init();
4652

4753
// Spawn a server hosting the `ServerApi` API.
48-
let server = spawn_server();
49-
50-
let uri = format!("http://{}", server.address());
54+
let (_server, uri) = spawn_server();
5155
println!("Testing towards server at {}", uri);
5256

5357
// Create the Tokio Core event loop that will drive the RPC client and the async requests.
54-
let mut core = tokio_core::reactor::Core::new().unwrap();
58+
let mut core = Core::new().unwrap();
5559

5660
// Create the HTTP transport handle and create a RPC client with that handle.
5761
let transport = HttpTransport::shared(&core.handle())
5862
.unwrap()
5963
.handle(&uri)
6064
.unwrap();
61-
let mut client = ToUpperClient::new(transport);
65+
let mut client = TestClient::new(transport);
6266

6367
// Just calling the method gives back a `RpcRequest`, which is a future
6468
// that can be used to execute the actual RPC call.
@@ -73,6 +77,34 @@ fn localhost_ping_pong() {
7377
assert_eq!("FOOBAR", result2);
7478
}
7579

80+
#[test]
81+
fn dropped_rpc_request_should_not_crash_transport() {
82+
let _ = env_logger::init();
83+
84+
let (_server, uri) = spawn_server();
85+
86+
let mut core = Core::new().unwrap();
87+
let transport = HttpTransport::shared(&core.handle())
88+
.unwrap()
89+
.handle(&uri)
90+
.unwrap();
91+
let mut client = TestClient::new(transport);
92+
93+
let rpc = client.sleep(5).map_err(|e| e.to_string());
94+
let timeout = Timeout::new(Duration::from_millis(100), &core.handle()).unwrap();
95+
match core.run(rpc.select2(timeout.map_err(|e| e.to_string()))) {
96+
Ok(Either::B(((), _rpc))) => (),
97+
_ => panic!("The timeout did not finish first"),
98+
}
99+
100+
// Now, sending a second request should still work. This is a regression test catching a
101+
// previous error where a dropped `RpcRequest` would crash the future running on the event loop.
102+
match core.run(client.sleep(0)) {
103+
Ok(()) => (),
104+
_ => panic!("Sleep did not return as it should"),
105+
}
106+
}
107+
76108

77109
/// Simple struct that will implement the RPC API defined at the top of this file.
78110
struct Server;
@@ -81,14 +113,22 @@ impl ServerApi for Server {
81113
fn to_upper(&self, s: String) -> Result<String, Error> {
82114
Ok(s.to_uppercase())
83115
}
116+
117+
fn sleep(&self, time: u64) -> Result<(), Error> {
118+
println!("Sleeping on server");
119+
::std::thread::sleep(Duration::from_secs(time));
120+
Ok(())
121+
}
84122
}
85123

86-
fn spawn_server() -> jsonrpc_http_server::Server {
124+
fn spawn_server() -> (jsonrpc_http_server::Server, String) {
87125
let server = Server;
88126
let mut io = IoHandler::new();
89127
io.extend_with(server.to_delegate());
90128

91-
ServerBuilder::new(io)
129+
let server = ServerBuilder::new(io)
92130
.start_http(&"127.0.0.1:0".parse().unwrap())
93-
.unwrap()
131+
.unwrap();
132+
let uri = format!("http://{}", server.address());
133+
(server, uri)
94134
}

0 commit comments

Comments
 (0)