Skip to content

Commit a2d5fe9

Browse files
committed
final example finished, just had to double check that the changes were as easy as I hoped
1 parent cde4825 commit a2d5fe9

File tree

7 files changed

+512
-0
lines changed

7 files changed

+512
-0
lines changed

ch11/a-async-await/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "a-async-await"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
mio = { version = "0.8", features = ["net", "os-poll"] }

ch11/a-async-await/src/future.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// NEW
2+
use std::{future::Future, pin::Pin, task::{Poll, Context}};
3+
4+
5+
pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
6+
let futures = futures.into_iter().map(|f| (false, Box::pin(f))).collect();
7+
JoinAll {
8+
futures,
9+
finished_count: 0,
10+
}
11+
}
12+
13+
pub struct JoinAll<F: Future> {
14+
futures: Vec<(bool, Pin<Box<F>>)>,
15+
finished_count: usize,
16+
}
17+
18+
impl<F: Future> Future for JoinAll<F> {
19+
type Output = String;
20+
fn poll(mut self: Pin<&mut Self>, waker: &mut Context) -> Poll<Self::Output> {
21+
let Self {
22+
futures,
23+
finished_count,
24+
} = &mut *self;
25+
for (finished, fut) in futures.iter_mut() {
26+
if *finished {
27+
continue;
28+
}
29+
30+
match fut.as_mut().poll(waker) {
31+
Poll::Ready(_) => {
32+
*finished = true;
33+
*finished_count += 1;
34+
}
35+
36+
Poll::Pending => continue,
37+
}
38+
}
39+
40+
if self.finished_count == self.futures.len() {
41+
Poll::Ready(String::new())
42+
} else {
43+
Poll::Pending
44+
}
45+
}
46+
}

ch11/a-async-await/src/http.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::{
2+
io::{ErrorKind, Read, Write},
3+
pin::Pin,
4+
task::{Poll, Context},
5+
};
6+
7+
use mio::Interest;
8+
9+
use crate::{
10+
runtime::{self, reactor},
11+
Future,
12+
};
13+
14+
fn get_req(path: &str) -> String {
15+
format!(
16+
"GET {path} HTTP/1.1\r\n\
17+
Host: localhost\r\n\
18+
Connection: close\r\n\
19+
\r\n"
20+
)
21+
}
22+
23+
pub struct Http;
24+
25+
impl Http {
26+
pub fn get(path: &str) -> impl Future<Output = String> {
27+
HttpGetFuture::new(path.to_string())
28+
}
29+
}
30+
struct HttpGetFuture {
31+
stream: Option<mio::net::TcpStream>,
32+
buffer: Vec<u8>,
33+
path: String,
34+
id: usize,
35+
}
36+
37+
impl HttpGetFuture {
38+
fn new(path: String) -> Self {
39+
let id = reactor().next_id();
40+
Self {
41+
stream: None,
42+
buffer: vec![],
43+
path,
44+
id,
45+
}
46+
}
47+
48+
fn write_request(&mut self) {
49+
let stream = std::net::TcpStream::connect("127.0.0.1:8080").unwrap();
50+
stream.set_nonblocking(true).unwrap();
51+
let mut stream = mio::net::TcpStream::from_std(stream);
52+
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
53+
self.stream = Some(stream);
54+
}
55+
}
56+
57+
impl Future for HttpGetFuture {
58+
type Output = String;
59+
60+
fn poll(mut self: Pin<&mut Self>, waker: &mut Context) -> Poll<Self::Output> {
61+
// If this is first time polled, start the operation
62+
// see: https://users.rust-lang.org/t/is-it-bad-behaviour-for-a-future-or-stream-to-do-something-before-being-polled/61353
63+
// Avoid dns lookup this time
64+
//let this = self.get_mut();
65+
66+
let id = self.id;
67+
if self.stream.is_none() {
68+
println!("FIRST POLL - START OPERATION");
69+
self.write_request();
70+
// CHANGED
71+
let stream = (&mut self).stream.as_mut().unwrap();
72+
runtime::reactor().register(stream, Interest::READABLE, id);
73+
runtime::reactor().set_waker(waker, self.id);
74+
// ============
75+
}
76+
77+
let mut buff = vec![0u8; 147];
78+
loop {
79+
match self.stream.as_mut().unwrap().read(&mut buff) {
80+
Ok(0) => {
81+
let s = String::from_utf8_lossy(&self.buffer).to_string();
82+
runtime::reactor().deregister(self.stream.as_mut().unwrap(), id);
83+
break Poll::Ready(s.to_string());
84+
}
85+
Ok(n) => {
86+
self.buffer.extend(&buff[0..n]);
87+
continue;
88+
}
89+
Err(e) if e.kind() == ErrorKind::WouldBlock => {
90+
// always store the last given Waker
91+
runtime::reactor().set_waker(waker, self.id);
92+
break Poll::Pending;
93+
}
94+
95+
Err(e) => panic!("{e:?}"),
96+
}
97+
}
98+
}
99+
}

ch11/a-async-await/src/main.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
mod future;
2+
mod http;
3+
mod runtime;
4+
use crate::http::Http;
5+
6+
7+
fn main() {
8+
let mut executor = runtime::init();
9+
executor.block_on(async_main());
10+
}
11+
12+
use std::{fmt::Write, future::Future, marker::PhantomPinned, mem, pin::Pin, task::{Poll, Context}};
13+
// =================================
14+
// We rewrite this:
15+
// =================================
16+
17+
// coro fn async_main() {
18+
// let mut buffer = String::from("\nBUFFER:\n----\n");
19+
// let writer = &mut buffer;
20+
// println!("Program starting");
21+
// let txt = http::Http::get("/600/HelloAsyncAwait").wait;
22+
// writeln!(writer, "{txt}").unwrap();
23+
// let txt = http::Http::get("/400/HelloAsyncAwait").wait;
24+
// writeln!(writer, "{txt}").unwrap();
25+
//
26+
// println!("{}", buffer);
27+
// }
28+
29+
// =================================
30+
// Into this:
31+
// =================================
32+
33+
fn async_main() -> impl Future<Output = String> {
34+
Coroutine0::new()
35+
}
36+
37+
enum State0 {
38+
Start,
39+
Wait1(Pin<Box<dyn Future<Output = String>>>),
40+
Wait2(Pin<Box<dyn Future<Output = String>>>),
41+
Resolved,
42+
}
43+
44+
#[derive(Default)]
45+
struct Stack0 {
46+
buffer: Option<String>,
47+
writer: Option<*mut String>,
48+
}
49+
50+
struct Coroutine0 {
51+
stack: Stack0,
52+
state: State0,
53+
_pin: PhantomPinned,
54+
}
55+
56+
impl Coroutine0 {
57+
fn new() -> Self {
58+
Self {
59+
state: State0::Start,
60+
stack: Stack0::default(),
61+
_pin: PhantomPinned,
62+
}
63+
}
64+
}
65+
66+
impl Future for Coroutine0 {
67+
type Output = String;
68+
69+
fn poll(self: Pin<&mut Self>, waker: &mut Context) -> Poll<Self::Output> {
70+
let this = unsafe { self.get_unchecked_mut() };
71+
loop {
72+
match this.state {
73+
State0::Start => {
74+
// initialize stack (hoist declarations - no stack yet)
75+
76+
this.stack.buffer = Some(String::from("\nBUFFER:\n----\n"));
77+
this.stack.writer = Some(this.stack.buffer.as_mut().unwrap());
78+
// ---- Code you actually wrote ----
79+
println!("Program starting");
80+
81+
// ---------------------------------
82+
let fut1 = Box::pin(http::Http::get("/600/HelloAsyncAwait"));
83+
this.state = State0::Wait1(fut1);
84+
85+
// save stack
86+
// nothing to save
87+
}
88+
89+
State0::Wait1(ref mut f1) => {
90+
match f1.as_mut().poll(waker) {
91+
Poll::Ready(txt) => {
92+
// Restore stack
93+
let writer = unsafe { &mut *this.stack.writer.unwrap() };
94+
95+
// ---- Code you actually wrote ----
96+
writeln!(writer, "{txt}").unwrap();
97+
// ---------------------------------
98+
let fut2 = Box::pin(http::Http::get("/400/HelloAsyncAwait"));
99+
this.state = State0::Wait2(fut2);
100+
101+
// save stack
102+
this.stack.writer = Some(writer);
103+
}
104+
Poll::Pending => break Poll::Pending,
105+
}
106+
}
107+
108+
State0::Wait2(ref mut f2) => {
109+
match f2.as_mut().poll(waker) {
110+
Poll::Ready(txt) => {
111+
// Restore stack
112+
let buffer = this.stack.buffer.as_ref().take().unwrap();
113+
let writer = unsafe { &mut *this.stack.writer.unwrap() };
114+
115+
// ---- Code you actually wrote ----
116+
writeln!(writer, "{txt}").unwrap();
117+
118+
println!("{}", buffer);
119+
// ---------------------------------
120+
this.state = State0::Resolved;
121+
122+
// Save stack (all variables set to None already)
123+
124+
break Poll::Ready(String::new());
125+
}
126+
Poll::Pending => break Poll::Pending,
127+
}
128+
}
129+
130+
State0::Resolved => panic!("Polled a resolved future"),
131+
}
132+
}
133+
}
134+
}

ch11/a-async-await/src/runtime.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
pub use executor::{spawn, Executor};
2+
pub use reactor::reactor;
3+
4+
mod executor;
5+
mod reactor;
6+
7+
pub fn init() -> Executor {
8+
reactor::start();
9+
Executor::new()
10+
}

0 commit comments

Comments
 (0)