Skip to content

Commit c92a5d4

Browse files
pinkisemilsTyera Eulberg
authored andcommitted
Ensure IPC server can handle requests concurrently.
1 parent 29e8776 commit c92a5d4

File tree

1 file changed

+35
-24
lines changed

1 file changed

+35
-24
lines changed

ipc/src/server.rs

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::Arc;
66
use tokio_service::{self, Service as TokioService};
77
use jsonrpc::futures::{future, Future, Stream, Sink};
88
use jsonrpc::futures::sync::{mpsc, oneshot};
9-
use jsonrpc::{FutureResult, Metadata, MetaIoHandler, Middleware, NoopMiddleware};
9+
use jsonrpc::{Metadata, MetaIoHandler, Middleware, NoopMiddleware};
1010
use server_utils::{
1111
tokio_codec::Framed,
1212
tokio::{self, runtime::TaskExecutor, reactor::Handle},
@@ -36,13 +36,25 @@ impl<M: Metadata, S: Middleware<M>> tokio_service::Service for Service<M, S> {
3636
type Request = String;
3737
type Response = Option<String>;
3838

39-
type Error = ();
39+
type Error = std::io::Error;
4040

41-
type Future = FutureResult<S::Future>;
41+
type Future = Box<Future<Item=Self::Response,Error=Self::Error> + Send>;
4242

4343
fn call(&self, req: Self::Request) -> Self::Future {
4444
trace!(target: "ipc", "Received request: {}", req);
45-
self.handler.handle_request(&req, self.meta.clone())
45+
let fut = self.handler
46+
.handle_request(&req, self.meta.clone())
47+
.then(|result| {
48+
match result {
49+
Err(_) => {
50+
future::ok(None)
51+
}
52+
Ok(some_result) => future::ok(some_result),
53+
}
54+
})
55+
.map_err(|_:()| std::io::ErrorKind::Other.into());
56+
57+
Box::new(fut)
4658
}
4759
}
4860

@@ -55,6 +67,7 @@ pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
5567
incoming_separator: codecs::Separator,
5668
outgoing_separator: codecs::Separator,
5769
security_attributes: SecurityAttributes,
70+
client_buffer_size: usize,
5871
}
5972

6073
impl<M: Metadata + Default, S: Middleware<M>> ServerBuilder<M, S> {
@@ -80,6 +93,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
8093
incoming_separator: codecs::Separator::Empty,
8194
outgoing_separator: codecs::Separator::default(),
8295
security_attributes: SecurityAttributes::empty(),
96+
client_buffer_size: 5,
8397
}
8498
}
8599

@@ -116,6 +130,12 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
116130
self
117131
}
118132

133+
/// Sets how many concurrent requests per client can be processed at any one time. Set to 5 by default.
134+
pub fn set_client_buffer_size(mut self, buffer_size: usize) -> Self {
135+
self.client_buffer_size = buffer_size;
136+
self
137+
}
138+
119139
/// Creates a new server from the given endpoint.
120140
pub fn start(self, path: &str) -> std::io::Result<Server> {
121141
let executor = self.executor.initialize()?;
@@ -129,6 +149,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
129149
let (start_signal, start_receiver) = oneshot::channel();
130150
let (wait_signal, wait_receiver) = oneshot::channel();
131151
let security_attributes = self.security_attributes;
152+
let client_buffer_size = self.client_buffer_size;
132153

133154
executor.spawn(future::lazy(move || {
134155
let mut endpoint = Endpoint::new(endpoint_addr);
@@ -173,27 +194,17 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
173194
outgoing_separator.clone(),
174195
),
175196
).split();
176-
let responses = reader.and_then(move |req| {
177-
service.call(req).then(move |response| match response {
178-
Err(e) => {
179-
warn!(target: "ipc", "Error while processing request: {:?}", e);
180-
future::ok(None)
181-
},
182-
Ok(None) => {
183-
future::ok(None)
184-
},
185-
Ok(Some(response_data)) => {
186-
trace!(target: "ipc", "Sent response: {}", &response_data);
187-
future::ok(Some(response_data))
188-
}
197+
let responses = reader
198+
.map(move |req| {
199+
service.call(req)
189200
})
190-
})
191-
.filter_map(|x| x)
192-
// we use `select_with_weak` here, instead of `select`, to close the stream
193-
// as soon as the ipc pipe is closed
194-
.select_with_weak(receiver.map_err(|e| {
195-
warn!(target: "ipc", "Notification error: {:?}", e);
196-
std::io::ErrorKind::Other.into()
201+
.buffer_unordered(client_buffer_size)
202+
.filter_map(|x| x)
203+
// we use `select_with_weak` here, instead of `select`, to close the stream
204+
// as soon as the ipc pipe is closed
205+
.select_with_weak(receiver.map_err(|e| {
206+
warn!(target: "ipc", "Notification error: {:?}", e);
207+
std::io::ErrorKind::Other.into()
197208
}));
198209

199210
let writer = writer.send_all(responses).then(move |_| {

0 commit comments

Comments
 (0)