Skip to content

Commit 4933feb

Browse files
tomusdrwascjones
authored andcommitted
Add asynchronous methods to assign_id or reject in pubsub. (paritytech#401)
* Add asynchronous methods to assign_id or reject in pubsub. * Make the channel impl more clear. * Fix typos.
1 parent e891334 commit 4933feb

File tree

15 files changed

+219
-47
lines changed

15 files changed

+219
-47
lines changed
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

macros/src/auto_args.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ macro_rules! metadata {
7070
};
7171
}
7272

73+
/// Build an RPC trait definition.
7374
#[macro_export]
7475
macro_rules! build_rpc_trait {
7576
(

macros/src/pubsub.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use jsonrpc_pubsub as pubsub;
77
use serde;
88
use util::to_value;
99

10-
use self::core::futures::{self, Sink as FuturesSink, sync};
10+
use self::core::futures::{self, Future, Sink as FuturesSink, sync};
1111

1212
pub use self::pubsub::SubscriptionId;
1313

@@ -30,7 +30,7 @@ impl<T, E> Subscriber<T, E> {
3030
/// Create new subscriber for tests.
3131
pub fn new_test<M: Into<String>>(method: M) -> (
3232
Self,
33-
sync::oneshot::Receiver<Result<SubscriptionId, core::Error>>,
33+
pubsub::oneshot::Receiver<Result<SubscriptionId, core::Error>>,
3434
sync::mpsc::Receiver<String>,
3535
) {
3636
let (subscriber, id, subscription) = pubsub::Subscriber::new_test(method);
@@ -42,18 +42,37 @@ impl<T, E> Subscriber<T, E> {
4242
self.subscriber.reject(error)
4343
}
4444

45+
/// Reject subscription with given error.
46+
pub fn reject_async(self, error: core::Error) -> impl Future<Item = (), Error = ()> {
47+
self.subscriber.reject_async(error)
48+
}
49+
4550
/// Assign id to this subscriber.
4651
/// This method consumes `Subscriber` and returns `Sink`
4752
/// if the connection is still open or error otherwise.
4853
pub fn assign_id(self, id: SubscriptionId) -> Result<Sink<T, E>, ()> {
49-
let sink = self.subscriber.assign_id(id.clone())?;
50-
Ok(Sink {
51-
id,
52-
sink,
53-
buffered: None,
54-
_data: PhantomData,
55-
})
54+
self.subscriber.assign_id(id.clone())
55+
.map(|sink| Sink {
56+
id,
57+
sink,
58+
buffered: None,
59+
_data: PhantomData,
60+
})
61+
}
62+
63+
/// Assign id to this subscriber.
64+
/// This method consumes `Subscriber` and returns `Sink`
65+
/// if the connection is still open or error otherwise.
66+
pub fn assign_id_async(self, id: SubscriptionId) -> impl Future<Item = Sink<T, E>, Error = ()> {
67+
self.subscriber.assign_id_async(id.clone())
68+
.map(|sink| Sink {
69+
id,
70+
sink,
71+
buffered: None,
72+
_data: PhantomData,
73+
})
5674
}
75+
5776
}
5877

5978
/// Subscriber sink.

pubsub/examples/pubsub.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ fn main() {
3434
return;
3535
}
3636

37-
let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
38-
// or subscriber.reject(Error {} );
39-
// or drop(subscriber)
4037
let is_done = is_done.clone();
4138
thread::spawn(move || {
39+
let sink = subscriber.assign_id_async(SubscriptionId::Number(5)).wait().unwrap();
40+
// or subscriber.reject(Error {} );
41+
// or drop(subscriber)
42+
4243
loop {
4344
if is_done.load(atomic::Ordering::AcqRel) {
4445
return;

pubsub/examples/pubsub_simple.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ fn main() {
3232
return;
3333
}
3434

35-
let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
36-
// or subscriber.reject(Error {} );
37-
// or drop(subscriber)
3835
thread::spawn(move || {
36+
let sink = subscriber.assign_id_async(SubscriptionId::Number(5)).wait().unwrap();
37+
// or subscriber.reject(Error {} );
38+
// or drop(subscriber)
39+
3940
loop {
4041
thread::sleep(time::Duration::from_millis(100));
4142
match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() {

pubsub/more-examples/examples/pubsub_ipc.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ fn main() {
3636
return;
3737
}
3838

39-
let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
40-
// or subscriber.reject(Error {} );
41-
// or drop(subscriber)
4239
thread::spawn(move || {
40+
let sink = subscriber.assign_id_async(SubscriptionId::Number(5)).wait().unwrap();
41+
// or subscriber.reject(Error {} );
42+
// or drop(subscriber)
43+
4344
loop {
4445
thread::sleep(time::Duration::from_millis(100));
4546
match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() {

0 commit comments

Comments
 (0)