Skip to content

Commit d45e13a

Browse files
committed
async: upgrade tokio to 1.0
upgrade tokio from 0.2 to 1.0 Signed-off-by: Tim Zhang <tim@hyper.sh>
1 parent a9c9032 commit d45e13a

File tree

7 files changed

+80
-28
lines changed

7 files changed

+80
-28
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ byteorder = "1.3.2"
2020
thiserror = "1.0"
2121

2222
async-trait = { version = "0.1.31", optional = true }
23-
tokio = { version = "0.2", features = ["rt-threaded", "sync", "uds", "stream", "macros", "io-util", "time"], optional = true }
23+
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
2424
futures = { version = "0.3", optional = true }
25-
tokio-vsock = { version = "0.2.1", optional = true }
25+
tokio-vsock = { version = "0.3", optional = true }
2626

2727
[build-dependencies]
2828
protobuf-codegen-pure = "2.14.0"

src/asynchronous/client.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ use crate::asynchronous::stream::{receive, to_req_buf};
1818
use crate::r#async::utils;
1919
use tokio::{
2020
self,
21-
io::split,
22-
prelude::*,
21+
io::{split, AsyncWriteExt},
2322
sync::mpsc::{channel, Receiver, Sender},
2423
sync::Notify,
24+
time::timeout,
2525
};
2626

2727
type RequestSender = Sender<(Vec<u8>, Sender<Result<Vec<u8>>>)>;
@@ -54,7 +54,7 @@ impl Client {
5454
tokio::spawn(async move {
5555
let mut stream_id: u32 = 1;
5656

57-
while let Some((body, mut resp_tx)) = rx.recv().await {
57+
while let Some((body, resp_tx)) = rx.recv().await {
5858
let current_stream_id = stream_id;
5959
stream_id += 2;
6060

@@ -84,7 +84,7 @@ impl Client {
8484

8585
// rx.recv will abort when client.req_tx and client is dropped.
8686
// notify the response-receiver to quit at this time.
87-
notify.notify();
87+
notify.notify_one();
8888
});
8989

9090
// Response receiver
@@ -99,7 +99,7 @@ impl Client {
9999
match res {
100100
Ok((header, body)) => {
101101
tokio::spawn(async move {
102-
let mut resp_tx2;
102+
let resp_tx2;
103103
{
104104
let mut map = req_map.lock().unwrap();
105105
let resp_tx = match map.get(&header.stream_id) {
@@ -163,14 +163,13 @@ impl Client {
163163
.await
164164
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?
165165
} else {
166-
let timeout = tokio::time::delay_for(Duration::from_nanos(req.timeout_nano as u64));
167-
168-
tokio::select! {
169-
result = rx.recv() => {
170-
result.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?
171-
}
172-
_ = timeout => {
173-
return Err(Error::Others("Recive packet from recver error: timeout".to_string()));
166+
match timeout(Duration::from_nanos(req.timeout_nano as u64), rx.recv()).await {
167+
Ok(result) => result
168+
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?,
169+
Err(_) => {
170+
return Err(Error::Others(
171+
"Recive packet from recver error: timeout".to_string(),
172+
))
174173
}
175174
}
176175
};

src/asynchronous/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod server;
1010
pub mod stream;
1111
#[macro_use]
1212
pub mod utils;
13+
mod unix_incoming;
1314

1415
#[doc(inline)]
1516
pub use crate::r#async::client::Client;

src/asynchronous/server.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,21 @@ use std::os::unix::io::RawFd;
1010
use std::sync::Arc;
1111

1212
use crate::asynchronous::stream::{receive, respond, respond_with_status};
13+
use crate::asynchronous::unix_incoming::UnixIncoming;
1314
use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
1415
use crate::error::{get_status, Error, Result};
1516
use crate::r#async::{MethodHandler, TtrpcContext};
1617
use crate::ttrpc::{Code, Request};
1718
use crate::MessageHeader;
19+
use futures::stream::Stream;
1820
use futures::StreamExt as _;
1921
use std::marker::Unpin;
2022
use std::os::unix::io::{AsRawFd, FromRawFd};
2123
use std::os::unix::net::UnixListener as SysUnixListener;
2224
use tokio::{
2325
self,
24-
io::split,
26+
io::{split, AsyncRead, AsyncWrite, AsyncWriteExt},
2527
net::UnixListener,
26-
prelude::*,
27-
stream::Stream,
2828
sync::mpsc::{channel, Receiver, Sender},
2929
sync::watch,
3030
};
@@ -116,9 +116,15 @@ impl Server {
116116
unsafe {
117117
sys_unix_listener = SysUnixListener::from_raw_fd(listenfd);
118118
}
119-
let unix_listener = UnixListener::from_std(sys_unix_listener).unwrap();
119+
sys_unix_listener
120+
.set_nonblocking(true)
121+
.map_err(err_to_others_err!(e, "set_nonblocking error "))?;
122+
let unix_listener = UnixListener::from_std(sys_unix_listener)
123+
.map_err(err_to_others_err!(e, "from_std error "))?;
120124

121-
self.do_start(listenfd, unix_listener).await
125+
let incoming = UnixIncoming::new(unix_listener);
126+
127+
self.do_start(listenfd, incoming).await
122128
}
123129
Some(Domain::Vsock) => {
124130
let incoming;
@@ -201,10 +207,10 @@ impl Server {
201207
}
202208
}
203209
}
204-
v = close_conn_rx.recv() => {
210+
v = close_conn_rx.changed() => {
205211
// 0 is the init value of this watch, not a valid signal
206-
// is_none means the tx was dropped.
207-
if v.is_none() || v.unwrap() != 0 {
212+
// is_err means the tx was dropped.
213+
if v.is_err() || *close_conn_rx.borrow() != 0 {
208214
info!("Stop accepting new connections.");
209215
break;
210216
}
@@ -227,7 +233,7 @@ impl Server {
227233
}
228234
}
229235
fd_tx = stop_listen_rx.recv() => {
230-
if let Some(mut fd_tx) = fd_tx {
236+
if let Some(fd_tx) = fd_tx {
231237
// dup fd to keep the listener open
232238
// or the listener will be closed when the incoming was dropped.
233239
let dup_fd = unistd::dup(incoming.as_raw_fd()).unwrap();
@@ -254,7 +260,7 @@ impl Server {
254260

255261
pub async fn disconnect(&mut self) {
256262
if let Some(tx) = self.disconnect_tx.take() {
257-
tx.broadcast(1).ok();
263+
tx.send(1).ok();
258264
}
259265

260266
if let Some(mut rx) = self.all_conn_done_rx.take() {
@@ -263,7 +269,7 @@ impl Server {
263269
}
264270

265271
pub async fn stop_listen(&mut self) {
266-
if let Some(mut tx) = self.stop_listen_tx.take() {
272+
if let Some(tx) = self.stop_listen_tx.take() {
267273
let (fd_tx, mut fd_rx) = channel(1);
268274
tx.send(fd_tx).await.unwrap();
269275

src/asynchronous/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ fn get_response_body(res: &Response) -> Result<Vec<u8>> {
120120
}
121121

122122
pub async fn respond(
123-
mut tx: tokio::sync::mpsc::Sender<Vec<u8>>,
123+
tx: tokio::sync::mpsc::Sender<Vec<u8>>,
124124
stream_id: u32,
125125
body: Vec<u8>,
126126
) -> Result<()> {
@@ -132,7 +132,7 @@ pub async fn respond(
132132
}
133133

134134
pub async fn respond_with_status(
135-
mut tx: tokio::sync::mpsc::Sender<Vec<u8>>,
135+
tx: tokio::sync::mpsc::Sender<Vec<u8>>,
136136
stream_id: u32,
137137
status: Status,
138138
) -> Result<()> {

src/asynchronous/unix_incoming.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) 2021 Ant Group
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
//! Because Tokio has removed UnixIncoming since version 0.3,
7+
//! we define the UnixIncoming and implement the Stream for UnixIncoming.
8+
9+
use std::io;
10+
use std::os::unix::io::{AsRawFd, RawFd};
11+
use std::pin::Pin;
12+
use std::task::{Context, Poll};
13+
14+
use futures::{ready, Stream};
15+
use tokio::net::{UnixListener, UnixStream};
16+
17+
/// Stream of listeners
18+
#[derive(Debug)]
19+
#[must_use = "streams do nothing unless polled"]
20+
pub struct UnixIncoming {
21+
inner: UnixListener,
22+
}
23+
24+
impl UnixIncoming {
25+
pub fn new(listener: UnixListener) -> Self {
26+
Self { inner: listener }
27+
}
28+
}
29+
30+
impl Stream for UnixIncoming {
31+
type Item = io::Result<UnixStream>;
32+
33+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34+
let (socket, _) = ready!(self.inner.poll_accept(cx))?;
35+
Poll::Ready(Some(Ok(socket)))
36+
}
37+
}
38+
39+
impl AsRawFd for UnixIncoming {
40+
fn as_raw_fd(&self) -> RawFd {
41+
self.inner.as_raw_fd()
42+
}
43+
}

src/asynchronous/utils.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,8 @@ pub fn new_unix_stream_from_raw_fd(fd: RawFd) -> UnixStream {
121121
unsafe {
122122
std_stream = std::os::unix::net::UnixStream::from_raw_fd(fd);
123123
}
124+
// Notice: There is a big change between tokio 1.0 and 0.2
125+
// we must set nonblocking by ourselves in tokio 1.0
126+
std_stream.set_nonblocking(true).unwrap();
124127
UnixStream::from_std(std_stream).unwrap()
125128
}

0 commit comments

Comments
 (0)