Skip to content

Commit ddddfb5

Browse files
Mikhail ZabaluevNicolasDP
authored andcommitted
Convert the client task to async, use bounded channels in intercom (#1178)
* Convert the client task to async Remove the future wait calls in the client task, closing the gap between async Blockchain API and async streams to and from the network task. As a collateral fix, intercom::ReplyStreamHandle is converted to use a bounded sender under the hood, removing one cause of unbounded RAM usage. * Simplify async_msg::SendTask No need for a loop here. * blockchain: Relax back the error on get_ref()
1 parent df3f83e commit ddddfb5

File tree

9 files changed

+461
-241
lines changed

9 files changed

+461
-241
lines changed

jormungandr/src/client.rs

Lines changed: 157 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use crate::blockcfg::{Block, Header, HeaderHash};
22
use crate::blockchain::{Storage, Tip};
3-
use crate::intercom::{do_stream_reply, ClientMsg, Error, ReplyStreamHandle};
4-
use crate::utils::task::{Input, ThreadServiceInfo};
3+
use crate::intercom::{ClientMsg, Error, ReplyStreamHandle};
4+
use crate::utils::task::{Input, TokioServiceInfo};
55
use chain_core::property::HasHeader;
6-
use chain_storage::store;
76

8-
use futures::future::Either;
7+
use futures::future::{Either, FutureResult};
98
use tokio::prelude::*;
109

1110
pub struct TaskData {
@@ -14,165 +13,201 @@ pub struct TaskData {
1413
pub blockchain_tip: Tip,
1514
}
1615

17-
pub fn handle_input(_info: &ThreadServiceInfo, task_data: &mut TaskData, input: Input<ClientMsg>) {
16+
enum TaskAction<
17+
GetBlockTip,
18+
GetHeaders,
19+
GetHeadersRange,
20+
GetBlocks,
21+
GetBlocksRange,
22+
PullBlocksToTip,
23+
> {
24+
Shutdown(FutureResult<(), ()>),
25+
GetBlockTip(GetBlockTip),
26+
GetHeaders(GetHeaders),
27+
GetHeadersRange(GetHeadersRange),
28+
GetBlocks(GetBlocks),
29+
GetBlocksRange(GetBlocksRange),
30+
PullBlocksToTip(PullBlocksToTip),
31+
}
32+
33+
impl<
34+
GetBlockTip: Future<Item = (), Error = ()>,
35+
GetHeaders: Future<Item = (), Error = ()>,
36+
GetHeadersRange: Future<Item = (), Error = ()>,
37+
GetBlocks: Future<Item = (), Error = ()>,
38+
GetBlocksRange: Future<Item = (), Error = ()>,
39+
PullBlocksToTip: Future<Item = (), Error = ()>,
40+
> Future
41+
for TaskAction<
42+
GetBlockTip,
43+
GetHeaders,
44+
GetHeadersRange,
45+
GetBlocks,
46+
GetBlocksRange,
47+
PullBlocksToTip,
48+
>
49+
{
50+
type Item = ();
51+
type Error = ();
52+
53+
fn poll(&mut self) -> Poll<(), ()> {
54+
use self::TaskAction::*;
55+
56+
match self {
57+
Shutdown(fut) => fut.poll(),
58+
GetBlockTip(fut) => fut.poll(),
59+
GetHeaders(fut) => fut.poll(),
60+
GetHeadersRange(fut) => fut.poll(),
61+
GetBlocks(fut) => fut.poll(),
62+
GetBlocksRange(fut) => fut.poll(),
63+
PullBlocksToTip(fut) => fut.poll(),
64+
}
65+
}
66+
}
67+
68+
pub fn handle_input(
69+
_info: &TokioServiceInfo,
70+
task_data: &mut TaskData,
71+
input: Input<ClientMsg>,
72+
) -> impl Future<Item = (), Error = ()> {
1873
let cquery = match input {
19-
Input::Shutdown => return,
74+
Input::Shutdown => return TaskAction::Shutdown(Ok(()).into()),
2075
Input::Input(msg) => msg,
2176
};
2277

2378
match cquery {
24-
ClientMsg::GetBlockTip(handler) => {
25-
handler.reply(handle_get_block_tip(&task_data.blockchain_tip))
79+
ClientMsg::GetBlockTip(handle) => {
80+
TaskAction::GetBlockTip(handle.async_reply(get_block_tip(&task_data.blockchain_tip)))
2681
}
27-
ClientMsg::GetHeaders(ids, handler) => do_stream_reply(handler, |handler| {
28-
handle_get_headers(&task_data.storage, ids, handler)
29-
}),
30-
ClientMsg::GetHeadersRange(checkpoints, to, handler) => {
31-
do_stream_reply(handler, |handler| {
32-
handle_get_headers_range(&task_data.storage, checkpoints, to, handler)
33-
})
82+
ClientMsg::GetHeaders(ids, handle) => {
83+
TaskAction::GetHeaders(handle.async_reply(get_headers(task_data.storage.clone(), ids)))
84+
}
85+
ClientMsg::GetHeadersRange(checkpoints, to, handle) => TaskAction::GetHeadersRange(
86+
handle_get_headers_range(task_data.storage.clone(), checkpoints, to, handle),
87+
),
88+
ClientMsg::GetBlocks(ids, handle) => {
89+
TaskAction::GetBlocks(handle.async_reply(get_blocks(task_data.storage.clone(), ids)))
90+
}
91+
ClientMsg::GetBlocksRange(from, to, handle) => TaskAction::GetBlocksRange(
92+
handle_get_blocks_range(&task_data.storage, from, to, handle),
93+
),
94+
ClientMsg::PullBlocksToTip(from, handle) => {
95+
TaskAction::PullBlocksToTip(handle_pull_blocks_to_tip(
96+
task_data.storage.clone(),
97+
task_data.blockchain_tip.clone(),
98+
from,
99+
handle,
100+
))
34101
}
35-
ClientMsg::GetBlocks(ids, handler) => do_stream_reply(handler, |handler| {
36-
handle_get_blocks(&task_data.storage, ids, handler)
37-
}),
38-
ClientMsg::GetBlocksRange(from, to, handler) => do_stream_reply(handler, |handler| {
39-
handle_get_blocks_range(&task_data.storage, from, to, handler)
40-
}),
41-
ClientMsg::PullBlocksToTip(from, handler) => do_stream_reply(handler, |handler| {
42-
handle_pull_blocks_to_tip(&task_data.storage, &task_data.blockchain_tip, from, handler)
43-
}),
44102
}
45103
}
46104

47-
fn handle_get_block_tip(blockchain_tip: &Tip) -> Result<Header, Error> {
48-
let blockchain_tip = blockchain_tip.get_ref::<Error>().wait().unwrap();
49-
50-
Ok(blockchain_tip.header().clone())
105+
fn get_block_tip(blockchain_tip: &Tip) -> impl Future<Item = Header, Error = Error> {
106+
blockchain_tip
107+
.get_ref()
108+
.and_then(|tip| Ok(tip.header().clone()))
51109
}
52110

53-
const MAX_HEADERS: u64 = 2000;
54-
55111
fn handle_get_headers_range(
56-
storage: &Storage,
112+
storage: Storage,
57113
checkpoints: Vec<HeaderHash>,
58114
to: HeaderHash,
59-
reply: &mut ReplyStreamHandle<Header>,
60-
) -> Result<(), Error> {
61-
let future = storage
115+
handle: ReplyStreamHandle<Header>,
116+
) -> impl Future<Item = (), Error = ()> {
117+
storage
62118
.find_closest_ancestor(checkpoints, to)
63-
.map_err(|e| e.into())
119+
.map_err(Into::into)
64120
.and_then(move |maybe_ancestor| match maybe_ancestor {
65-
Some(from) => Either::A(storage.stream_from_to(from, to).map_err(|e| e.into())),
121+
Some(from) => Either::A(storage.stream_from_to(from, to).map_err(Into::into)),
66122
None => Either::B(future::err(Error::not_found(
67123
"none of the checkpoints found in the local storage \
68124
are ancestors of the requested end block",
69125
))),
70126
})
71-
.and_then(move |stream| {
72-
// Send headers up to the maximum
73-
stream
74-
.map_err(|e| e.into())
75-
.take(MAX_HEADERS)
76-
.for_each(move |block| {
77-
reply
78-
.send(block.header())
79-
.map_err(|_| Error::failed("failed to send reply"))?;
80-
Ok(())
81-
})
82-
});
83-
84-
future.wait()
127+
.then(move |res| match res {
128+
Ok(stream) => {
129+
let stream = stream.map_err(Into::into).map(move |block| block.header());
130+
Either::A(handle.async_reply(stream))
131+
}
132+
Err(e) => Either::B(handle.async_error(e)),
133+
})
85134
}
86135

87136
fn handle_get_blocks_range(
88137
storage: &Storage,
89138
from: HeaderHash,
90139
to: HeaderHash,
91-
reply: &mut ReplyStreamHandle<Block>,
92-
) -> Result<(), Error> {
93-
// FIXME: remove double locking
94-
let storage = storage.get_inner().wait().unwrap();
95-
96-
// FIXME: include the from block
97-
98-
for x in store::iterate_range(&*storage, &from, &to)? {
99-
let info = x?;
100-
let (blk, _) = storage.get_block(&info.block_hash)?;
101-
if let Err(_) = reply.send(blk) {
102-
break;
140+
handle: ReplyStreamHandle<Block>,
141+
) -> impl Future<Item = (), Error = ()> {
142+
storage.stream_from_to(from, to).then(move |res| match res {
143+
Ok(stream) => {
144+
let stream = stream.map_err(Into::into);
145+
Either::A(handle.async_reply(stream))
103146
}
104-
}
105-
106-
Ok(())
147+
Err(e) => Either::B(handle.async_error(e.into())),
148+
})
107149
}
108150

109-
fn handle_get_blocks(
110-
storage: &Storage,
111-
ids: Vec<HeaderHash>,
112-
reply: &mut ReplyStreamHandle<Block>,
113-
) -> Result<(), Error> {
114-
for id in ids.into_iter() {
115-
if let Some(blk) = storage.get(id).wait()? {
116-
if let Err(_) = reply.send(blk) {
117-
break;
118-
}
119-
} else {
120-
// TODO: reply this hash was not found?
121-
}
122-
}
123-
124-
Ok(())
151+
fn get_blocks(storage: Storage, ids: Vec<HeaderHash>) -> impl Stream<Item = Block, Error = Error> {
152+
stream::iter_ok(ids).and_then(move |id| {
153+
storage
154+
.get(id)
155+
.map_err(Into::into)
156+
.and_then(move |maybe_block| match maybe_block {
157+
Some(block) => Ok(block),
158+
None => Err(Error::not_found(format!(
159+
"block {} is not known to this node",
160+
id
161+
))),
162+
})
163+
})
125164
}
126165

127-
fn handle_get_headers(
128-
storage: &Storage,
166+
fn get_headers(
167+
storage: Storage,
129168
ids: Vec<HeaderHash>,
130-
reply: &mut ReplyStreamHandle<Header>,
131-
) -> Result<(), Error> {
132-
for id in ids.into_iter() {
133-
if let Some(blk) = storage.get(id).wait()? {
134-
if let Err(_) = reply.send(blk.header()) {
135-
break;
136-
}
137-
} else {
138-
// TODO: reply this hash was not found?
139-
}
140-
}
141-
142-
Ok(())
169+
) -> impl Stream<Item = Header, Error = Error> {
170+
stream::iter_ok(ids).and_then(move |id| {
171+
storage
172+
.get(id)
173+
.map_err(Into::into)
174+
.and_then(move |maybe_block| match maybe_block {
175+
Some(block) => Ok(block.header()),
176+
None => Err(Error::not_found(format!(
177+
"block {} is not known to this node",
178+
id
179+
))),
180+
})
181+
})
143182
}
144183

145184
fn handle_pull_blocks_to_tip(
146-
storage: &Storage,
147-
blockchain_tip: &Tip,
185+
storage: Storage,
186+
blockchain_tip: Tip,
148187
checkpoints: Vec<HeaderHash>,
149-
reply: &mut ReplyStreamHandle<Block>,
150-
) -> Result<(), Error> {
151-
let tip = blockchain_tip.get_ref::<Error>().wait().unwrap();
152-
let tip_hash = tip.hash();
153-
154-
let future = storage
155-
.find_closest_ancestor(checkpoints, tip_hash)
156-
.map_err(|e| e.into())
157-
.and_then(move |maybe_ancestor| match maybe_ancestor {
158-
Some(from) => Either::A(storage.stream_from_to(from, tip_hash).map_err(|e| e.into())),
188+
handle: ReplyStreamHandle<Block>,
189+
) -> impl Future<Item = (), Error = ()> {
190+
blockchain_tip
191+
.get_ref()
192+
.and_then(move |tip| {
193+
let tip_hash = tip.hash();
194+
storage
195+
.find_closest_ancestor(checkpoints, tip_hash)
196+
.map_err(Into::into)
197+
.map(move |maybe_ancestor| (storage, maybe_ancestor, tip_hash))
198+
})
199+
.and_then(move |(storage, maybe_ancestor, to)| match maybe_ancestor {
200+
Some(from) => Either::A(storage.stream_from_to(from, to).map_err(Into::into)),
159201
None => Either::B(future::err(Error::not_found(
160202
"none of the checkpoints found in the local storage \
161203
are ancestors of the current tip",
162204
))),
163205
})
164-
.and_then(move |stream| {
165-
// Send headers up to the maximum
166-
stream
167-
.map_err(|e| e.into())
168-
.take(MAX_HEADERS)
169-
.for_each(move |block| {
170-
reply
171-
.send(block)
172-
.map_err(|_| Error::failed("failed to send reply"))?;
173-
Ok(())
174-
})
175-
});
176-
177-
future.wait()
206+
.then(move |res| match res {
207+
Ok(stream) => {
208+
let stream = stream.map_err(Into::into);
209+
Either::A(handle.async_reply(stream))
210+
}
211+
Err(e) => Either::B(handle.async_error(e)),
212+
})
178213
}

0 commit comments

Comments
 (0)