Skip to content

Commit 525a29f

Browse files
seunlanlegeTyera Eulberg
authored andcommitted
Drop timeout if max concurrent timeouts reached (paritytech#324)
* closes paritytech#322 * refactored error handling * proper loop handling * schedule SuspendableStream on timer error
1 parent 527e25a commit 525a29f

File tree

1 file changed

+18
-14
lines changed

1 file changed

+18
-14
lines changed

server-utils/src/suspendable_stream.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,30 +40,34 @@ impl<S, I> Stream for SuspendableStream<S>
4040
type Error = ();
4141

4242
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, ()> {
43-
if let Some(mut timeout) = self.timeout.take() {
44-
match timeout.poll() {
45-
Ok(Async::Ready(_)) => {}
46-
Ok(Async::NotReady) => {
47-
self.timeout = Some(timeout);
48-
return Ok(Async::NotReady);
43+
loop {
44+
if let Some(mut timeout) = self.timeout.take() {
45+
match timeout.poll() {
46+
Ok(Async::Ready(_)) => {}
47+
Ok(Async::NotReady) => {
48+
self.timeout = Some(timeout);
49+
return Ok(Async::NotReady);
50+
}
51+
Err(err) => {
52+
warn!("Timeout error {:?}", err);
53+
task::current().notify();
54+
return Ok(Async::NotReady);
55+
}
4956
}
50-
Err(_) => unreachable!("Polling a delay shouldn't yield any errors; qed")
5157
}
52-
}
5358

54-
loop {
5559
match self.stream.poll() {
5660
Ok(item) => {
5761
if self.next_delay > self.initial_delay {
5862
self.next_delay = self.initial_delay;
5963
}
6064
return Ok(item)
61-
},
62-
Err(ref e) => if connection_error(e) {
63-
warn!("Connection Error: {:?}", e);
64-
continue
6565
}
66-
Err(err) => {
66+
Err(ref err) => {
67+
if connection_error(err) {
68+
warn!("Connection Error: {:?}", err);
69+
continue
70+
}
6771
self.next_delay = if self.next_delay < self.max_delay {
6872
self.next_delay * 2
6973
} else {

0 commit comments

Comments
 (0)