Skip to content

feat: client rpc middleware #1521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 57 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
33ad778
feat: client rpc middleware
niklasad1 Jan 15, 2025
401dfc1
PoC works
niklasad1 Jan 29, 2025
11d7b4d
cargo fmt
niklasad1 Jan 29, 2025
622bffd
more refactoring
niklasad1 Feb 25, 2025
56be708
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Feb 26, 2025
93c9b6f
use Cow in Notification to avoid alloc
niklasad1 Feb 26, 2025
5929436
cleanup some todos
niklasad1 Feb 26, 2025
946afee
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Mar 10, 2025
ff64b91
rpc trait: return Result<MethodResponse, Err>
niklasad1 Mar 11, 2025
9515e40
remove infallible err
niklasad1 Mar 12, 2025
062ead3
make it compile
niklasad1 Mar 12, 2025
d52740b
fix tests
niklasad1 Mar 12, 2025
b508470
introduce client method response type
niklasad1 Mar 19, 2025
dec873f
fix faulty imports
niklasad1 Mar 20, 2025
3aa828d
minor cleanup
niklasad1 Mar 20, 2025
b1e3b41
introduce Batch/BatchEntry for middleware
niklasad1 Mar 21, 2025
46ef085
remove ignore for batch test
niklasad1 Mar 21, 2025
5759407
fix rustdocs
niklasad1 Mar 22, 2025
ee75d58
add rpc middleware for the async client
niklasad1 Mar 25, 2025
79216b5
remove serialize specific types
niklasad1 Mar 25, 2025
8849af5
commit missing file
niklasad1 Mar 25, 2025
6a6e3aa
no serde_json::Value
niklasad1 Mar 26, 2025
02fb917
more nit fixing
niklasad1 Mar 27, 2025
09b4e0b
more cleanup
niklasad1 Mar 27, 2025
763cad6
refactor method response client
niklasad1 Mar 28, 2025
0e2adb0
fix some nits
niklasad1 Mar 28, 2025
6b80964
add client middleware rpc
niklasad1 Mar 31, 2025
631d7c0
fix wasm build
niklasad1 Apr 1, 2025
4ddee29
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Apr 1, 2025
a7324d3
add client middleware example
niklasad1 Apr 1, 2025
38b5261
Update examples/examples/rpc_middleware_client.rs
niklasad1 Apr 1, 2025
9640517
ToJson -> RawValue
niklasad1 Apr 2, 2025
100bda0
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Apr 2, 2025
b051bd6
replace Future type with impl Trait
niklasad1 Apr 2, 2025
f03371a
revert changelog
niklasad1 Apr 2, 2025
cc08ebf
remove logger response future
niklasad1 Apr 2, 2025
9936607
some cleanup
niklasad1 Apr 3, 2025
4149181
move request timeout from transport to client
niklasad1 Apr 3, 2025
8161e01
more nit fixing
niklasad1 Apr 3, 2025
1f73b97
have pass over examples
niklasad1 Apr 4, 2025
de1461d
show proper batch middleware example
niklasad1 Apr 4, 2025
a148135
middleware: clean up batch type
niklasad1 Apr 4, 2025
5285fc4
fix wasm build
niklasad1 Apr 4, 2025
54c7fe3
Update Cargo.toml
niklasad1 Apr 5, 2025
cc7807e
doc: fix typo
niklasad1 Apr 5, 2025
f722acd
core: remove tracing mod
niklasad1 Apr 5, 2025
ed4fb5b
fix more clippy
niklasad1 Apr 5, 2025
52588f3
remove middleware error
niklasad1 Apr 9, 2025
39c1d19
address review grumbles
niklasad1 Apr 9, 2025
f6dada4
fix tests
niklasad1 Apr 9, 2025
6484bd2
fix tests
niklasad1 Apr 9, 2025
cd34d2c
ErrorResponse -> BatchEntryErr
niklasad1 Apr 9, 2025
ac5f60a
commit missing file
niklasad1 Apr 10, 2025
9feb968
move deserialize_with_ext to server again
niklasad1 Apr 11, 2025
2a67151
Update core/src/client/mod.rs
niklasad1 Apr 11, 2025
c9fb409
Merge branch 'master' into na-client-rpc-middleware
niklasad1 Apr 11, 2025
84b4e1c
client: enable default rpc logger
niklasad1 Apr 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ workspace = true
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
futures-util = { workspace = true }
hyper = { workspace = true, features = ["client", "http1", "http2"] }
hyper-rustls = { workspace = true, features = ["http1", "http2", "tls12", "logging", "ring"], optional = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "tokio", "http1", "http2"] }
Expand Down
154 changes: 76 additions & 78 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@
use std::sync::Arc;
use std::time::Duration;

use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClient, HttpTransportClientBuilder};
use crate::types::{NotificationSer, RequestSer, Response};
use crate::rpc_service::RpcService;
use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClientBuilder};
use crate::types::Response;
use crate::{HttpRequest, HttpResponse};
use async_trait::async_trait;
use hyper::body::Bytes;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, generate_batch_id_range,
};
use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, Notification, Request, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tokio::sync::Semaphore;
use tower::layer::util::Identity;
Expand Down Expand Up @@ -75,7 +77,7 @@
/// }
/// ```
#[derive(Clone, Debug)]
pub struct HttpClientBuilder<L = Identity> {
pub struct HttpClientBuilder<HttpMiddleware = Identity, RpcMiddleware = Identity> {
max_request_size: u32,
max_response_size: u32,
request_timeout: Duration,
Expand All @@ -84,12 +86,13 @@
id_kind: IdKind,
max_log_length: u32,
headers: HeaderMap,
service_builder: tower::ServiceBuilder<L>,
service_builder: tower::ServiceBuilder<HttpMiddleware>,
rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
tcp_no_delay: bool,
max_concurrent_requests: Option<usize>,
}

impl<L> HttpClientBuilder<L> {
impl<HttpMiddleware, RpcMiddleware> HttpClientBuilder<HttpMiddleware, RpcMiddleware> {
/// Set the maximum size of a request body in bytes. Default is 10 MiB.
pub fn max_request_size(mut self, size: u32) -> Self {
self.max_request_size = size;
Expand Down Expand Up @@ -215,8 +218,29 @@
self
}

/// Set the RPC middleware.
pub fn set_rpc_middleware<T>(self, rpc_builder: RpcServiceBuilder<T>) -> HttpClientBuilder<HttpMiddleware, T> {
HttpClientBuilder {
#[cfg(feature = "tls")]
certificate_store: self.certificate_store,
id_kind: self.id_kind,
headers: self.headers,
max_log_length: self.max_log_length,
max_request_size: self.max_request_size,
max_response_size: self.max_response_size,
service_builder: self.service_builder,
rpc_middleware: rpc_builder,
request_timeout: self.request_timeout,
tcp_no_delay: self.tcp_no_delay,
max_concurrent_requests: self.max_concurrent_requests,
}
}

/// Set custom tower middleware.
pub fn set_http_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> HttpClientBuilder<T> {
pub fn set_http_middleware<T>(
self,
service_builder: tower::ServiceBuilder<T>,
) -> HttpClientBuilder<T, RpcMiddleware> {
HttpClientBuilder {
#[cfg(feature = "tls")]
certificate_store: self.certificate_store,
Expand All @@ -226,23 +250,26 @@
max_request_size: self.max_request_size,
max_response_size: self.max_response_size,
service_builder,
rpc_middleware: self.rpc_middleware,
request_timeout: self.request_timeout,
tcp_no_delay: self.tcp_no_delay,
max_concurrent_requests: self.max_concurrent_requests,
}
}
}

impl<B, S, L> HttpClientBuilder<L>
impl<B, S, S2, HttpMiddleware, RpcMiddleware> HttpClientBuilder<HttpMiddleware, RpcMiddleware>
where
L: Layer<transport::HttpBackend, Service = S>,
RpcMiddleware: Layer<RpcService<S>, Service = S2>,
for<'a> <RpcMiddleware as Layer<RpcService<S>>>::Service: RpcServiceT<'a>,
HttpMiddleware: Layer<transport::HttpBackend, Service = S>,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient<S>, Error> {
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient<S2>, Error> {
let Self {
max_request_size,
max_response_size,
Expand All @@ -254,10 +281,11 @@
max_log_length,
service_builder,
tcp_no_delay,
rpc_middleware,
..
} = self;

let transport = HttpTransportClientBuilder {
let http = HttpTransportClientBuilder {
max_request_size,
max_response_size,
headers,
Expand All @@ -266,6 +294,7 @@
service_builder,
#[cfg(feature = "tls")]
certificate_store,
request_timeout,
}
.build(target)
.map_err(|e| Error::Transport(e.into()))?;
Expand All @@ -275,9 +304,8 @@
.map(|max_concurrent_requests| Arc::new(Semaphore::new(max_concurrent_requests)));

Ok(HttpClient {
transport,
transport: rpc_middleware.service(RpcService::new(http, max_response_size)),
id_manager: Arc::new(RequestIdManager::new(id_kind)),
request_timeout,
request_guard,
})
}
Expand All @@ -295,6 +323,7 @@
max_log_length: 4096,
headers: HeaderMap::new(),
service_builder: tower::ServiceBuilder::new(),
rpc_middleware: RpcServiceBuilder::default(),
tcp_no_delay: true,
max_concurrent_requests: None,
}
Expand All @@ -310,11 +339,9 @@

/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
#[derive(Debug, Clone)]
pub struct HttpClient<S = HttpBackend> {
pub struct HttpClient<S> {
/// HTTP transport client.
transport: HttpTransportClient<S>,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
transport: S,
/// Request ID manager.
id_manager: Arc<RequestIdManager>,
/// Concurrent requests limit guard.
Expand All @@ -329,18 +356,14 @@

/// Returns configured request timeout.
pub fn request_timeout(&self) -> Duration {
self.request_timeout
todo!();
}
}

#[async_trait]
impl<B, S> ClientT for HttpClient<S>
impl<S> ClientT for HttpClient<S>
where
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<HttpRequest>>::Future: Send,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Error: Into<BoxError>,
B::Data: Send,
for<'a> S: RpcServiceT<'a, Error = Error> + Send + Sync,
{
#[instrument(name = "notification", skip(self, params), level = "trace")]
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
Expand All @@ -351,17 +374,10 @@
Some(permit) => permit.acquire().await.ok(),
None => None,
};
let params = params.to_rpc_params()?;
let notif =
serde_json::to_string(&NotificationSer::borrowed(&method, params.as_deref())).map_err(Error::ParseError)?;

let fut = self.transport.send(notif);

match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
}
let params = params.to_rpc_params()?.map(StdCow::Owned);
let n = Notification { jsonrpc: TwoPointZero, method: method.into(), params };
self.transport.notification(n).await?;
Ok(())
}

#[instrument(name = "method_call", skip(self, params), level = "trace")]
Expand All @@ -377,23 +393,13 @@
let id = self.id_manager.next_request_id();
let params = params.to_rpc_params()?;

let request = RequestSer::borrowed(&id, &method, params.as_deref());
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;

let fut = self.transport.send_and_read_body(raw);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
let request = Request::new(method.into(), params.as_deref(), id.clone());
let rp = self.transport.call(request).await?;

// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let response = ResponseSuccess::try_from(serde_json::from_slice::<Response<&JsonRawValue>>(&body)?)?;
// TODO: this is inefficient, we should avoid double parsing.
let response = ResponseSuccess::try_from(serde_json::from_str::<Response<&JsonRawValue>>(&rp.as_result())?)?;

Check warning on line 402 in client/http-client/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> client/http-client/src/client.rs:402:92 | 402 | let response = ResponseSuccess::try_from(serde_json::from_str::<Response<&JsonRawValue>>(&rp.as_result())?)?; | ^^^^^^^^^^^^^^^ help: change this to: `rp.as_result()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default

Check warning on line 402 in client/http-client/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> client/http-client/src/client.rs:402:92 | 402 | let response = ResponseSuccess::try_from(serde_json::from_str::<Response<&JsonRawValue>>(&rp.as_result())?)?; | ^^^^^^^^^^^^^^^ help: change this to: `rp.as_result()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default

Check warning on line 402 in client/http-client/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> client/http-client/src/client.rs:402:92 | 402 | let response = ResponseSuccess::try_from(serde_json::from_str::<Response<&JsonRawValue>>(&rp.as_result())?)?; | ^^^^^^^^^^^^^^^ help: change this to: `rp.as_result()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default

let result = serde_json::from_str(response.result.get()).map_err(Error::ParseError)?;

Expand All @@ -420,51 +426,47 @@
let mut batch_request = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
batch_request.push(RequestSer {
batch_request.push(Request {
jsonrpc: TwoPointZero,
id,
method: method.into(),
params: params.map(StdCow::Owned),
id,
extensions: Default::default(),
});
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);

let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
let batch = self.transport.batch(batch_request).await?;
// TODO: this is inefficient, we should avoid double parsing.
let json_responses: Vec<Response<&JsonRawValue>> = serde_json::from_str(&batch.as_result())?;

Check warning on line 440 in client/http-client/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> client/http-client/src/client.rs:440:75 | 440 | let json_responses: Vec<Response<&JsonRawValue>> = serde_json::from_str(&batch.as_result())?; | ^^^^^^^^^^^^^^^^^^ help: change this to: `batch.as_result()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

Check warning on line 440 in client/http-client/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> client/http-client/src/client.rs:440:75 | 440 | let json_responses: Vec<Response<&JsonRawValue>> = serde_json::from_str(&batch.as_result())?; | ^^^^^^^^^^^^^^^^^^ help: change this to: `batch.as_result()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

Check warning on line 440 in client/http-client/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> client/http-client/src/client.rs:440:75 | 440 | let json_responses: Vec<Response<&JsonRawValue>> = serde_json::from_str(&batch.as_result())?; | ^^^^^^^^^^^^^^^^^^ help: change this to: `batch.as_result()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

let json_rps: Vec<Response<&JsonRawValue>> = serde_json::from_slice(&body).map_err(Error::ParseError)?;
let mut batch_response = Vec::new();
let mut success = 0;
let mut failed = 0;

let mut responses = Vec::with_capacity(json_rps.len());
let mut successful_calls = 0;
let mut failed_calls = 0;

for _ in 0..json_rps.len() {
responses.push(Err(ErrorObject::borrowed(0, "", None)));
// Fill the batch response with placeholder values.
for _ in 0..json_responses.len() {
batch_response.push(Err(ErrorObject::borrowed(0, "", None)));
}

for rp in json_rps {
for rp in json_responses.into_iter() {
let id = rp.id.try_parse_inner_as_number()?;

let res = match ResponseSuccess::try_from(rp) {
Ok(r) => {
let result = serde_json::from_str(r.result.get())?;
successful_calls += 1;
Ok(result)
let v = serde_json::from_str(r.result.get()).map_err(Error::ParseError)?;
success += 1;
Ok(v)
}
Err(err) => {
failed_calls += 1;
failed += 1;
Err(err)
}
};

let maybe_elem = id
.checked_sub(id_range.start)
.and_then(|p| p.try_into().ok())
.and_then(|p: usize| responses.get_mut(p));
.and_then(|p: usize| batch_response.get_mut(p));

if let Some(elem) = maybe_elem {
*elem = res;
Expand All @@ -473,18 +475,14 @@
}
}

Ok(BatchResponse::new(successful_calls, responses, failed_calls))
Ok(BatchResponse::new(success, batch_response, failed))
}
}

#[async_trait]
impl<B, S> SubscriptionClientT for HttpClient<S>
impl<S> SubscriptionClientT for HttpClient<S>
where
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<HttpRequest>>::Future: Send,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
for<'a> S: RpcServiceT<'a, Error = Error> + Send + Sync,
{
/// Send a subscription request to the server. Not implemented for HTTP; will always return
/// [`Error::HttpNotImplemented`].
Expand Down
1 change: 1 addition & 0 deletions client/http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

mod client;
mod rpc_service;

/// HTTP transport.
pub mod transport;
Expand Down
Loading
Loading