Skip to content

Commit ef0d5d2

Browse files
committed
got something working in ch10
1 parent 86d96a9 commit ef0d5d2

File tree

32 files changed

+1535
-180
lines changed

32 files changed

+1535
-180
lines changed

ch10/a-coroutines-improved/src/runtime.rs

Lines changed: 0 additions & 40 deletions
This file was deleted.

ch10/a-coroutines-improved/Cargo.toml renamed to ch10/a-coroutines-variables/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "a-coroutines-improved"
2+
name = "a-coroutines-variables"
33
version = "0.1.0"
44
edition = "2021"
55

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
mod future;
2+
mod http;
3+
mod runtime;
4+
use crate::http::Http;
5+
use future::{Future, PollState};
6+
use runtime::Waker;
7+
8+
fn main() {
9+
let mut executor = runtime::init();
10+
executor.block_on(async_main());
11+
}
12+
13+
coroutine fn async_main() {
14+
println!("Program starting");
15+
let txt = Http::get("/600/HelloAsyncAwait").wait;
16+
println!("{txt}");
17+
let txt = Http::get("/400/HelloAsyncAwait").wait;
18+
println!("{txt}");
19+
}

ch10/b-coroutines-problem/src/future.rs renamed to ch10/a-coroutines-variables/src/future.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1+
// NEW
2+
use std::{thread::Thread, sync::Arc};
3+
4+
use crate::runtime::Waker;
5+
// END NEW
6+
7+
18
pub trait Future {
29
type Output;
3-
fn poll(&mut self) -> PollState<Self::Output>;
10+
///////////////////////// NEW
11+
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>;
412
}
513

614
pub enum PollState<T> {
@@ -22,15 +30,15 @@ pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
2230
}
2331

2432
impl<F: Future> Future for JoinAll<F> {
25-
type Output = ();
26-
27-
fn poll(&mut self) -> PollState<Self::Output> {
33+
type Output = String;
34+
////////////////////////// HERE
35+
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
2836
for (finished, fut) in self.futures.iter_mut() {
2937
if *finished {
3038
continue;
3139
}
3240

33-
match fut.poll() {
41+
match fut.poll(waker) {
3442
PollState::Ready(_) => {
3543
*finished = true;
3644
self.finished_count += 1;
@@ -41,7 +49,7 @@ pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
4149
}
4250

4351
if self.finished_count == self.futures.len() {
44-
PollState::Ready(())
52+
PollState::Ready(String::new())
4553
} else {
4654
PollState::NotReady
4755
}

ch10/b-coroutines-problem/src/http.rs renamed to ch10/a-coroutines-variables/src/http.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1-
use crate::{future::PollState, runtime, Future};
2-
use mio::{Interest, Token};
31
use std::io::{ErrorKind, Read, Write};
42

3+
use mio::{Interest, Registry, Token};
4+
5+
use crate::{
6+
future::PollState,
7+
runtime::{self, reactor, Waker},
8+
Future,
9+
};
10+
511
fn get_req(path: &str) -> String {
612
format!(
713
"GET {path} HTTP/1.1\r\n\
@@ -18,19 +24,21 @@ impl Http {
1824
HttpGetFuture::new(path.to_string())
1925
}
2026
}
21-
2227
struct HttpGetFuture {
2328
stream: Option<mio::net::TcpStream>,
2429
buffer: Vec<u8>,
2530
path: String,
31+
id: usize,
2632
}
2733

2834
impl HttpGetFuture {
2935
fn new(path: String) -> Self {
36+
let id = reactor().next_id();
3037
Self {
3138
stream: None,
3239
buffer: vec![],
3340
path,
41+
id,
3442
}
3543
}
3644

@@ -46,30 +54,35 @@ impl HttpGetFuture {
4654
impl Future for HttpGetFuture {
4755
type Output = String;
4856

49-
fn poll(&mut self) -> PollState<Self::Output> {
57+
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
58+
// If this is first time polled, start the operation
59+
// see: https://users.rust-lang.org/t/is-it-bad-behaviour-for-a-future-or-stream-to-do-something-before-being-polled/61353
60+
// Avoid dns lookup this time
5061
if self.stream.is_none() {
5162
println!("FIRST POLL - START OPERATION");
5263
self.write_request();
53-
5464
// CHANGED
55-
runtime::registry()
56-
.register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE)
57-
.unwrap();
65+
let stream = self.stream.as_mut().unwrap();
66+
runtime::reactor().register(stream, Interest::READABLE, self.id);
67+
runtime::reactor().set_waker(waker, self.id);
5868
// ============
5969
}
6070

61-
let mut buff = vec![0u8; 4096];
71+
let mut buff = vec![0u8; 147];
6272
loop {
6373
match self.stream.as_mut().unwrap().read(&mut buff) {
6474
Ok(0) => {
6575
let s = String::from_utf8_lossy(&self.buffer);
76+
runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
6677
break PollState::Ready(s.to_string());
6778
}
6879
Ok(n) => {
6980
self.buffer.extend(&buff[0..n]);
7081
continue;
7182
}
7283
Err(e) if e.kind() == ErrorKind::WouldBlock => {
84+
// always store the last given Waker
85+
runtime::reactor().set_waker(waker, self.id);
7386
break PollState::NotReady;
7487
}
7588

ch10/a-coroutines-improved/src/main.rs renamed to ch10/a-coroutines-variables/src/main.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,30 @@
11
mod future;
22
mod http;
33
mod runtime;
4-
4+
use crate::http::Http;
55
use future::{Future, PollState};
6-
use runtime::Runtime;
6+
use runtime::Waker;
77

88
fn main() {
9-
let future = async_main();
10-
let mut runtime = Runtime::new();
11-
runtime.block_on(future);
9+
let mut executor = runtime::init();
10+
executor.block_on(async_main());
1211
}
1312

14-
15-
1613
// =================================
1714
// We rewrite this:
1815
// =================================
1916

20-
// coro fn async_main() {
21-
// let mut counter = 0;
17+
// coroutine fn async_main() {
18+
// let mut counter = 0;
2219
// println!("Program starting");
2320
// let txt = http::Http::get("/600/HelloAsyncAwait").wait;
2421
// println!("{txt}");
2522
// counter += 1;
2623
// let txt = http::Http::get("/400/HelloAsyncAwait").wait;
2724
// println!("{txt}");
2825
// counter += 1;
29-
//
30-
// println!("Sent {} GET requests.", counter);
26+
27+
// println!("Received {} responses.", counter);
3128
// }
3229

3330
// =================================
@@ -65,37 +62,36 @@ impl Coroutine0 {
6562
impl Future for Coroutine0 {
6663
type Output = String;
6764

68-
fn poll(&mut self) -> PollState<Self::Output> {
65+
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
6966
loop {
7067
match self.state {
7168
State0::Start => {
72-
// restore stack (no stack yet)
73-
69+
// initialize stack (hoist variables)
70+
self.stack.counter = Some(0);
7471
// ---- Code you actually wrote ----
75-
let mut counter = 0;
7672
println!("Program starting");
7773

7874
// ---------------------------------
7975
let fut1 = Box::new( http::Http::get("/600/HelloAsyncAwait"));
8076
self.state = State0::Wait1(fut1);
81-
77+
8278
// save stack
83-
self.stack.counter = Some(counter);
79+
8480
}
8581

8682
State0::Wait1(ref mut f1) => {
87-
match f1.poll() {
83+
match f1.poll(waker) {
8884
PollState::Ready(txt) => {
8985
// Restore stack
9086
let mut counter = self.stack.counter.take().unwrap();
91-
87+
9288
// ---- Code you actually wrote ----
9389
println!("{txt}");
9490
counter += 1;
9591
// ---------------------------------
9692
let fut2 = Box::new( http::Http::get("/400/HelloAsyncAwait"));
9793
self.state = State0::Wait2(fut2);
98-
94+
9995
// save stack
10096
self.stack.counter = Some(counter);
10197
}
@@ -104,21 +100,21 @@ impl Future for Coroutine0 {
104100
}
105101

106102
State0::Wait2(ref mut f2) => {
107-
match f2.poll() {
103+
match f2.poll(waker) {
108104
PollState::Ready(txt) => {
109105
// Restore stack
110106
let mut counter = self.stack.counter.take().unwrap();
111-
107+
112108
// ---- Code you actually wrote ----
113109
println!("{txt}");
114110
counter += 1;
115-
116-
println!("Sent {} GET requests.", counter);
111+
112+
println!("Received {} responses.", counter);
117113
// ---------------------------------
118114
self.state = State0::Resolved;
119-
115+
120116
// Save stack (all variables set to None already)
121-
117+
122118
break PollState::Ready(String::new());
123119
}
124120
PollState::NotReady => break PollState::NotReady,
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
pub use executor::{spawn, Waker, Executor};
2+
pub use reactor::reactor;
3+
4+
mod executor;
5+
mod reactor;
6+
7+
pub fn init() -> Executor {
8+
reactor::start();
9+
Executor::new()
10+
}

0 commit comments

Comments
 (0)