advance/async/future-excuting #811
Replies: 32 comments 35 replies
-
目前 Rust 缺少 Quartz 一样的大杀器,结合 Future 和 执行器可以搞一个,coding 中 |
Beta Was this translation helpful? Give feedback.
-
大佬帮忙过来看看,下面我对文中的这段代码的注释加了我的理解,请帮忙看看有没有问题: impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() { // 如果发送端还在,且ready_queue里没有任务,主线程就在此阻塞
// 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
// future是spawner.spawn的整个async块。第一次poll推动执行了块内await前的同步代码,即打印出"任务开始"
// 同步代码执行完TimerFuture::new后返回一个类型为TimerFuture的future,
// 之后的.await驱动执行TimerFuture的poll方法,顺便通过TimerFuture的poll方法把下面future.as_mut().poll(context)传入的context(包含了waker)接续传入TimerFuture内部
// 第一次对TimerFuture做poll返回了Poll::Pending,也就是在.await处返回Poll::Pending
// 那次轮执行外层task就在.await处保存好整个外层task的栈现场(task内.await后的代码不再执行),然后让出主线程的控制权.
// 主线程不可能被挂起,所以继续执行后续同步代码。
// 因为此轮while循环外层future的第一次poll返回了Poll::Pending,所以继续执行if块内部代码
if future.as_mut().poll(context).is_pending() {
// Future还没执行完,因此将它放回任务中,等待下次被poll
// 之所以再放回,是因为前面用了take()。但如果前面不用take(),这里也不用放回了,是否可行?
*future_slot = Some(future);
}
}
}
}
}
fn main() {
let (executor,spawner) = new_executor_and_spawner();
// 将外层task发送到 executor 和 spawner 所在的同步通道中
spawner.spawn(async {
println!("howdy!");
//等待TimerFuture睡醒后执行外层task的wake_by_ref将此外层task再次发送到此同步通道中
TimerFuture::new(Duration::from_secs(2)).await;
println!("done!");
});
drop(spawner); // 因为spawn里复制了一份发送端给外层task,所以spawner被删除后,上面建立的同步通道仍在,下面一句仍能正常执行
executor.run();
} |
Beta Was this translation helpful? Give feedback.
-
有一点不太明白, |
Beta Was this translation helpful? Give feedback.
-
将
改为
是不是更好理解 async .await,不然让他们在包裹了一层就很困惑, 这个拆解完语法糖怎么还需要语法糖 |
Beta Was this translation helpful? Give feedback.
-
我有一个问题,那如果我们自己写的 async 函数就是纯计算函数(不涉及系统 IO,极端点,一个死循环的 for,什么都不干),那是不是这个函数除非运行完,否则完全不能被调度(暂停)? |
Beta Was this translation helpful? Give feedback.
-
想问下各位大佬,在 Executor.run( ) 中 while let Ok(task) = self.ready_queue.recv() { 这个 task 在 while 的第一次循环后,ready_queue 就应该是空的了,那 task 不是应该就 drop 了吗,这个 task 怎么活到了 waker.wake() ? 我特地还把 TimerFuture::new( ) 中 创建新线程:
这段内容全删了,发现程序第二次运行到 Executor:: run ( ) 的 while let Ok(task) = self.ready_queue.recv() { ... } 的时候就 block 住了,而且发现 task 也没 drop ,这是为什么啊? 难道还有别的线程保存了 task ? |
Beta Was this translation helpful? Give feedback.
-
impl Executor { |
Beta Was this translation helpful? Give feedback.
-
PIN的理解还是有点困难,pin的目的说是可以让地址不发生变化,但是什么情况下一个对象的地址会发生变化呢?就像是vector那样重新分配了地址空间的那种? c/c++程序员一时没回过神来... |
Beta Was this translation helpful? Give feedback.
-
这章写得太好了。不仅学到了关于 rust 的异步的实现原理,还通过这个大致推测出 C++ asio 库的实现原理,以及为何其中任何一个异步函数都需要一个 io_context 作为参数传入。之前只会照猫画虎,现在算是彻底明白了,知其然,知其所以然。 |
Beta Was this translation helpful? Give feedback.
-
有跟我一样看懵的吗 |
Beta Was this translation helpful? Give feedback.
-
有一个问题,既然前面说 Future 是惰性的,只在 poll 后才会运行,那么这里的 TimerFuture 也不应该在 new的时候就生成 sleep的工作线程,而应该在 poll 的时候生成线程。🤔 |
Beta Was this translation helpful? Give feedback.
-
纠错: self.task_sender.send(task).expect("任务队列已满"); 仅当 |
Beta Was this translation helpful? Give feedback.
-
thread::spawn(move || { 关于这个TimerFuture请教一个问题,这里为什么在new方法中,要去启动一个线程? 我理解所有的异步任务是实现一个线程内调度多个Future来轮流执行,请问这么理解是否正确?如果这里启动一个线程,相当于一个线程内只有一个Future了,这个是不是反而性能更差了,这样的做法是不是还不如直接多线程? 当然可能这里只是一个例子而已,麻烦楼主回复前边的理解是否是准确的,谢谢! |
Beta Was this translation helpful? Give feedback.
-
spawner.spawn 这个方法, 获取 self 的所有权, main 函数中就不需要 drop 了 |
Beta Was this translation helpful? Give feedback.
-
看过 JYY 老师的视频, 再来看 rust 的线程调度, 别有一番感悟 |
Beta Was this translation helpful? Give feedback.
-
怎么感觉看完了之后,Poll的实现还是有一些黑盒?看起来 TimeFuture中 new函数中的那个中的函数 thread::spawn(move || {
println!("Start now; expect before executor");
// 执行休息时间
thread::sleep(duration);
let mut s = mv_shared_state.lock().unwrap();
s.completed = true;
if let Some(wake) = s.waker.take() {
println!("Wake here");
wake.wake()
}
}); 是在 impl Future for TimeFuture {
type Output = ();
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
let mut state = self.share_state.lock().unwrap();
println!("Poll time {}", state.poll_times);
state.poll_times = state.poll_times + 1;
if state.completed {
Poll::Ready(())
} else {
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
} 似乎也看不出为啥thread会在poll被调用的时候才执行? |
Beta Was this translation helpful? Give feedback.
-
放回了就能一直运行,不放回let的时候会failed,只能运行一次
|
Beta Was this translation helpful? Give feedback.
-
我想问一个问题 所以以上例子是基于线程的异步实现吗?那是不是其实就和 thread::spawn 后 用join handle的效果是一样的? |
Beta Was this translation helpful? Give feedback.
-
上面完整的代码可以在哪里找到 |
Beta Was this translation helpful? Give feedback.
-
看不懂,没有例子来说明很难看懂啊 |
Beta Was this translation helpful? Give feedback.
-
pub struct SocketRead<'a> { |
Beta Was this translation helpful? Give feedback.
-
例子一些要点:
|
Beta Was this translation helpful? Give feedback.
-
我有个细节的问题没理解到: 麻烦哪位老师解答一下rust实现这个逻辑的位置呢。 |
Beta Was this translation helpful? Give feedback.
-
用过 C++20 Coroutine 自己封装个 runtime 出来就能看懂这里了,原理都一模一样 |
Beta Was this translation helpful? Give feedback.
-
关于执行器的设计部分,是不是实际执行器中,当一个Task返回是Pengding时候,仍然会将Task移出队列,否则在之后Waker时候,又一次将Task发给队列,就重复发送了,而且执行器的关闭时机也不是判断队列是否为空,在示例代码中,当计时器计时完毕时候又wake进行发送,那么这里是不是将一个Task发给队列发了两遍,求大佬解答 |
Beta Was this translation helpful? Give feedback.
-
感覺要自己call wait() 一點都不方便。。。 |
Beta Was this translation helpful? Give feedback.
-
有个疑惑,关于 Executor 的实现,因为是使用消息通道机制,而之前消息那章说消息的 recv() 方法是会阻塞线程的,那这里 while let Ok(task) = self.ready_queue.recv() 的写法,当一个 future 第一次 poll 没有执行完,或者同时执行多个 future 的时候,不会因为当前线程被阻塞住而无法执行吗? |
Beta Was this translation helpful? Give feedback.
-
这里的 原文很多个逗号,其分隔语句的主语均不明确,且一直在变。 |
Beta Was this translation helpful? Give feedback.
-
Rust高度抽象的语法确实适合写异步调度 |
Beta Was this translation helpful? Give feedback.
-
我好笨啊,写了一遍源码,然后配合ai,不停地问,花了整整一天才搞懂整个流程。 |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
advance/async/future-excuting
https://course.rs/advance/async/future-excuting.html
Beta Was this translation helpful? Give feedback.
All reactions