
目录
- 引言
- Futures概述
- 什么是Futures
- Futures的 基本组成
- 代码示例解析
- foo函数的异步实现
- foo函数的同步实现
- JoinAll的实现
- 自定义Sleep实现
- 唤醒机制(Wake)
- 主函数的实现
- Pin、取消与递归
- 总结
引言
use futures::future;
use std::time::Duration;
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
#[tokio::main]
async fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
let joined_future = future::join_all(futures);
joined_future.await;
}
在引言部分,我们展示了一个异步Rust的示例代码,但并未解释其内部工作原理。
这留下了几个疑问:什么是异步函数及其返回的“futures”?
join_all函数的作用是什么? tokio::time::sleep与std::thread::sleep有何不同?
为了回答这些问题,我们将把这些异步组件转换为普通的、非异步的Rust代码。
我们会发现,复制foo和join_all并不困难,但编写自定义的sleep函数则更为复杂。
让我们开始吧。
Futures概述
什么是Futures
在Rust的异步编程中,Future是一个核心概念。
一个Future代表了一个可能尚未完成的计算,类似于一个占位符,未来某个时刻会产生一个结果。
通过async和await语法,Rust允许我们以同步的方式编写异步代码,极大地简化了异步编程的复杂性。
Futures的基本组成
一个Future主要由以下几个部分组成:
Pin:一种指针包装器,用于确保内存中某个位置的数据不会被移动。对于某些需要**自引用的Future**来说,Pin是必需的。Context:上下文信息,包含一个WakerWaker:用于在Future需要被重新调度时唤醒它。Poll:Future的poll方法返回一个Poll枚举,指示Future是已完成(Ready)还是尚未完成(Pending)。
下面我们通过具体的代码示例来深入理解这些概念。
代码示例解析
foo函数的异步实现
首先,我们来看一个异步函数foo的示例:
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
这个函数做了以下几件事:
- 打印开始信息。
- 异步等待1秒钟。
- 打印结束信息。
通过async关键字,这个函数返回一个Future,而不是立即执行。
调用者可以选择等待这个`Future`完成。
foo函数的同步实现
为了更好地理解异步函数的工作原理,我们将foo函数转换为一个同步的、非异步的版本:
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;
fn foo(n: u64) -> Foo {
let started = false;
let duration = Duration::from_secs(1);
let sleep = Box::pin(tokio::time::sleep(duration));
Foo { n, started, sleep }
}
struct Foo {
n: u64,
started: bool,
sleep: Pin<Box<tokio::time::Sleep>>,
}
impl Future for Foo {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if !self.started {
println!("start {}", self.n);
self.started = true;
}
if self.sleep.as_mut().poll(context).is_pending() {
return Poll::Pending;
}
println!("end {}", self.n);
Poll::Ready(())
}
}
解析:
- 函数定义:
foo函数现在返回一个Foo结构体,而不是一个Future。Foo结构体包含:- 一个计数器
n。 - 一个标志
started,用于跟踪是否已经开始执行。 - 一个被
Pin包装的sleepfuture。
- 一个计数器
Future实现:
- Foo实现了
Futuretrait。 - 在
poll方法中:- 如果尚未开始,打印开始信息并设置
started为true。 - 调用
sleep的poll方法。- 如果
sleep还未完成,返回Poll::Pending。 - 如果
sleep完成,打印结束信息并返回Poll::Ready(())。
- 如果
- 如果尚未开始,打印开始信息并设置
通过这种方式,我们手动实现了一个简单的Future,它模拟了异步函数的行为。
JoinAll的实现
接下来,我们来看join_all函数的实现。join_all用于等待一组Future全部完成。
异步实现
在异步代码中,使用join_all如下:
async fn main() {
let futures = vec![foo(1), foo(2), foo(3)];
futures::future::join_all(futures).await;
}
同步实现
我们将join_all转换为同步的、 非异步的版本:
fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
JoinAll {
futures: futures.into_iter().map(Box::pin).collect(),
}
}
struct JoinAll<F> {
futures: Vec<Pin<Box<F>>>,
}
impl<F: Future> Future for JoinAll<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
let is_pending = |future: &mut Pin<Box<F>>| {
future.as_mut().poll(context).is_pending()
};
self.futures.retain_mut(is_pending);
if self.futures.is_empty() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
解析:
- 函数定义:
join_all函数接收一个Future的向量,并返回一个JoinAll结构体。
- 结构体定义:
JoinAll结构体包含一个Future的Vec,每个Future被Box::pin包装,以确保它们在内存中的位置固定。
- Future实现:
- 在
poll方法中:- 使用
retain_mut方法保留所有尚未完成的Future。 - 如果所有
Future都完成了,返回Poll::Ready(())。 - 否则,返回
Poll::Pending。
- 使用
- 在
通过这种方式,我们手动实现了一个能够等待多个Future完成的Future。
自定义Sleep实现
现在,让我们尝试实现自己的sleep函数。我们希望它能够异步地等待指定的时间。
异步实现
在异步代码中,使用sleep如下:
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
同步实现
我们将sleep函数转换为同步的、非异步的版本:
fn sleep(duration: Duration) -> Sleep {
let wake_time = Instant::now() + duration;
Sleep { wake_time }
}
struct Sleep {
wake_time: Instant,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
if Instant::now() >= self.wake_time {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
问题:
尽管代码逻辑看起来正确,运行时发现sleep函数无法正确唤醒,导致程序挂起。
这是因为Future::poll方法在返回Poll::Pending时需要安排一次唤醒,而当前的实现并未完成这一点。
唤醒机制(Wake)
为了让sleep函数能够在指定时间后正确唤醒,我们需要实现唤醒机制。这涉及到Context和Waker的使用。
理解Context和Waker
Context:包含一个Waker,用于在Future需要被重新调度时唤醒它。Waker:用于通知执行器,Future已经准备好被再次poll。
修改Sleep实现
我们需要在sleep函数的poll方法中安排唤醒:
use std::sync::{Mutex, Arc};
use std::collections::BTreeMap;
use std::task::Waker;
static WAKE_TIMES: Mutex<BTreeMap<Instant, Vec<Waker>>> =
Mutex::new(BTreeMap::new());
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if Instant::now() >= self.wake_time {
Poll::Ready(())
} else {
let mut wake_times = WAKE_TIMES.lock().unwrap();
let wakers_vec = wake_times.entry(self.wake_time).or_default();
wakers_vec.push(context.waker().clone());
Poll::Pending
}
}
}
解析:
- 全局唤醒时间表:
- 使用
BTreeMap按时间排序存储唤醒时间和对应的Waker。
- 使用
- 在
poll中注册Waker:- 如果当前时间未到唤醒时间,将
Waker添加到WAKE_TIMES中对应的时间点。
- 如果当前时间未到唤醒时间,将
- 主循环的实 现:
- 主函数会监视
WAKE_TIMES,并在到达唤醒时间时调用相应的Waker,从而重新poll相关的Future。
- 主函数会监视
主函数的实现
接下来,我们实现一个主函数,负责调度和执行所有的Future:
fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
let mut joined_future = Box::pin(join_all(futures));
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
while joined_future.as_mut().poll(&mut context).is_pending() {
// 获 取下一个唤醒时间
let mut wake_times = WAKE_TIMES.lock().unwrap();
let next_wake = wake_times.keys().next().expect("sleep forever?");
thread::sleep(next_wake.saturating_duration_since(Instant::now()));
// 唤醒所有到期的Waker
while let Some(entry) = wake_times.first_entry() {
if *entry.key() <= Instant::now() {
entry.remove().into_iter().for_each(Waker::wake);
} else {
break;
}
}
}
}
解析:
- 初始化
Future:- 创建多个foo函数的Future实例,并将它们传递给
join_all函数,得到一个聚合的Future。
- 创建多个foo函数的Future实例,并将它们传递给
- 创建
Context:
- 使用
noop_waker创建一个空的Waker,并构建Context。
- 轮询
Future:
- 在循环中不断
poll聚合的Future。 - 获取下一个唤醒时间,并让主线程休眠到该时间。
- 唤醒所有到期的
Waker,以重新调度相应的Future。
结果:
这样,我们实现了一个简单的异步运行时,能够正确地调度和执行多个Future,并解决了之前的“忙循环”问题。
Pin、Cancellation与Recursion
Pin
Pin是Rust中的一个关键类型,用于确保某个数据在内存中的位置固定,防止其被移动。
对于**自引用的Future**来说,Pin是必需的。
示例:
struct Foo {
n: u64,
n_ref: &u64,
started: bool,
sleep: Pin<Box<tokio::time::Sleep>>,
}
在这种情况下,Pin确保Foo结构体在内存中的位置固定,避免n_ref被移动。
取消(Cancellation)
- 异步函数具有取消的能力。当我们不再需要某个
Future时,可以通过不再poll它来取消它。 - tokio提供了
tokio::time::timeout等工具来实现超时取消。
示例:
struct Timeout<F> {
sleep: Pin<Box<tokio::time::Sleep>>,
inner: Pin<Box<F>>,
}
impl<F: Future> Future for Timeout<F> {
type Output = Option<F::Output>;
fn poll(
mut self: Pin<&mut Self>,
context: &mut Context,
) -> Poll<Self::Output> {
if let Poll::Ready(output) = self.inner.as_mut().poll(context) {
return Poll::Ready(Some(output));
}
if self.sleep.as_mut().poll(context).is_ready() {
return Poll::Ready(None);
}
Poll::Pending
}
}
fn timeout<F: Future>(duration: Duration, inner: F) -> Timeout<F> {
Timeout {
sleep: Box::pin(tokio::time::sleep(duration)),
inner: Box::pin(inner),
}
}
递归(Recursion)
- 异步函数不支持直接递归调用,因为这会导致无限大小的
Future - 解决方法是通过
Box::pin进行堆分配:
async fn factorial(n: u64) -> u64 {
if n == 0 {
1
} else {
let recurse = Box::pin(factorial(n - 1));
n * recurse.await
}
}
这样可以避免无限大小的问题,但需要堆分配,可能带来性能开销。
Rust中Future的生命周期流程分析
以下是对Rust中Future生命周期的流程中每个阶段的详细分析(如顶部图片所示):
- Future 创建阶段
步骤:
- 创建异步函数
- 编译器将异步函数转换为状态机
- 生成
Future实例
Rust中的异步函数(async fn)在编译时会被转换为一个状态机。
这种转换使得异步函数能够在执行过程中保存其状态,以便在未来的某个时间点继续执行。
编译器生成的状态机实现了Future trait,从而生成一个Future实例。
- 执行器处理阶段
步骤:
Future提交到执行器- 分配执行上下文
- 创建 Waker 对象
Future实例一旦创建,就需要被执行器(如tokio或async-std)管理。
执行器负责调度和驱动Future的执行。
执行器会为每个Future分配一个执行上下文(Context),并创建一个Waker对象,用于在Future需要被重新调度时唤醒它。
- Future 状态轮询阶段
步骤:
- 调用
poll方法 - 检查完成状态
- 未完成则注册
Waker - 返回
Pending状态
执行器通过调用Future的poll方法来推进其执行。poll方法会检查Future是否已经完成:
- 如果
Future已经完成,返回Poll::Ready。 - 如果
Future尚未完成,返回Poll::Pending,并 注册Waker,以便在未来某个时间点唤醒执行器重新调度该Future。
- 唤醒机制阶段
步骤:
- 等待外部事件
- 事件就绪时触发
- 调用
wake()方法 Future重新入队等待执行
当Future处于Poll::Pending状态时,它通常会等待某个外部事件(如I/O操作完成或定时器到期)。
一旦事件就绪,相关的Waker会被调用,通知执行器重新调度该Future进行下一步的poll操作。
- 完成阶段
步骤:
- 返回
Ready状态 - 获取执行结果
- 清理相关资源
当Future完成其任务后,poll方法会返回Poll::Ready,
执行器随后可以获取Future的执行结果,并进行必要的资源清理工作。
以下是对整个流程的总结:
- 创建阶段:
- 定义异步函数,编译器将其转换为状态机,生成
Future实例。
- 定义异步函数,编译器将其转换为状态机,生成
- 执行器处理阶段:
- 将
Future提交给执行器,分配执行上下文,创建Waker对象。
- 将
- 状态轮询阶段:
- 执行器调用
poll方法,检查Future是否完成,未完成则注册Waker并返回Pending。
- 执行器调用
- 唤醒机 制阶段:
- 外部事件就绪时,
Waker被调用,Future重新入队等待执行。
- 外部事件就绪时,
- 完成阶段:
Future返回Ready状态,执行器获取结果并清理资源。
但为了更全面地理解Future的生命周期,以下几点也值得注意:
- 执行器的具体实现:
- 不同的执行器(如
tokio、async-std)在具体的调度和任务管理上可能有所不同,但总体流程相似。
- 不同的执行器(如
- 多任务调度:
- 执行器通常会同时管理多个
Future,通过异步事件驱动机制高效地调度它们。
- 执行器通常会同时管理多个
- 错误处理:
- 在实际应用中,
Future可能会因为各种原因失败,执行器需要能够处理这些错误。
- 在实际应用中,
- 资源管理:
Future的生命周期结束后,相关的资源(如内存、文件句柄)需要被正确释放,以防止资源泄漏。
通过理解和掌握这些流程和细节,开发者可以更高效地编写和优化Rust中的异步代码,充分利用Rust在并发和异步编程中的强大优势。
总结
在Rust中,Future是异步编程的核心。
通过理解Future的工作原理、如何手动实现它们,以及如何构建一个简单的异步运行时,我们可以更深入地掌握Rust的异步机制。
虽然Rust的async/await语法极大地简化了异步编程,但了解底层机制对于编写高效、可靠的异步代码至关重要。
关键要点
Future的基本概念:Future代表一个可能尚未完成的计算,通过poll方法驱动其完成。- 手动实现
Future:通过实现Futuretrait,可以深入理解异步编程的内部机制。 - 唤醒机制:
Waker用于通知执行器,Future已经准备好被重新poll。 - 构建异步运行时:了解如何手动驱动
Future的执行,有助于理解像tokio这样的异步运行时的工作原理。 - 额外概念:
Pin、Cancellation与Recursion等高级概念,进一步增强对异步编程的掌握。
通过本文的学习,相信您对Rust的异步编程有了更深入的理解。
在接下来的篇章中,我们将继续探讨异步任务(Tasks)和异步IO(IO)的相关内容,进一步完善您的异步编程知识体系。