Skip to content

Commit 0bf5835

Browse files
tomusdrwTyera Eulberg
authored andcommitted
Call middleware (paritytech#318)
* Avoid Boxing in middleware. * Add on_call to the middleware. * Properly workaround FnOnce in the on_drop interface. * Re-export separator from tcp server. * Fix windows build again.
1 parent bbd67ff commit 0bf5835

File tree

15 files changed

+159
-89
lines changed

15 files changed

+159
-89
lines changed

core/examples/middlewares.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ impl Metadata for Meta {}
1414
struct MyMiddleware(AtomicUsize);
1515
impl Middleware<Meta> for MyMiddleware {
1616
type Future = FutureResponse;
17+
type CallFuture = middleware::NoopCallFuture;
1718

1819
fn on_request<F, X>(&self, request: Request, meta: Meta, next: F) -> Either<Self::Future, X> where
1920
F: FnOnce(Request, Meta) -> X + Send,

core/src/io.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,34 @@ use types::{Request, Response, Call, Output};
1313
/// A type representing middleware or RPC response before serialization.
1414
pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
1515

16+
/// A type representing middleware or RPC call output.
17+
pub type FutureOutput = Box<Future<Item=Option<Output>, Error=()> + Send>;
18+
1619
/// A type representing future string response.
17-
pub type FutureResult<F> = future::Map<
18-
future::Either<future::FutureResult<Option<Response>, ()>, FutureRpcResult<F>>,
20+
pub type FutureResult<F, G> = future::Map<
21+
future::Either<future::FutureResult<Option<Response>, ()>, FutureRpcResult<F, G>>,
1922
fn(Option<Response>) -> Option<String>,
2023
>;
2124

2225
/// A type representing a result of a single method call.
23-
pub type FutureOutput = future::Either<
24-
Box<Future<Item=Option<Output>, Error=()> + Send>,
25-
future::FutureResult<Option<Output>, ()>,
26+
pub type FutureRpcOutput<F> = future::Either<
27+
F,
28+
future::Either<
29+
FutureOutput,
30+
future::FutureResult<Option<Output>, ()>,
31+
>,
2632
>;
2733

2834
/// A type representing an optional `Response` for RPC `Request`.
29-
pub type FutureRpcResult<F> = future::Either<
35+
pub type FutureRpcResult<F, G> = future::Either<
3036
F,
3137
future::Either<
3238
future::Map<
33-
FutureOutput,
39+
FutureRpcOutput<G>,
3440
fn(Option<Output>) -> Option<Response>,
3541
>,
3642
future::Map<
37-
future::JoinAll<Vec<FutureOutput>>,
43+
future::JoinAll<Vec<FutureRpcOutput<G>>>,
3844
fn(Vec<Option<Output>>) -> Option<Response>,
3945
>,
4046
>,
@@ -181,7 +187,7 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
181187
}
182188

183189
/// Handle given request asynchronously.
184-
pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future> {
190+
pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future, S::CallFuture> {
185191
use self::future::Either::{A, B};
186192
fn as_string(response: Option<Response>) -> Option<String> {
187193
let res = response.map(write_response);
@@ -203,7 +209,7 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
203209
}
204210

205211
/// Handle deserialized RPC request.
206-
pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future> {
212+
pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future, S::CallFuture> {
207213
use self::future::Either::{A, B};
208214

209215
fn output_as_response(output: Option<Output>) -> Option<Response> {
@@ -233,10 +239,10 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
233239
}
234240

235241
/// Handle single call asynchronously.
236-
pub fn handle_call(&self, call: Call, meta: T) -> FutureOutput {
242+
pub fn handle_call(&self, call: Call, meta: T) -> FutureRpcOutput<S::CallFuture> {
237243
use self::future::Either::{A, B};
238244

239-
match call {
245+
self.middleware.on_call(call, meta, |call, meta| match call {
240246
Call::MethodCall(method) => {
241247
let params = method.params;
242248
let id = method.id;
@@ -261,7 +267,7 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
261267
match result {
262268
Ok(result) => A(Box::new(
263269
result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
264-
)),
270+
) as _),
265271
Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
266272
}
267273
},
@@ -289,7 +295,7 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
289295
Call::Invalid { id } => {
290296
B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
291297
},
292-
}
298+
})
293299
}
294300
}
295301

@@ -312,17 +318,17 @@ impl IoHandler {
312318

313319
impl<M: Metadata + Default> IoHandler<M> {
314320
/// Handle given string request asynchronously.
315-
pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse> {
321+
pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse, FutureOutput> {
316322
self.0.handle_request(request, M::default())
317323
}
318324

319325
/// Handle deserialized RPC request asynchronously.
320-
pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse> {
326+
pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse, FutureOutput> {
321327
self.0.handle_rpc_request(request, M::default())
322328
}
323329

324330
/// Handle single Call asynchronously.
325-
pub fn handle_call(&self, call: Call) -> FutureOutput {
331+
pub fn handle_call(&self, call: Call) -> FutureRpcOutput<FutureOutput> {
326332
self.0.handle_call(call, M::default())
327333
}
328334

core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub extern crate serde_json;
3535
mod calls;
3636
mod io;
3737

38-
mod middleware;
38+
pub mod middleware;
3939
pub mod types;
4040

4141
/// A `Future` trait object.

core/src/middleware.rs

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,56 @@
11
//! `IoHandler` middlewares
22
33
use calls::Metadata;
4-
use types::{Request, Response};
4+
use types::{Request, Response, Call, Output};
55
use futures::{future::Either, Future};
66

77
/// RPC middleware
88
pub trait Middleware<M: Metadata>: Send + Sync + 'static {
9-
/// A returned future.
9+
/// A returned request future.
1010
type Future: Future<Item=Option<Response>, Error=()> + Send + 'static;
1111

12+
/// A returned call future.
13+
type CallFuture: Future<Item=Option<Output>, Error=()> + Send + 'static;
14+
1215
/// Method invoked on each request.
1316
/// Allows you to either respond directly (without executing RPC call)
1417
/// or do any additional work before and/or after processing the request.
1518
fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<Self::Future, X> where
1619
F: FnOnce(Request, M) -> X + Send,
17-
X: Future<Item=Option<Response>, Error=()> + Send + 'static;
20+
X: Future<Item=Option<Response>, Error=()> + Send + 'static,
21+
{
22+
Either::B(next(request, meta))
23+
}
24+
25+
/// Method invoked on each call inside a request.
26+
///
27+
/// Allows you to either handle the call directly (without executing RPC call).
28+
fn on_call<F, X>(&self, call: Call, meta: M, next: F) -> Either<Self::CallFuture, X> where
29+
F: FnOnce(Call, M) -> X + Send,
30+
X: Future<Item=Option<Output>, Error=()> + Send + 'static,
31+
{
32+
Either::B(next(call, meta))
33+
}
1834
}
1935

36+
/// Dummy future used as a noop result of middleware.
37+
pub type NoopFuture = Box<Future<Item=Option<Response>, Error=()> + Send>;
38+
/// Dummy future used as a noop call result of middleware.
39+
pub type NoopCallFuture = Box<Future<Item=Option<Output>, Error=()> + Send>;
40+
2041
/// No-op middleware implementation
2142
#[derive(Debug, Default)]
2243
pub struct Noop;
2344
impl<M: Metadata> Middleware<M> for Noop {
24-
type Future = Box<Future<Item=Option<Response>, Error=()> + Send>;
25-
26-
fn on_request<F, X>(&self, request: Request, meta: M, process: F) -> Either<Self::Future, X> where
27-
F: FnOnce(Request, M) -> X + Send,
28-
X: Future<Item=Option<Response>, Error=()> + Send + 'static,
29-
{
30-
Either::B(process(request, meta))
31-
}
45+
type Future = NoopFuture;
46+
type CallFuture = NoopCallFuture;
3247
}
3348

3449
impl<M: Metadata, A: Middleware<M>, B: Middleware<M>>
3550
Middleware<M> for (A, B)
3651
{
3752
type Future = Either<A::Future, B::Future>;
53+
type CallFuture = Either<A::CallFuture, B::CallFuture>;
3854

3955
fn on_request<F, X>(&self, request: Request, meta: M, process: F) -> Either<Self::Future, X> where
4056
F: FnOnce(Request, M) -> X + Send,
@@ -44,12 +60,22 @@ impl<M: Metadata, A: Middleware<M>, B: Middleware<M>>
4460
self.1.on_request(request, meta, process)
4561
}))
4662
}
63+
64+
fn on_call<F, X>(&self, call: Call, meta: M, process: F) -> Either<Self::CallFuture, X> where
65+
F: FnOnce(Call, M) -> X + Send,
66+
X: Future<Item=Option<Output>, Error=()> + Send + 'static,
67+
{
68+
repack(self.0.on_call(call, meta, move |call, meta| {
69+
self.1.on_call(call, meta, process)
70+
}))
71+
}
4772
}
4873

4974
impl<M: Metadata, A: Middleware<M>, B: Middleware<M>, C: Middleware<M>>
5075
Middleware<M> for (A, B, C)
5176
{
5277
type Future = Either<A::Future, Either<B::Future, C::Future>>;
78+
type CallFuture = Either<A::CallFuture, Either<B::CallFuture, C::CallFuture>>;
5379

5480
fn on_request<F, X>(&self, request: Request, meta: M, process: F) -> Either<Self::Future, X> where
5581
F: FnOnce(Request, M) -> X + Send,
@@ -61,12 +87,24 @@ impl<M: Metadata, A: Middleware<M>, B: Middleware<M>, C: Middleware<M>>
6187
}))
6288
}))
6389
}
90+
91+
fn on_call<F, X>(&self, call: Call, meta: M, process: F) -> Either<Self::CallFuture, X> where
92+
F: FnOnce(Call, M) -> X + Send,
93+
X: Future<Item=Option<Output>, Error=()> + Send + 'static,
94+
{
95+
repack(self.0.on_call(call, meta, move |call, meta| {
96+
repack(self.1.on_call(call, meta, move |call, meta| {
97+
self.2.on_call(call, meta, process)
98+
}))
99+
}))
100+
}
64101
}
65102

66103
impl<M: Metadata, A: Middleware<M>, B: Middleware<M>, C: Middleware<M>, D: Middleware<M>>
67104
Middleware<M> for (A, B, C, D)
68105
{
69106
type Future = Either<A::Future, Either<B::Future, Either<C::Future, D::Future>>>;
107+
type CallFuture = Either<A::CallFuture, Either<B::CallFuture, Either<C::CallFuture, D::CallFuture>>>;
70108

71109
fn on_request<F, X>(&self, request: Request, meta: M, process: F) -> Either<Self::Future, X> where
72110
F: FnOnce(Request, M) -> X + Send,
@@ -80,6 +118,19 @@ impl<M: Metadata, A: Middleware<M>, B: Middleware<M>, C: Middleware<M>, D: Middl
80118
}))
81119
}))
82120
}
121+
122+
fn on_call<F, X>(&self, call: Call, meta: M, process: F) -> Either<Self::CallFuture, X> where
123+
F: FnOnce(Call, M) -> X + Send,
124+
X: Future<Item=Option<Output>, Error=()> + Send + 'static,
125+
{
126+
repack(self.0.on_call(call, meta, move |call, meta| {
127+
repack(self.1.on_call(call, meta, move |call, meta| {
128+
repack(self.2.on_call(call, meta, move |call, meta| {
129+
self.3.on_call(call, meta, process)
130+
}))
131+
}))
132+
}))
133+
}
83134
}
84135

85136
#[inline(always)]

http/src/handler.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::Arc;
66
use hyper::{self, service::Service, Body, Method};
77
use hyper::header::{self, HeaderMap, HeaderValue};
88

9-
use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware, FutureRpcResult};
9+
use jsonrpc::{self as core, middleware, FutureResult, Metadata, Middleware};
1010
use jsonrpc::futures::{Future, Poll, Async, Stream, future};
1111
use jsonrpc::serde_json;
1212
use response::Response;
@@ -15,7 +15,7 @@ use server_utils::cors;
1515
use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts, RestApi};
1616

1717
/// jsonrpc http request handler.
18-
pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
18+
pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
1919
jsonrpc_handler: Rpc<M, S>,
2020
allowed_hosts: AllowedHosts,
2121
cors_domains: CorsDomains,
@@ -124,17 +124,19 @@ impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
124124
}
125125
}
126126

127-
enum RpcPollState<M, F> where
127+
enum RpcPollState<M, F, G> where
128128
F: Future<Item = Option<core::Response>, Error = ()>,
129+
G: Future<Item = Option<core::Output>, Error = ()>,
129130
{
130-
Ready(RpcHandlerState<M, F>),
131-
NotReady(RpcHandlerState<M, F>),
131+
Ready(RpcHandlerState<M, F, G>),
132+
NotReady(RpcHandlerState<M, F, G>),
132133
}
133134

134-
impl<M, F> RpcPollState<M, F> where
135+
impl<M, F, G> RpcPollState<M, F, G> where
135136
F: Future<Item = Option<core::Response>, Error = ()>,
137+
G: Future<Item = Option<core::Output>, Error = ()>,
136138
{
137-
fn decompose(self) -> (RpcHandlerState<M, F>, bool) {
139+
fn decompose(self) -> (RpcHandlerState<M, F, G>, bool) {
138140
use self::RpcPollState::*;
139141
match self {
140142
Ready(handler) => (handler, true),
@@ -143,14 +145,14 @@ impl<M, F> RpcPollState<M, F> where
143145
}
144146
}
145147

146-
type FutureResponse<F> = future::Map<
147-
future::Either<future::FutureResult<Option<core::Response>, ()>, FutureRpcResult<F>>,
148+
type FutureResponse<F, G> = future::Map<
149+
future::Either<future::FutureResult<Option<core::Response>, ()>, core::FutureRpcResult<F, G>>,
148150
fn(Option<core::Response>) -> Response,
149151
>;
150152

151-
152-
enum RpcHandlerState<M, F> where
153+
enum RpcHandlerState<M, F, G> where
153154
F: Future<Item = Option<core::Response>, Error = ()>,
155+
G: Future<Item = Option<core::Output>, Error = ()>,
154156
{
155157
ReadingHeaders {
156158
request: hyper::Request<Body>,
@@ -173,13 +175,14 @@ enum RpcHandlerState<M, F> where
173175
metadata: M,
174176
},
175177
Writing(Response),
176-
WaitingForResponse(FutureResponse<F>),
177-
Waiting(FutureResult<F>),
178+
Waiting(FutureResult<F, G>),
179+
WaitingForResponse(FutureResponse<F, G>),
178180
Done,
179181
}
180182

181-
impl<M, F> fmt::Debug for RpcHandlerState<M, F> where
183+
impl<M, F, G> fmt::Debug for RpcHandlerState<M, F, G> where
182184
F: Future<Item = Option<core::Response>, Error = ()>,
185+
G: Future<Item = Option<core::Output>, Error = ()>,
183186
{
184187
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
185188
use self::RpcHandlerState::*;
@@ -199,7 +202,7 @@ impl<M, F> fmt::Debug for RpcHandlerState<M, F> where
199202

200203
pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
201204
jsonrpc_handler: Rpc<M, S>,
202-
state: RpcHandlerState<M, S::Future>,
205+
state: RpcHandlerState<M, S::Future, S::CallFuture>,
203206
is_options: bool,
204207
cors_allow_origin: cors::AllowCors<header::HeaderValue>,
205208
cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
@@ -319,10 +322,11 @@ impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
319322
&self,
320323
request: hyper::Request<Body>,
321324
continue_on_invalid_cors: bool,
322-
) -> RpcHandlerState<M, S::Future> {
325+
) -> RpcHandlerState<M, S::Future, S::CallFuture> {
323326
if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
324327
return RpcHandlerState::Writing(Response::invalid_allow_origin());
325328
}
329+
326330
if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
327331
return RpcHandlerState::Writing(Response::invalid_allow_headers());
328332
}
@@ -377,7 +381,7 @@ impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
377381
&self,
378382
method: String,
379383
metadata: M,
380-
) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
384+
) -> Result<RpcPollState<M, S::Future, S::CallFuture>, hyper::Error> {
381385
use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Output, Success, Failure};
382386

383387
// Create a request
@@ -412,7 +416,7 @@ impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
412416
&self,
413417
uri: hyper::Uri,
414418
metadata: M,
415-
) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
419+
) -> Result<RpcPollState<M, S::Future, S::CallFuture>, hyper::Error> {
416420
use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Value};
417421

418422
// skip the initial /
@@ -450,7 +454,7 @@ impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
450454
mut request: Vec<u8>,
451455
uri: Option<hyper::Uri>,
452456
metadata: M,
453-
) -> Result<RpcPollState<M, S::Future>, BodyError> {
457+
) -> Result<RpcPollState<M, S::Future, S::CallFuture>, BodyError> {
454458
loop {
455459
match body.poll()? {
456460
Async::Ready(Some(chunk)) => {

0 commit comments

Comments
 (0)