Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Commit af30afe

Browse files
authored
Tokio current thread runtime (#3)
* tokio current thread Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * tokio current thread benches Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
1 parent b986cb2 commit af30afe

File tree

3 files changed

+95
-12
lines changed

3 files changed

+95
-12
lines changed

benches/tokio.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,7 @@ mod common;
99
mod tokio {
1010
benchmark_suite!(runtime_tokio::Tokio);
1111
}
12+
13+
mod tokio_current_thread {
14+
benchmark_suite!(runtime_tokio::TokioCurrentThread);
15+
}

runtime-tokio/src/lib.rs

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,32 @@ use lazy_static::lazy_static;
2020
use std::io;
2121
use std::net::SocketAddr;
2222
use std::pin::Pin;
23+
use std::sync::{mpsc, Mutex};
24+
use std::thread;
2325

2426
mod tcp;
2527
mod udp;
2628

2729
use tcp::{TcpListener, TcpStream};
2830
use udp::UdpSocket;
2931

30-
lazy_static! {
31-
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = {
32-
tokio::runtime::Builder::new()
33-
.after_start(|| {
34-
runtime_raw::set_runtime(&Tokio);
35-
})
36-
.build()
37-
.unwrap()
38-
};
39-
}
40-
41-
/// The Tokio runtime.
32+
/// The default Tokio runtime.
4233
#[derive(Debug)]
4334
pub struct Tokio;
4435

4536
impl runtime_raw::Runtime for Tokio {
4637
fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> {
38+
lazy_static! {
39+
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = {
40+
tokio::runtime::Builder::new()
41+
.after_start(|| {
42+
runtime_raw::set_runtime(&Tokio);
43+
})
44+
.build()
45+
.unwrap()
46+
};
47+
}
48+
4749
TOKIO_RUNTIME
4850
.executor()
4951
.spawn(Compat::new(fut.map(|_| Ok(()))));
@@ -81,3 +83,70 @@ impl runtime_raw::Runtime for Tokio {
8183
Ok(Box::pin(UdpSocket { tokio_socket }))
8284
}
8385
}
86+
87+
/// The single-threaded Tokio runtime based on `tokio-current-thread`.
88+
#[derive(Debug)]
89+
pub struct TokioCurrentThread;
90+
91+
impl runtime_raw::Runtime for TokioCurrentThread {
92+
fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> {
93+
lazy_static! {
94+
static ref TOKIO_RUNTIME: Mutex<tokio::runtime::current_thread::Handle> = {
95+
let (tx, rx) = mpsc::channel();
96+
97+
thread::spawn(move || {
98+
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
99+
let handle = rt.handle();
100+
tx.send(handle).unwrap();
101+
102+
runtime_raw::set_runtime(&TokioCurrentThread);
103+
let forever = futures01::future::poll_fn(|| {
104+
Ok::<futures01::Async<()>, ()>(futures01::Async::NotReady)
105+
});
106+
rt.block_on(forever).unwrap();
107+
});
108+
109+
let handle = rx.recv().unwrap();
110+
Mutex::new(handle)
111+
};
112+
}
113+
114+
TOKIO_RUNTIME
115+
.lock()
116+
.unwrap()
117+
.spawn(Compat::new(fut.map(|_| Ok(()))))
118+
.unwrap();
119+
Ok(())
120+
}
121+
122+
fn connect_tcp_stream(
123+
&self,
124+
addr: &SocketAddr,
125+
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> + Send>>
126+
{
127+
use futures::compat::Compat01As03;
128+
use futures01::Future;
129+
130+
let tokio_connect = tokio::net::TcpStream::connect(addr);
131+
let connect = tokio_connect.map(|tokio_stream| {
132+
Box::pin(TcpStream { tokio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
133+
});
134+
Box::pin(Compat01As03::new(connect))
135+
}
136+
137+
fn bind_tcp_listener(
138+
&self,
139+
addr: &SocketAddr,
140+
) -> io::Result<Pin<Box<dyn runtime_raw::TcpListener>>> {
141+
let tokio_listener = tokio::net::TcpListener::bind(&addr)?;
142+
Ok(Box::pin(TcpListener { tokio_listener }))
143+
}
144+
145+
fn bind_udp_socket(
146+
&self,
147+
addr: &SocketAddr,
148+
) -> io::Result<Pin<Box<dyn runtime_raw::UdpSocket>>> {
149+
let tokio_socket = tokio::net::UdpSocket::bind(&addr)?;
150+
Ok(Box::pin(UdpSocket { tokio_socket }))
151+
}
152+
}

tests/tokio-current-thread.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#![feature(async_await, await_macro, futures_api)]
2+
3+
#[runtime::test(runtime_tokio::TokioCurrentThread)]
4+
async fn spawn() {
5+
let handle = runtime::spawn(async {
6+
println!("hello planet from Tokio current-thread");
7+
42
8+
});
9+
assert_eq!(await!(handle), 42);
10+
}

0 commit comments

Comments
 (0)