Skip to content

Commit 527e25a

Browse files
seunlanlegeTyera Eulberg
authored andcommitted
Augmented tokio::net::Incoming with Suspendable stream (paritytech#321)
* Augmented tokio::net::Incoming with Suspendable stream * exponential delay increase, removed duplicate polling logic * exempt connection errors from timeout * reset timer * performance improvements, added concise documentation * reset initial_delay to 10ms
1 parent 2dc39d6 commit 527e25a

File tree

5 files changed

+98
-4
lines changed

5 files changed

+98
-4
lines changed

http/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use server_utils::reactor::{Executor, UninitializedExecutor};
5353

5454
pub use server_utils::hosts::{Host, DomainsValidation};
5555
pub use server_utils::cors::{self, AccessControlAllowOrigin, Origin, AllowCors};
56-
pub use server_utils::tokio;
56+
pub use server_utils::{tokio, SuspendableStream};
5757
pub use handler::ServerHandler;
5858
pub use utils::{is_host_allowed, cors_allow_origin, cors_allow_headers};
5959
pub use response::Response;
@@ -503,8 +503,9 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
503503

504504
let mut http = server::conn::Http::new();
505505
http.keep_alive(keep_alive);
506+
let tcp_stream = SuspendableStream::new(listener.incoming());
506507

507-
listener.incoming()
508+
tcp_stream
508509
.for_each(move |socket| {
509510
let service = ServerHandler::new(
510511
jsonrpc_handler.clone(),

server-utils/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ pub mod session;
2222
pub mod reactor;
2323
mod matcher;
2424
mod stream_codec;
25+
mod suspendable_stream;
2526

27+
pub use suspendable_stream::SuspendableStream;
2628
pub use matcher::Pattern;
2729

2830
/// Codecs utilities
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use std::time::{Duration, Instant};
2+
use tokio::timer::Delay;
3+
use std::io;
4+
use tokio::prelude::*;
5+
6+
/// `Incoming` is a stream of incoming sockets
7+
/// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit)
8+
/// we use for_each combinator which:
9+
/// 1. Runs for every Ok(socket)
10+
/// 2. Stops on the FIRST Err()
11+
/// So any temporary io::Error will cause the entire server to terminate.
12+
/// This wrapper type for tokio::Incoming stops accepting new connections
13+
/// for a specified amount of time once an io::Error is encountered
14+
pub struct SuspendableStream<S> {
15+
stream: S,
16+
next_delay: Duration,
17+
initial_delay: Duration,
18+
max_delay: Duration,
19+
timeout: Option<Delay>,
20+
}
21+
22+
impl<S> SuspendableStream<S> {
23+
/// construct a new Suspendable stream, given tokio::Incoming
24+
/// and the amount of time to pause for.
25+
pub fn new(stream: S) -> Self {
26+
SuspendableStream {
27+
stream,
28+
next_delay: Duration::from_millis(20),
29+
initial_delay: Duration::from_millis(10),
30+
max_delay: Duration::from_secs(5),
31+
timeout: None,
32+
}
33+
}
34+
}
35+
36+
impl<S, I> Stream for SuspendableStream<S>
37+
where S: Stream<Item=I, Error=io::Error>
38+
{
39+
type Item = I;
40+
type Error = ();
41+
42+
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, ()> {
43+
if let Some(mut timeout) = self.timeout.take() {
44+
match timeout.poll() {
45+
Ok(Async::Ready(_)) => {}
46+
Ok(Async::NotReady) => {
47+
self.timeout = Some(timeout);
48+
return Ok(Async::NotReady);
49+
}
50+
Err(_) => unreachable!("Polling a delay shouldn't yield any errors; qed")
51+
}
52+
}
53+
54+
loop {
55+
match self.stream.poll() {
56+
Ok(item) => {
57+
if self.next_delay > self.initial_delay {
58+
self.next_delay = self.initial_delay;
59+
}
60+
return Ok(item)
61+
},
62+
Err(ref e) => if connection_error(e) {
63+
warn!("Connection Error: {:?}", e);
64+
continue
65+
}
66+
Err(err) => {
67+
self.next_delay = if self.next_delay < self.max_delay {
68+
self.next_delay * 2
69+
} else {
70+
self.next_delay
71+
};
72+
warn!("Error accepting connection: {}", err);
73+
warn!("The server will stop accepting connections for {:?}", self.next_delay);
74+
self.timeout = Some(Delay::new(Instant::now() + self.next_delay));
75+
}
76+
}
77+
}
78+
}
79+
}
80+
81+
82+
/// assert that the error was a connection error
83+
fn connection_error(e: &io::Error) -> bool {
84+
e.kind() == io::ErrorKind::ConnectionRefused ||
85+
e.kind() == io::ErrorKind::ConnectionAborted ||
86+
e.kind() == io::ErrorKind::ConnectionReset
87+
}
88+

tcp/examples/tcp.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
extern crate solana_jsonrpc_tcp_server as jsonrpc_tcp_server;
2+
extern crate env_logger;
23

34
use jsonrpc_tcp_server::ServerBuilder;
45
use jsonrpc_tcp_server::jsonrpc_core::*;
56

67
fn main() {
8+
env_logger::init();
79
let mut io = IoHandler::default();
810
io.add_method("say_hello", |_params| {
911
println!("Processing");

tcp/src/server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use jsonrpc::futures::sync::{mpsc, oneshot};
1010
use server_utils::{
1111
tokio_codec::Framed,
1212
tokio, reactor, codecs,
13+
SuspendableStream
1314
};
1415

1516
use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue};
@@ -86,7 +87,7 @@ impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
8687
executor.spawn(future::lazy(move || {
8788
let start = move || {
8889
let listener = tokio::net::TcpListener::bind(&address)?;
89-
let connections = listener.incoming();
90+
let connections = SuspendableStream::new(listener.incoming());
9091

9192
let server = connections.for_each(move |socket| {
9293
let peer_addr = socket.peer_addr().expect("Unable to determine socket peer address");
@@ -152,7 +153,7 @@ impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
152153
Ok(server)
153154
};
154155

155-
let stop = stop_rx.map_err(|_| std::io::ErrorKind::Interrupted.into());
156+
let stop = stop_rx.map_err(|_| ());
156157
match start() {
157158
Ok(server) => {
158159
tx.send(Ok(())).expect("Rx is blocking parent thread.");

0 commit comments

Comments
 (0)