Skip to content

Commit 3eda61d

Browse files
committed
finished ch 11
1 parent 57cc859 commit 3eda61d

File tree

13 files changed

+371
-1
lines changed

13 files changed

+371
-1
lines changed

ch11/a-async-await/Cargo.toml renamed to ch11/a-rust-futures/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "a-async-await"
2+
name = "a-rust-futures"
33
version = "0.1.0"
44
edition = "2021"
55

ch11/a-rust-futures/src/http.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use crate::runtime::{self, reactor};
2+
use mio::Interest;
3+
use std::{
4+
future::Future,
5+
io::{ErrorKind, Read, Write},
6+
pin::Pin,
7+
task::{Context, Poll},
8+
};
9+
10+
fn get_req(path: &str) -> String {
11+
format!(
12+
"GET {path} HTTP/1.1\r\n\
13+
Host: localhost\r\n\
14+
Connection: close\r\n\
15+
\r\n"
16+
)
17+
}
18+
19+
pub struct Http;
20+
21+
impl Http {
22+
pub fn get(path: &str) -> impl Future<Output = String> {
23+
HttpGetFuture::new(path.to_string())
24+
}
25+
}
26+
struct HttpGetFuture {
27+
stream: Option<mio::net::TcpStream>,
28+
buffer: Vec<u8>,
29+
path: String,
30+
id: usize,
31+
}
32+
33+
impl HttpGetFuture {
34+
fn new(path: String) -> Self {
35+
let id = reactor().next_id();
36+
Self {
37+
stream: None,
38+
buffer: vec![],
39+
path,
40+
id,
41+
}
42+
}
43+
44+
fn write_request(&mut self) {
45+
let stream = std::net::TcpStream::connect("127.0.0.1:8080").unwrap();
46+
stream.set_nonblocking(true).unwrap();
47+
let mut stream = mio::net::TcpStream::from_std(stream);
48+
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
49+
self.stream = Some(stream);
50+
}
51+
}
52+
53+
impl Future for HttpGetFuture {
54+
type Output = String;
55+
56+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
57+
// If this is first time polled, start the operation
58+
// see: https://users.rust-lang.org/t/is-it-bad-behaviour-for-a-future-or-stream-to-do-something-before-being-polled/61353
59+
// Avoid dns lookup this time
60+
//let this = self.get_mut();
61+
62+
let id = self.id;
63+
if self.stream.is_none() {
64+
println!("FIRST POLL - START OPERATION");
65+
self.write_request();
66+
// CHANGED
67+
let stream = (&mut self).stream.as_mut().unwrap();
68+
runtime::reactor().register(stream, Interest::READABLE, id);
69+
runtime::reactor().set_waker(cx, self.id);
70+
// ============
71+
}
72+
73+
let mut buff = vec![0u8; 147];
74+
loop {
75+
match self.stream.as_mut().unwrap().read(&mut buff) {
76+
Ok(0) => {
77+
let s = String::from_utf8_lossy(&self.buffer).to_string();
78+
runtime::reactor().deregister(self.stream.as_mut().unwrap(), id);
79+
break Poll::Ready(s.to_string());
80+
}
81+
Ok(n) => {
82+
self.buffer.extend(&buff[0..n]);
83+
continue;
84+
}
85+
Err(e) if e.kind() == ErrorKind::WouldBlock => {
86+
// always store the last given Waker
87+
runtime::reactor().set_waker(cx, self.id);
88+
break Poll::Pending;
89+
}
90+
91+
Err(e) => panic!("{e:?}"),
92+
}
93+
}
94+
}
95+
}

ch11/a-rust-futures/src/main.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
mod http;
2+
mod runtime;
3+
use crate::http::Http;
4+
5+
fn main() {
6+
let mut executor = runtime::init();
7+
executor.block_on(async_main());
8+
}
9+
10+
async fn async_main() {
11+
println!("Program starting");
12+
let txt = Http::get("/600/HelloAsyncAwait").await;
13+
println!("{txt}");
14+
let txt = Http::get("/400/HelloAsyncAwait").await;
15+
println!("{txt}");
16+
}
17+
18+
// use isahc::prelude::*;
19+
20+
// async fn async_main2() {
21+
// let mut buffer = String::from("\nBUFFER:\n----\n");
22+
// let writer = &mut buffer;
23+
// println!("Program starting");
24+
// let mut res = isahc::get_async("http://127.0.0.1:8080/600/HelloAsyncAwait").await.unwrap();
25+
// let txt = res.text().await.unwrap();
26+
// writeln!(writer, "{txt}").unwrap();
27+
// let mut res = isahc::get_async("http://127.0.0.1:8080/400/HelloAsyncAwait").await.unwrap();
28+
// let txt = res.text().await.unwrap();
29+
// writeln!(writer, "{txt}").unwrap();
30+
31+
// println!("{}", buffer);
32+
// }
33+
34+
// async fn spawn_many() {
35+
// for i in 0..100 {
36+
// let delay = i * 10;
37+
// let req = format!("http://127.0.0.1:8080/{delay}/HelloAsyncAwait{i}");
38+
39+
// runtime::spawn(async move {
40+
// let mut res = isahc::get_async(&req).await.unwrap();
41+
// let txt = res.text().await.unwrap();
42+
// println!("{txt}");
43+
// });
44+
// }
45+
// }
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use std::{
2+
cell::{Cell, RefCell},
3+
collections::HashMap,
4+
future::Future,
5+
pin::Pin,
6+
sync::{Arc, Mutex},
7+
task::{Context, Poll, Wake, Waker},
8+
thread::{self, Thread},
9+
};
10+
11+
type Task = Pin<Box<dyn Future<Output = ()>>>;
12+
13+
thread_local! {
14+
static CURRENT_EXEC: ExecutorCore = ExecutorCore::default();
15+
}
16+
17+
#[derive(Default)]
18+
struct ExecutorCore {
19+
tasks: RefCell<HashMap<usize, Task>>,
20+
ready_queue: Arc<Mutex<Vec<usize>>>,
21+
next_id: Cell<usize>,
22+
}
23+
24+
pub fn spawn<F>(future: F)
25+
where
26+
F: Future<Output = ()> + 'static,
27+
{
28+
CURRENT_EXEC.with(|e| {
29+
let id = e.next_id.get();
30+
e.tasks.borrow_mut().insert(id, Box::pin(future));
31+
e.ready_queue.lock().map(|mut q| q.push(id)).unwrap();
32+
e.next_id.set(id + 1);
33+
});
34+
}
35+
36+
pub struct Executor;
37+
38+
impl Executor {
39+
pub fn new() -> Self {
40+
Self {}
41+
}
42+
43+
fn pop_ready(&self) -> Option<usize> {
44+
CURRENT_EXEC.with(|q| q.ready_queue.lock().map(|mut q| q.pop()).unwrap())
45+
}
46+
47+
fn get_future(&self, id: usize) -> Option<Task> {
48+
CURRENT_EXEC.with(|q| q.tasks.borrow_mut().remove(&id))
49+
}
50+
51+
fn get_waker(&self, id: usize) -> Arc<MyWaker> {
52+
Arc::new(MyWaker {
53+
id,
54+
thread: thread::current(),
55+
ready_queue: CURRENT_EXEC.with(|q| q.ready_queue.clone()),
56+
})
57+
}
58+
59+
fn insert_task(&self, id: usize, task: Task) {
60+
CURRENT_EXEC.with(|q| q.tasks.borrow_mut().insert(id, task));
61+
}
62+
63+
fn task_count(&self) -> usize {
64+
CURRENT_EXEC.with(|q| q.tasks.borrow().len())
65+
}
66+
67+
pub fn block_on<F>(&mut self, future: F)
68+
where
69+
F: Future<Output = ()> + 'static,
70+
{
71+
// ===== OPTIMIZATION, ASSUME READY
72+
// let waker = self.get_waker(usize::MAX);
73+
// let mut future = future;
74+
// match future.poll(&waker) {
75+
// PollState::Pending => (),
76+
// PollState::Ready(_) => return,
77+
// }
78+
// ===== END
79+
80+
spawn(future);
81+
82+
loop {
83+
while let Some(id) = self.pop_ready() {
84+
let mut future = match self.get_future(id) {
85+
Some(f) => f,
86+
// guard against false wakeups
87+
None => continue,
88+
};
89+
90+
let waker: Waker = self.get_waker(id).into();
91+
let mut cx = Context::from_waker(&waker);
92+
93+
match future.as_mut().poll(&mut cx) {
94+
Poll::Pending => self.insert_task(id, future),
95+
Poll::Ready(_) => continue,
96+
}
97+
}
98+
99+
let task_count = self.task_count();
100+
let name = thread::current().name().unwrap_or_default().to_string();
101+
102+
if task_count > 0 {
103+
println!("{name}: {task_count} pending tasks. Sleep until notified.");
104+
thread::park();
105+
} else {
106+
println!("{name}: All tasks are finished");
107+
break;
108+
}
109+
}
110+
}
111+
}
112+
113+
#[derive(Clone)]
114+
pub struct MyWaker {
115+
thread: Thread,
116+
id: usize,
117+
ready_queue: Arc<Mutex<Vec<usize>>>,
118+
}
119+
120+
impl Wake for MyWaker {
121+
fn wake(self: Arc<Self>) {
122+
self.ready_queue
123+
.lock()
124+
.map(|mut q| q.push(self.id))
125+
.unwrap();
126+
self.thread.unpark();
127+
}
128+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
2+
use std::{
3+
collections::HashMap,
4+
sync::{
5+
atomic::{AtomicUsize, Ordering},
6+
Arc, Mutex, OnceLock,
7+
},
8+
thread, task::{Context, Waker},
9+
};
10+
11+
12+
type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
13+
14+
static REACTOR: OnceLock<Reactor> = OnceLock::new();
15+
16+
pub fn reactor() -> &'static Reactor {
17+
REACTOR.get().expect("Called outside an runtime context")
18+
}
19+
20+
pub struct Reactor {
21+
wakers: Wakers,
22+
registry: Registry,
23+
next_id: AtomicUsize,
24+
}
25+
26+
impl Reactor {
27+
pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
28+
self.registry.register(stream, Token(id), interest).unwrap();
29+
}
30+
31+
pub fn set_waker(&self, cx: &Context, id: usize) {
32+
let _ = self
33+
.wakers
34+
.lock()
35+
.map(|mut w| w.insert(id, cx.waker().clone()).is_none())
36+
.unwrap();
37+
}
38+
39+
pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
40+
self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
41+
self.registry.deregister(stream).unwrap();
42+
}
43+
44+
pub fn next_id(&self) -> usize {
45+
self.next_id.fetch_add(1, Ordering::Relaxed)
46+
}
47+
}
48+
49+
fn event_loop(mut poll: Poll, wakers: Wakers) {
50+
let mut events = Events::with_capacity(100);
51+
loop {
52+
poll.poll(&mut events, None).unwrap();
53+
for e in events.iter() {
54+
// Optimization for Windows since we get unneeded wakeups
55+
// if !e.is_readable() && e.is_read_closed() {
56+
// continue;
57+
// }
58+
let Token(id) = e.token();
59+
let wakers = wakers.lock().unwrap();
60+
61+
if let Some(waker) = wakers.get(&id) {
62+
waker.wake_by_ref();
63+
}
64+
}
65+
}
66+
}
67+
68+
pub fn start() {
69+
use thread::spawn;
70+
let wakers = Arc::new(Mutex::new(HashMap::new()));
71+
let poll = Poll::new().unwrap();
72+
let registry = poll.registry().try_clone().unwrap();
73+
let next_id = AtomicUsize::new(1);
74+
let reactor = Reactor {
75+
wakers: wakers.clone(),
76+
registry,
77+
next_id,
78+
};
79+
80+
REACTOR.set(reactor).ok().expect("Reactor already running");
81+
spawn(move || event_loop(poll, wakers));
82+
}

ch11/b-rust-futures-bonus/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[package]
2+
name = "b-rust-futures-bonus"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
isahc = "1.7"
10+
mio = { version = "0.8", features = ["net", "os-poll"] }

0 commit comments

Comments
 (0)