Skip to content

Commit 86d96a9

Browse files
committed
chapter 10, found a good solution!
1 parent d5aa3d6 commit 86d96a9

File tree

10 files changed

+377
-5
lines changed

10 files changed

+377
-5
lines changed

ch10/a-coroutines-improved/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "a-coroutines-improved"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
mio = { version = "0.8", features = ["net", "os-poll"] }

ch10/a-pin-problem/src/main.rs renamed to ch10/a-coroutines-improved/src/main.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ fn main() {
1818
// =================================
1919

2020
// coro fn async_main() {
21+
// let mut counter = 0;
2122
// println!("Program starting");
2223
// let txt = http::Http::get("/600/HelloAsyncAwait").wait;
2324
// println!("{txt}");
25+
// counter += 1;
2426
// let txt = http::Http::get("/400/HelloAsyncAwait").wait;
2527
// println!("{txt}");
26-
28+
// counter += 1;
29+
//
30+
// println!("Sent {} GET requests.", counter);
2731
// }
2832

2933
// =================================
@@ -41,13 +45,19 @@ enum State0 {
4145
Resolved,
4246
}
4347

48+
#[derive(Default)]
49+
struct Stack0 {
50+
counter: Option<usize>,
51+
}
52+
4453
struct Coroutine0 {
54+
stack: Stack0,
4555
state: State0,
4656
}
4757

4858
impl Coroutine0 {
4959
fn new() -> Self {
50-
Self { state: State0::Start }
60+
Self { state: State0::Start, stack: Stack0::default() }
5161
}
5262
}
5363

@@ -59,23 +69,35 @@ impl Future for Coroutine0 {
5969
loop {
6070
match self.state {
6171
State0::Start => {
72+
// restore stack (no stack yet)
73+
6274
// ---- Code you actually wrote ----
75+
let mut counter = 0;
6376
println!("Program starting");
6477

6578
// ---------------------------------
6679
let fut1 = Box::new( http::Http::get("/600/HelloAsyncAwait"));
6780
self.state = State0::Wait1(fut1);
81+
82+
// save stack
83+
self.stack.counter = Some(counter);
6884
}
6985

7086
State0::Wait1(ref mut f1) => {
7187
match f1.poll() {
7288
PollState::Ready(txt) => {
89+
// Restore stack
90+
let mut counter = self.stack.counter.take().unwrap();
91+
7392
// ---- Code you actually wrote ----
7493
println!("{txt}");
75-
94+
counter += 1;
7695
// ---------------------------------
7796
let fut2 = Box::new( http::Http::get("/400/HelloAsyncAwait"));
7897
self.state = State0::Wait2(fut2);
98+
99+
// save stack
100+
self.stack.counter = Some(counter);
79101
}
80102
PollState::NotReady => break PollState::NotReady,
81103
}
@@ -84,11 +106,19 @@ impl Future for Coroutine0 {
84106
State0::Wait2(ref mut f2) => {
85107
match f2.poll() {
86108
PollState::Ready(txt) => {
109+
// Restore stack
110+
let mut counter = self.stack.counter.take().unwrap();
111+
87112
// ---- Code you actually wrote ----
88113
println!("{txt}");
89-
114+
counter += 1;
115+
116+
println!("Sent {} GET requests.", counter);
90117
// ---------------------------------
91118
self.state = State0::Resolved;
119+
120+
// Save stack (all variables set to None already)
121+
92122
break PollState::Ready(String::new());
93123
}
94124
PollState::NotReady => break PollState::NotReady,

ch10/a-pin-problem/Cargo.toml renamed to ch10/b-coroutines-problem/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "a-pin-problem"
2+
name = "b-coroutines-problem"
33
version = "0.1.0"
44
edition = "2021"
55

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
pub trait Future {
2+
type Output;
3+
fn poll(&mut self) -> PollState<Self::Output>;
4+
}
5+
6+
pub enum PollState<T> {
7+
Ready(T),
8+
NotReady,
9+
}
10+
11+
pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
12+
let futures = futures.into_iter().map(|f| (false, f)).collect();
13+
JoinAll {
14+
futures,
15+
finished_count: 0,
16+
}
17+
}
18+
19+
pub struct JoinAll<F: Future> {
20+
futures: Vec<(bool, F)>,
21+
finished_count: usize,
22+
}
23+
24+
impl<F: Future> Future for JoinAll<F> {
25+
type Output = ();
26+
27+
fn poll(&mut self) -> PollState<Self::Output> {
28+
for (finished, fut) in self.futures.iter_mut() {
29+
if *finished {
30+
continue;
31+
}
32+
33+
match fut.poll() {
34+
PollState::Ready(_) => {
35+
*finished = true;
36+
self.finished_count += 1;
37+
}
38+
39+
PollState::NotReady => continue,
40+
}
41+
}
42+
43+
if self.finished_count == self.futures.len() {
44+
PollState::Ready(())
45+
} else {
46+
PollState::NotReady
47+
}
48+
}
49+
}

ch10/b-coroutines-problem/src/http.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use crate::{future::PollState, runtime, Future};
2+
use mio::{Interest, Token};
3+
use std::io::{ErrorKind, Read, Write};
4+
5+
fn get_req(path: &str) -> String {
6+
format!(
7+
"GET {path} HTTP/1.1\r\n\
8+
Host: localhost\r\n\
9+
Connection: close\r\n\
10+
\r\n"
11+
)
12+
}
13+
14+
pub struct Http;
15+
16+
impl Http {
17+
pub fn get(path: &str) -> impl Future<Output = String> {
18+
HttpGetFuture::new(path.to_string())
19+
}
20+
}
21+
22+
struct HttpGetFuture {
23+
stream: Option<mio::net::TcpStream>,
24+
buffer: Vec<u8>,
25+
path: String,
26+
}
27+
28+
impl HttpGetFuture {
29+
fn new(path: String) -> Self {
30+
Self {
31+
stream: None,
32+
buffer: vec![],
33+
path,
34+
}
35+
}
36+
37+
fn write_request(&mut self) {
38+
let stream = std::net::TcpStream::connect("127.0.0.1:8080").unwrap();
39+
stream.set_nonblocking(true).unwrap();
40+
let mut stream = mio::net::TcpStream::from_std(stream);
41+
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
42+
self.stream = Some(stream);
43+
}
44+
}
45+
46+
impl Future for HttpGetFuture {
47+
type Output = String;
48+
49+
fn poll(&mut self) -> PollState<Self::Output> {
50+
if self.stream.is_none() {
51+
println!("FIRST POLL - START OPERATION");
52+
self.write_request();
53+
54+
// CHANGED
55+
runtime::registry()
56+
.register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE)
57+
.unwrap();
58+
// ============
59+
}
60+
61+
let mut buff = vec![0u8; 4096];
62+
loop {
63+
match self.stream.as_mut().unwrap().read(&mut buff) {
64+
Ok(0) => {
65+
let s = String::from_utf8_lossy(&self.buffer);
66+
break PollState::Ready(s.to_string());
67+
}
68+
Ok(n) => {
69+
self.buffer.extend(&buff[0..n]);
70+
continue;
71+
}
72+
Err(e) if e.kind() == ErrorKind::WouldBlock => {
73+
break PollState::NotReady;
74+
}
75+
76+
Err(e) => panic!("{e:?}"),
77+
}
78+
}
79+
}
80+
}

ch10/b-coroutines-problem/src/main.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
mod future;
2+
mod http;
3+
mod runtime;
4+
5+
use future::{Future, PollState};
6+
use runtime::Runtime;
7+
8+
fn main() {
9+
let future = async_main();
10+
let mut runtime = Runtime::new();
11+
runtime.block_on(future);
12+
}
13+
14+
use std::fmt::Write;
15+
// =================================
16+
// We rewrite this:
17+
// =================================
18+
19+
// coro fn async_main() {
20+
// let mut buffer = String::new();
21+
// let writer = &mut buffer;
22+
// println!("Program starting");
23+
// let txt = http::Http::get("/600/HelloAsyncAwait").wait;
24+
// writeln!(writer, "{txt}").unwrap();
25+
// let txt = http::Http::get("/400/HelloAsyncAwait").wait;
26+
// writeln!(writer, "{txt}").unwrap();
27+
//
28+
// println!("{}", buffer);
29+
// }
30+
31+
// =================================
32+
// Into this:
33+
// =================================
34+
35+
fn async_main() -> impl Future<Output = String> {
36+
Coroutine0::new()
37+
}
38+
39+
enum State0 {
40+
Start,
41+
Wait1(Box<dyn Future<Output = String>>),
42+
Wait2(Box<dyn Future<Output = String>>),
43+
Resolved,
44+
}
45+
46+
#[derive(Default)]
47+
struct Stack0 {
48+
buffer: Option<String>,
49+
writer: Option<*mut String>,
50+
}
51+
52+
struct Coroutine0 {
53+
stack: Stack0,
54+
state: State0,
55+
}
56+
57+
impl Coroutine0 {
58+
fn new() -> Self {
59+
Self {
60+
state: State0::Start,
61+
stack: Stack0::default(),
62+
}
63+
}
64+
}
65+
66+
impl Future for Coroutine0 {
67+
type Output = String;
68+
69+
fn poll(&mut self) -> PollState<Self::Output> {
70+
loop {
71+
match self.state {
72+
State0::Start => {
73+
// initialize stack (hoist declarations - no stack yet)
74+
self.stack.buffer = Some(String::new());
75+
self.stack.writer = Some(self.stack.buffer.as_mut().unwrap());
76+
// ---- Code you actually wrote ----
77+
println!("Program starting");
78+
79+
// ---------------------------------
80+
let fut1 = Box::new(http::Http::get("/600/HelloAsyncAwait"));
81+
self.state = State0::Wait1(fut1);
82+
83+
// save stack
84+
// nothing to save
85+
}
86+
87+
State0::Wait1(ref mut f1) => {
88+
match f1.poll() {
89+
PollState::Ready(txt) => {
90+
// Restore stack
91+
let writer = unsafe { &mut *self.stack.writer.take().unwrap() };
92+
93+
// ---- Code you actually wrote ----
94+
writeln!(writer, "{txt}").unwrap();
95+
// ---------------------------------
96+
let fut2 = Box::new(http::Http::get("/400/HelloAsyncAwait"));
97+
self.state = State0::Wait2(fut2);
98+
99+
// save stack
100+
self.stack.writer = Some(writer);
101+
}
102+
PollState::NotReady => break PollState::NotReady,
103+
}
104+
}
105+
106+
State0::Wait2(ref mut f2) => {
107+
match f2.poll() {
108+
PollState::Ready(txt) => {
109+
// Restore stack
110+
let buffer = self.stack.buffer.as_ref().take().unwrap();
111+
let writer = unsafe { &mut *self.stack.writer.take().unwrap() };
112+
113+
// ---- Code you actually wrote ----
114+
writeln!(writer, "{txt}").unwrap();
115+
116+
println!("{}", buffer);
117+
// ---------------------------------
118+
self.state = State0::Resolved;
119+
120+
// Save stack (all variables set to None already)
121+
122+
break PollState::Ready(String::new());
123+
}
124+
PollState::NotReady => break PollState::NotReady,
125+
}
126+
}
127+
128+
State0::Resolved => panic!("Polled a resolved future"),
129+
}
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)