目录
- 引言
- Futures概述
- 什么是Futures
- Futures的基本组成
- 代码示例解析
- foo函数的异步实现
- foo函数的同步实现
- JoinAll的实现
- 自定义Sleep实现
- 唤醒机制(Wake)
- 主函数的实现
- Pin、取消与递归
- 总结
引言
use futures::future;
use std::time::Duration;
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
#[tokio::main]
async fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
let joined_future = future::join_all(futures);
joined_future.await;
}
在引言部分,我们展示了一个异步Rust的示例代码,但并未解释其内部工作原理。
这留下了几个疑问:什么是异步函数及其返回的“futures”?
join_all
函数的作用是什么? tokio::time::sleep
与std::thread::sleep
有何不同?
为了回答这些问题,我们将把这些异步组件转换为普通的、非异步的Rust代码。
我们会发现,复制foo
和join_all
并不困难,但编写自定义的sleep
函数则更为复杂。
让我们开始吧。
Futures概述
什么是Futures
在Rust的异步编程中,Future是一个核心概念。
一个Future代表了一个可能尚未完成的计算,类似于一个占位符,未来某个时刻会产生一个结果。
通过async
和await
语法,Rust允许我们以同步的方式编写异步代码,极大地简化了异步编程的复杂性。
Futures的基本组成
一个Future主要由以下几个部分组成:
Pin
:一种指针包装器,用于确保内存中某个位置的数据不会被移动。对于某些需要**自引用的Future
**来说,Pin
是必需的。Context
:上下文信息,包含一个Waker
Waker
:用于在Future
需要被重新调度时唤醒它。Poll
:Future
的poll
方 法返回一个Poll
枚举,指示Future
是已完成(Ready
)还是尚未完成(Pending
)。
下面我们通过具体的代码示例来深入理解这些概念。
代码示例解析
foo
函数的异步实现
首先,我们来看一个异步函数foo
的示例:
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
这个函数做了以下几件事:
- 打印开始信息。
- 异步等待1秒钟。
- 打印结束信息。
通过async
关键字,这个函数返回一个Future
,而不是立即执行。
调用者可以选择等待这个`Future`完成。
foo
函数的同步实现
为了更好地理解异步函数的工作原理,我们将foo
函数转换为一个同步的、非异步的版本:
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;
fn foo(n: u64) -> Foo {
let started = false;
let duration = Duration::from_secs(1);
let sleep = Box::pin(tokio::time::sleep(duration));
Foo { n, started, sleep }
}
struct Foo {
n: u64,
started: bool,
sleep: Pin<Box<tokio::time::Sleep>>,
}
impl Future for Foo {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if !self.started {
println!("start {}", self.n);
self.started = true;
}
if self.sleep.as_mut().poll(context).is_pending() {
return Poll::Pending;
}
println!("end {}", self.n);
Poll::Ready(())
}
}
解析:
- 函数定义:
foo
函数现在返回一个Foo
结构体,而不是一个Future
。Foo
结构体包含:- 一个计数器
n
。 - 一个标志
started
,用于跟踪是否已经开始执行。 - 一个被
Pin
包装的sleep
future。
- 一个计数器
Future
实现:
- Foo实现了
Future
trait。 - 在
poll
方法中:- 如果尚未开始,打印开始信息并设置
started
为true
。 - 调用
sleep
的poll
方法。- 如果
sleep
还未完成,返回Poll::Pending
。 - 如果
sleep
完成,打印结束信息并返回Poll::Ready(())
。
- 如果
- 如果尚未开始,打印开始信息并设置
通过这种方式,我们手动实现了一个简单的Future
,它模拟了异步函数的行为。
JoinAll的实现
接下来,我们来看join_all
函数的实现。join_all
用于等待一组Future
全部完成。
异步实现
在异步代码中,使用join_all
如下:
async fn main() {
let futures = vec![foo(1), foo(2), foo(3)];
futures::future::join_all(futures).await;
}
同步实现
我们将join_all
转换为同步的、非异步的版本:
fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
JoinAll {
futures: futures.into_iter().map(Box::pin).collect(),
}
}
struct JoinAll<F> {
futures: Vec<Pin<Box<F>>>,
}
impl<F: Future> Future for JoinAll<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
let is_pending = |future: &mut Pin<Box<F>>| {
future.as_mut().poll(context).is_pending()
};
self.futures.retain_mut(is_pending);
if self.futures.is_empty() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
解析:
- 函数定义:
join_all
函数接收一个Future
的向量,并返回一个JoinAll
结构体。
- 结构体定义:
JoinAll
结构体包含一个Future
的Vec
,每个Future
被Box::pin
包装,以确保它们在内存中的位置固定。
- Future实现:
- 在
poll
方法中:- 使用
retain_mut
方法保留所有尚未完成的Future
。 - 如果所有
Future
都完成了,返回Poll::Ready(())
。 - 否则,返回
Poll::Pending
。
- 使用
- 在
通过这种方式,我们手动实现了一个能够等待多个Future
完成的Future
。
自定义Sleep实现
现在,让我们尝试实现自己的sleep
函数。我们希望它能够异步地等待指定的时间。
异步实现
在异步代码中,使用sleep
如下:
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
同步实现
我们将sleep
函数转换为同步的、非异步的版本:
fn sleep(duration: Duration) -> Sleep {
let wake_time = Instant::now() + duration;
Sleep { wake_time }
}
struct Sleep {
wake_time: Instant,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
if Instant::now() >= self.wake_time {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
问题:
尽管代码逻辑看起来正确,运行时发现sleep
函数无法正确唤醒,导致程序挂起。
这是因为Future::poll
方法在返回Poll::Pending
时需要安排一次唤醒,而当前的实现并未完成这一点。
唤醒机制(Wake)
为了让sleep
函数能够在指定时间后正确唤醒,我们需要实现唤醒机制。这涉及到Context
和Waker
的使用。