diff --git a/.editorconfig b/.editorconfig
index 28b112186..5e5b724ee 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -9,8 +9,16 @@ trim_trailing_whitespace=true
max_line_length=120
insert_final_newline=true
-[.travis.yml]
+[{.travis.yml,appveyor.yml}]
indent_style=space
indent_size=2
tab_width=8
end_of_line=lf
+
+[*.stderr]
+indent_style=none
+indent_size=none
+end_of_line=none
+charset=none
+trim_trailing_whitespace=none
+insert_final_newline=none
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
new file mode 100644
index 000000000..7cc7c9725
--- /dev/null
+++ b/.gitlab-ci.yml
@@ -0,0 +1,65 @@
+stages:
+ - checkstyle
+ - test
+variables: &default-vars
+ GIT_STRATEGY: fetch
+ GIT_DEPTH: 100
+ CARGO_INCREMENTAL: 0
+
+.test_and_build: &test_and_build
+ script:
+ - cargo build --all
+ - cargo test --all
+
+.only: &only
+ only:
+ - triggers
+ - tags
+ - master
+ - schedules
+ - web
+ - /^[0-9]+$/
+
+.docker-env: &docker-env
+ image: paritytech/ci-linux:production
+ before_script:
+ - rustup show
+ - cargo --version
+ - sccache -s
+ variables:
+ <<: *default-vars
+ CARGO_TARGET_DIR: "/ci-cache/${CI_PROJECT_NAME}/targets/${CI_COMMIT_REF_NAME}/${CI_JOB_NAME}"
+ retry:
+ max: 2
+ when:
+ - runner_system_failure
+ - unknown_failure
+ - api_failure
+ interruptible: true
+ tags:
+ - linux-docker
+
+# check style
+checkstyle-linux-stable:
+ stage: checkstyle
+ <<: *only
+ <<: *docker-env
+ script:
+ - rustup component add rustfmt clippy
+ - cargo fmt --all -- --check
+ - cargo clippy
+ allow_failure: true
+
+# test rust stable
+test-linux-stable:
+ stage: test
+ <<: *docker-env
+ <<: *only
+ <<: *test_and_build
+
+test-mac-stable:
+ stage: test
+ <<: *test_and_build
+ <<: *only
+ tags:
+ - osx
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 061da8d0e..000000000
--- a/.travis.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-sudo: false
-language: rust
-branches:
- only:
- - master
- - /^parity-.*$/
-
-cache: cargo
-
-matrix:
- fast_finish: false
- include:
- - os: linux
- rust: stable
- - os: linux
- rust: beta
- - os: linux
- rust: nightly
- - os: osx
- rust: stable
- - os: windows
- rust: stable
- allow_failures:
- - rust: nightly
-
-script:
- - cargo build --all
- - cargo test --all
-
-after_success: |
- [ $TRAVIS_OS_NAME == 'linux' ] &&
- [ $TRAVIS_BRANCH = master ] &&
- [ $TRAVIS_PULL_REQUEST = false ] &&
- [ $TRAVIS_RUST_VERSION = stable ] &&
- cargo doc --all --no-deps &&
- echo '' > target/doc/index.html &&
- pip install --user ghp-import &&
- /home/travis/.local/bin/ghp-import -n target/doc &&
- git push -fq https://${GH_TOKEN}@github.com/${TRAVIS_REPO_SLUG}.git gh-pages
-
-env:
- global:
- - secure: "QA4Rw78VSsP1vH2Yve1eAVpjG32HH9DZZ79xrhxZXp34wKoemp+aGqaFN/8uXPfsXshlYxtMCTT6M9OiWTTLvku5tI5kBsDneu8mLut7eBZHVniYSp2SbKpTeqfpGMDHoCR0WD9AlWDn9Elm6txbghXjrxhCMg8gkhhsLGnQt/ARFF1wRHnXT0TjJg8fQtd+/OK0TaRfknx1RptruaznxfUi3DBwzDdzaMMZfd3VjWR1hPFRpDSL0mM+l6OjNrLbCeiR//k3lV4rpIhedsz0ODjfW2Hdk63qCaLJsXCkG1Bcuf/FYbYC+osm5SrHhGA1j2EgazWcLA6Wkzt15KPOR/HirNj+PCiS0YbGKM5Ac5LT6m6q0iYSF/pq1+jDurcSwBwYrTOY6X2FZCZQBfTP/4qnSjWgGPOkzBSMS6BNEBDQZgdc3xCASXadj7waF4Y4UGD0bDPuBtXopI4ppKLqSa7CsvKz6TX2yW0UVgUuQ5/jz/S+fkcz74o016d5x027yjaxAu/Z8fQFLSaBtiFU8sBzA+MDU3apFgjsYXiaGYZ8gDrp7WjbfHNYfBAMEHHKY4toywB5Vi8zJxF+Wn1n4hkvb/kDqSV9giFmWEg321U+pAGNAH4yY25tIJqS8gT89cz4oQJp7aWjA3Ke01e104yqqZU+N+CSyZHEeksdPt8="
diff --git a/Cargo.toml b/Cargo.toml
index 4e73179d7..b509827d5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,10 +1,11 @@
[workspace]
members = [
"core",
+ "core-client",
+ "core-client/transports",
"http",
"ipc",
- "macros",
- "minihttp",
+ "derive",
"pubsub",
"pubsub/more-examples",
"server-utils",
diff --git a/README.md b/README.md
index 72cc82648..45a018116 100644
--- a/README.md
+++ b/README.md
@@ -3,33 +3,38 @@
Rust implementation of JSON-RPC 2.0 Specification.
Transport-agnostic `core` and transport servers for `http`, `ipc`, `websockets` and `tcp`.
-[![Build Status][travis-image]][travis-url]
+**New!** Support for [clients](#Client-support).
-[travis-image]: https://travis-ci.org/paritytech/jsonrpc.svg?branch=master
-[travis-url]: https://travis-ci.org/paritytech/jsonrpc
-
-[Documentation](http://paritytech.github.io/jsonrpc/)
+[Documentation](https://docs.rs/jsonrpc-core/)
## Sub-projects
- [jsonrpc-core](./core) [![crates.io][core-image]][core-url]
+- [jsonrpc-core-client](./core-client) [![crates.io][core-client-image]][core-client-url]
- [jsonrpc-http-server](./http) [![crates.io][http-server-image]][http-server-url]
-- [jsonrpc-minihttp-server](./minihttp)
-- [jsonrpc-ipc-server](./ipc)
+- [jsonrpc-ipc-server](./ipc) [![crates.io][ipc-server-image]][ipc-server-url]
- [jsonrpc-tcp-server](./tcp) [![crates.io][tcp-server-image]][tcp-server-url]
-- [jsonrpc-ws-server](./ws)
-- [jsonrpc-stdio-server](./stdio)
-- [jsonrpc-macros](./macros) [![crates.io][macros-image]][macros-url]
+- [jsonrpc-ws-server](./ws) [![crates.io][ws-server-image]][ws-server-url]
+- [jsonrpc-stdio-server](./stdio) [![crates.io][stdio-server-image]][stdio-server-url]
+- [jsonrpc-derive](./derive) [![crates.io][derive-image]][derive-url]
- [jsonrpc-server-utils](./server-utils) [![crates.io][server-utils-image]][server-utils-url]
- [jsonrpc-pubsub](./pubsub) [![crates.io][pubsub-image]][pubsub-url]
[core-image]: https://img.shields.io/crates/v/jsonrpc-core.svg
[core-url]: https://crates.io/crates/jsonrpc-core
+[core-client-image]: https://img.shields.io/crates/v/jsonrpc-core-client.svg
+[core-client-url]: https://crates.io/crates/jsonrpc-core-client
[http-server-image]: https://img.shields.io/crates/v/jsonrpc-http-server.svg
[http-server-url]: https://crates.io/crates/jsonrpc-http-server
+[ipc-server-image]: https://img.shields.io/crates/v/jsonrpc-ipc-server.svg
+[ipc-server-url]: https://crates.io/crates/jsonrpc-ipc-server
[tcp-server-image]: https://img.shields.io/crates/v/jsonrpc-tcp-server.svg
[tcp-server-url]: https://crates.io/crates/jsonrpc-tcp-server
-[macros-image]: https://img.shields.io/crates/v/jsonrpc-macros.svg
-[macros-url]: https://crates.io/crates/jsonrpc-macros
+[ws-server-image]: https://img.shields.io/crates/v/jsonrpc-ws-server.svg
+[ws-server-url]: https://crates.io/crates/jsonrpc-ws-server
+[stdio-server-image]: https://img.shields.io/crates/v/jsonrpc-stdio-server.svg
+[stdio-server-url]: https://crates.io/crates/jsonrpc-stdio-server
+[derive-image]: https://img.shields.io/crates/v/jsonrpc-derive.svg
+[derive-url]: https://crates.io/crates/jsonrpc-derive
[server-utils-image]: https://img.shields.io/crates/v/jsonrpc-server-utils.svg
[server-utils-url]: https://crates.io/crates/jsonrpc-server-utils
[pubsub-image]: https://img.shields.io/crates/v/jsonrpc-pubsub.svg
@@ -38,22 +43,19 @@ Transport-agnostic `core` and transport servers for `http`, `ipc`, `websockets`
## Examples
- [core](./core/examples)
-- [macros](./macros/examples)
+- [derive](./derive/examples)
- [pubsub](./pubsub/examples)
### Basic Usage (with HTTP transport)
```rust
-extern crate jsonrpc_core;
-extern crate jsonrpc_minihttp_server;
-
-use jsonrpc_core::{IoHandler, Value, Params};
-use jsonrpc_minihttp_server::{ServerBuilder};
+use jsonrpc_http_server::jsonrpc_core::{IoHandler, Value, Params};
+use jsonrpc_http_server::ServerBuilder;
fn main() {
- let mut io = IoHandler::new();
- io.add_method("say_hello", |_params: Params| {
- Ok(Value::String("hello".to_string()))
+ let mut io = IoHandler::default();
+ io.add_method("say_hello", |_params: Params| async {
+ Ok(Value::String("hello".to_owned()))
});
let server = ServerBuilder::new(io)
@@ -61,25 +63,21 @@ fn main() {
.start_http(&"127.0.0.1:3030".parse().unwrap())
.unwrap();
- server.wait().unwrap();
+ server.wait();
}
```
-### Basic usage with macros
+### Basic usage with derive
```rust
-extern crate jsonrpc_core;
-#[macro_use]
-extern crate jsonrpc_macros;
-
use jsonrpc_core::Result;
+use jsonrpc_derive::rpc;
-build_rpc_trait! {
- pub trait Rpc {
- /// Adds two numbers and returns a result
- #[rpc(name = "add")]
- fn add(&self, u64, u64) -> Result;
- }
+#[rpc]
+pub trait Rpc {
+ /// Adds two numbers and returns a result
+ #[rpc(name = "add")]
+ fn add(&self, u64, u64) -> Result;
}
pub struct RpcImpl;
@@ -89,8 +87,59 @@ impl Rpc for RpcImpl {
}
}
-
fn main() {
let mut io = jsonrpc_core::IoHandler::new();
io.extend_with(RpcImpl.to_delegate())
}
+```
+
+### Client support
+
+```rust
+use jsonrpc_core_client::transports::local;
+use jsonrpc_core::{Error, IoHandler, Result};
+use jsonrpc_derive::rpc;
+
+/// Rpc trait
+#[rpc]
+pub trait Rpc {
+ /// Returns a protocol version
+ #[rpc(name = "protocolVersion")]
+ fn protocol_version(&self) -> Result;
+
+ /// Adds two numbers and returns a result
+ #[rpc(name = "add", alias("callAsyncMetaAlias"))]
+ fn add(&self, a: u64, b: u64) -> Result;
+
+ /// Performs asynchronous operation
+ #[rpc(name = "callAsync")]
+ fn call(&self, a: u64) -> FutureResult;
+}
+
+struct RpcImpl;
+
+impl Rpc for RpcImpl {
+ fn protocol_version(&self) -> Result {
+ Ok("version1".into())
+ }
+
+ fn add(&self, a: u64, b: u64) -> Result {
+ Ok(a + b)
+ }
+
+ fn call(&self, _: u64) -> FutureResult {
+ future::ok("OK".to_owned())
+ }
+}
+
+fn main() {
+ let mut io = IoHandler::new();
+ io.extend_with(RpcImpl.to_delegate());
+
+ let fut = {
+ let (client, server) = local::connect::(io);
+ client.add(5, 6).map(|res| println!("5 + 6 = {}", res)).join(server)
+ };
+ fut.wait().unwrap();
+}
+```
diff --git a/_automate/bump_version.sh b/_automate/bump_version.sh
new file mode 100755
index 000000000..a078520fc
--- /dev/null
+++ b/_automate/bump_version.sh
@@ -0,0 +1,21 @@
+#!/bin/sh
+
+set -xeu
+
+VERSION=$1
+PREV_DEPS=$2
+NEW_DEPS=$3
+
+ack "^version = \"" -l | \
+ grep toml | \
+ xargs sed -i "s/^version = \".*/version = \"$VERSION\"/"
+
+ack "{ version = \"$PREV_DEPS" -l | \
+ grep toml | \
+ xargs sed -i "s/{ version = \"$PREV_DEPS/{ version = \"$NEW_DEPS/"
+
+ack " = \"$PREV_DEPS" -l | \
+ grep md | \
+ xargs sed -i "s/ = \"$PREV_DEPS/ = \"$NEW_DEPS/"
+
+cargo check
diff --git a/_automate/publish.sh b/_automate/publish.sh
new file mode 100755
index 000000000..ec389b923
--- /dev/null
+++ b/_automate/publish.sh
@@ -0,0 +1,117 @@
+#!/bin/bash
+
+set -eu
+
+ORDER=(core server-utils tcp ws http ipc stdio pubsub core-client/transports core-client derive test)
+
+function read_toml () {
+ NAME=""
+ VERSION=""
+ NAME=$(grep "^name" ./Cargo.toml | sed -e 's/.*"\(.*\)"/\1/')
+ VERSION=$(grep "^version" ./Cargo.toml | sed -e 's/.*"\(.*\)"/\1/')
+}
+function remote_version () {
+ REMOTE_VERSION=""
+ REMOTE_VERSION=$(cargo search "$NAME" | grep "^$NAME =" | sed -e 's/.*"\(.*\)".*/\1/')
+}
+
+# First display the plan
+for CRATE_DIR in ${ORDER[@]}; do
+ cd $CRATE_DIR > /dev/null
+ read_toml
+ echo "$NAME@$VERSION"
+ cd - > /dev/null
+done
+
+read -p ">>>> Really publish?. Press [enter] to continue. "
+
+set -x
+
+cargo clean
+
+set +x
+
+# Then actually perform publishing.
+for CRATE_DIR in ${ORDER[@]}; do
+ cd $CRATE_DIR > /dev/null
+ read_toml
+ remote_version
+ # Seems the latest version matches, skip by default.
+ if [ "$REMOTE_VERSION" = "$VERSION" ] || [[ "$REMOTE_VERSION" > "$VERSION" ]]; then
+ RET=""
+ echo "Seems like $NAME@$REMOTE_VERSION is already published. Continuing in 5s. "
+ read -t 5 -p ">>>> Type [r][enter] to retry, or [enter] to continue... " RET || true
+ if [ "$RET" != "r" ]; then
+ echo "Skipping $NAME@$VERSION"
+ cd - > /dev/null
+ continue
+ fi
+ fi
+
+ # Attempt to publish (allow retries)
+ while : ; do
+ # give the user an opportunity to abort or skip before publishing
+ echo "🚀 Publishing $NAME@$VERSION..."
+ sleep 3
+
+ set +e && set -x
+ cargo publish $@
+ RES=$?
+ set +x && set -e
+ # Check if it succeeded
+ if [ "$RES" != "0" ]; then
+ CHOICE=""
+ echo "##### Publishing $NAME failed"
+ read -p ">>>>> Type [s][enter] to skip, or [enter] to retry.. " CHOICE
+ if [ "$CHOICE" = "s" ]; then
+ break
+ fi
+ else
+ break
+ fi
+ done
+
+ # Wait again to make sure that the new version is published and available.
+ echo "Waiting for $NAME@$VERSION to become available at the registry..."
+ while : ; do
+ sleep 3
+ remote_version
+ if [ "$REMOTE_VERSION" = "$VERSION" ]; then
+ echo "🥳 $NAME@$VERSION published succesfully."
+ sleep 3
+ break
+ else
+ echo "#### Got $NAME@$REMOTE_VERSION but expected $NAME@$VERSION. Retrying..."
+ fi
+ done
+ cd - > /dev/null
+done
+
+# Make tags in one go
+set -x
+git fetch --tags
+set +x
+
+for CRATE_DIR in ${ORDER[@]}; do
+ cd $CRATE_DIR > /dev/null
+ read_toml
+ echo "Tagging $NAME@$VERSION"
+ set -x
+ git tag -a "$NAME-$VERSION" -m "$NAME $VERSION" || true
+ set +x
+ cd - > /dev/null
+done
+
+set -x
+sleep 3
+git push --tags
+set +x
+
+cd core > /dev/null
+read_toml
+cd - > /dev/null
+echo "Tagging jsonrpc@$VERSION"
+set -x
+git tag -a v$VERSION -m "Version $VERSION"
+sleep 3
+git push --tags
diff --git a/core-client/Cargo.toml b/core-client/Cargo.toml
new file mode 100644
index 000000000..e4340540a
--- /dev/null
+++ b/core-client/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+authors = ["Parity Technologies "]
+description = "Transport agnostic JSON-RPC 2.0 client implementation."
+documentation = "https://docs.rs/jsonrpc-core-client/"
+edition = "2018"
+homepage = "https://github.com/paritytech/jsonrpc"
+keywords = ["jsonrpc", "json-rpc", "json", "rpc", "serde"]
+license = "MIT"
+name = "jsonrpc-core-client"
+repository = "https://github.com/paritytech/jsonrpc"
+version = "18.0.0"
+
+categories = [
+ "asynchronous",
+ "network-programming",
+ "web-programming::http-client",
+ "web-programming::http-server",
+ "web-programming::websocket",
+]
+
+[features]
+tls = ["jsonrpc-client-transports/tls"]
+http = ["jsonrpc-client-transports/http"]
+ws = ["jsonrpc-client-transports/ws"]
+ipc = ["jsonrpc-client-transports/ipc"]
+arbitrary_precision = ["jsonrpc-client-transports/arbitrary_precision"]
+
+[dependencies]
+jsonrpc-client-transports = { version = "18.0.0", path = "./transports", default-features = false }
+futures = { version = "0.3", features = [ "compat" ] }
+
+[badges]
+travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
diff --git a/core-client/src/lib.rs b/core-client/src/lib.rs
new file mode 100644
index 000000000..0ffd970b7
--- /dev/null
+++ b/core-client/src/lib.rs
@@ -0,0 +1,11 @@
+//! JSON-RPC client implementation primitives.
+//!
+//! By default this crate does not implement any transports,
+//! use corresponding features (`tls`, `http` or `ws`) to opt-in for them.
+//!
+//! See documentation of [`jsonrpc-client-transports`](https://docs.rs/jsonrpc-client-transports) for more details.
+
+#![deny(missing_docs)]
+
+pub use futures;
+pub use jsonrpc_client_transports::*;
diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml
new file mode 100644
index 000000000..3ca892697
--- /dev/null
+++ b/core-client/transports/Cargo.toml
@@ -0,0 +1,63 @@
+[package]
+authors = ["Parity Technologies "]
+description = "Transport agnostic JSON-RPC 2.0 client implementation."
+documentation = "https://docs.rs/jsonrpc-client-transports/"
+edition = "2018"
+homepage = "https://github.com/paritytech/jsonrpc"
+keywords = ["jsonrpc", "json-rpc", "json", "rpc", "serde"]
+license = "MIT"
+name = "jsonrpc-client-transports"
+repository = "https://github.com/paritytech/jsonrpc"
+version = "18.0.0"
+
+categories = [
+ "asynchronous",
+ "network-programming",
+ "web-programming::http-client",
+ "web-programming::http-server",
+ "web-programming::websocket",
+]
+
+[features]
+default = ["http", "tls", "ws"]
+tls = ["hyper-tls", "http"]
+http = ["hyper", "tokio/full"]
+ws = [
+ "websocket",
+ "tokio",
+ "futures/compat"
+]
+ipc = [
+ "parity-tokio-ipc",
+ "jsonrpc-server-utils",
+ "tokio",
+]
+arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"]
+
+[dependencies]
+derive_more = "0.99"
+futures = "0.3"
+jsonrpc-core = { version = "18.0.0", path = "../../core" }
+jsonrpc-pubsub = { version = "18.0.0", path = "../../pubsub" }
+log = "0.4"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+url = "1.7"
+
+hyper = { version = "0.14", features = ["client", "http1"], optional = true }
+hyper-tls = { version = "0.5", optional = true }
+jsonrpc-server-utils = { version = "18.0.0", path = "../../server-utils", optional = true }
+parity-tokio-ipc = { version = "0.9", optional = true }
+tokio = { version = "1", optional = true }
+websocket = { version = "0.24", optional = true }
+flate2 = "0.2"
+
+[dev-dependencies]
+assert_matches = "1.1"
+jsonrpc-http-server = { version = "18.0.0", path = "../../http" }
+jsonrpc-ipc-server = { version = "18.0.0", path = "../../ipc" }
+lazy_static = "1.0"
+env_logger = "0.7"
+
+[badges]
+travis-ci = { repository = "paritytech/jsonrpc", branch = "master" }
diff --git a/core-client/transports/src/lib.rs b/core-client/transports/src/lib.rs
new file mode 100644
index 000000000..8b1112945
--- /dev/null
+++ b/core-client/transports/src/lib.rs
@@ -0,0 +1,479 @@
+//! JSON-RPC client implementation.
+
+#![deny(missing_docs)]
+
+use jsonrpc_core::futures::channel::{mpsc, oneshot};
+use jsonrpc_core::futures::{
+ self,
+ task::{Context, Poll},
+ Future, Stream, StreamExt,
+};
+use jsonrpc_core::{Error, Params};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use serde_json::Value;
+use std::marker::PhantomData;
+use std::pin::Pin;
+
+pub mod transports;
+
+#[cfg(test)]
+mod logger;
+
+/// The errors returned by the client.
+#[derive(Debug, derive_more::Display)]
+pub enum RpcError {
+ /// An error returned by the server.
+ #[display(fmt = "Server returned rpc error {}", _0)]
+ JsonRpcError(Error),
+ /// Failure to parse server response.
+ #[display(fmt = "Failed to parse server response as {}: {}", _0, _1)]
+ ParseError(String, Box),
+ /// Request timed out.
+ #[display(fmt = "Request timed out")]
+ Timeout,
+ /// A general client error.
+ #[display(fmt = "Client error: {}", _0)]
+ Client(String),
+ /// Not rpc specific errors.
+ #[display(fmt = "{}", _0)]
+ Other(Box),
+}
+
+impl std::error::Error for RpcError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match *self {
+ Self::JsonRpcError(ref e) => Some(e),
+ Self::ParseError(_, ref e) => Some(&**e),
+ Self::Other(ref e) => Some(&**e),
+ _ => None,
+ }
+ }
+}
+
+impl From for RpcError {
+ fn from(error: Error) -> Self {
+ RpcError::JsonRpcError(error)
+ }
+}
+
+/// A result returned by the client.
+pub type RpcResult = Result;
+
+/// An RPC call message.
+struct CallMessage {
+ /// The RPC method name.
+ method: String,
+ /// The RPC method parameters.
+ params: Params,
+ /// The oneshot channel to send the result of the rpc
+ /// call to.
+ sender: oneshot::Sender>,
+}
+
+/// An RPC notification.
+struct NotifyMessage {
+ /// The RPC method name.
+ method: String,
+ /// The RPC method paramters.
+ params: Params,
+}
+
+/// An RPC subscription.
+struct Subscription {
+ /// The subscribe method name.
+ subscribe: String,
+ /// The subscribe method parameters.
+ subscribe_params: Params,
+ /// The name of the notification.
+ notification: String,
+ /// The unsubscribe method name.
+ unsubscribe: String,
+}
+
+/// An RPC subscribe message.
+struct SubscribeMessage {
+ /// The subscription to subscribe to.
+ subscription: Subscription,
+ /// The channel to send notifications to.
+ sender: mpsc::UnboundedSender>,
+}
+
+/// A message sent to the `RpcClient`.
+enum RpcMessage {
+ /// Make an RPC call.
+ Call(CallMessage),
+ /// Send a notification.
+ Notify(NotifyMessage),
+ /// Subscribe to a notification.
+ Subscribe(SubscribeMessage),
+}
+
+impl From for RpcMessage {
+ fn from(msg: CallMessage) -> Self {
+ RpcMessage::Call(msg)
+ }
+}
+
+impl From for RpcMessage {
+ fn from(msg: NotifyMessage) -> Self {
+ RpcMessage::Notify(msg)
+ }
+}
+
+impl From for RpcMessage {
+ fn from(msg: SubscribeMessage) -> Self {
+ RpcMessage::Subscribe(msg)
+ }
+}
+
+/// A channel to a `RpcClient`.
+#[derive(Clone)]
+pub struct RpcChannel(mpsc::UnboundedSender);
+
+impl RpcChannel {
+ fn send(&self, msg: RpcMessage) -> Result<(), mpsc::TrySendError> {
+ self.0.unbounded_send(msg)
+ }
+}
+
+impl From> for RpcChannel {
+ fn from(sender: mpsc::UnboundedSender) -> Self {
+ RpcChannel(sender)
+ }
+}
+
+/// The future returned by the rpc call.
+pub type RpcFuture = oneshot::Receiver>;
+
+/// The stream returned by a subscribe.
+pub type SubscriptionStream = mpsc::UnboundedReceiver>;
+
+/// A typed subscription stream.
+pub struct TypedSubscriptionStream {
+ _marker: PhantomData,
+ returns: &'static str,
+ stream: SubscriptionStream,
+}
+
+impl TypedSubscriptionStream {
+ /// Creates a new `TypedSubscriptionStream`.
+ pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
+ TypedSubscriptionStream {
+ _marker: PhantomData,
+ returns,
+ stream,
+ }
+ }
+}
+
+impl Stream for TypedSubscriptionStream {
+ type Item = RpcResult;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll