引言
在前两篇文章中,我们分别介绍了 Rust 异步编程的基石 - Future 和任务(Task)。
本篇文章我们将进一步探索异步 IO,这也是异步编程最常见的应用场景。
想象一下,当你在餐厅点餐时:
- 同步模式就像是你点完 餐后一直在座位上等待,直到服务员把餐品送到
- 异步模式则是点完餐后你可以刷手机、看书,等餐品好了服务员会来通知你
这就是异步 IO 的核心思想 - 在等待 IO 操作完成时,我们的程序可以去做其他事情,而不是傻等着。
目录
从同步到异步
传统的同步服务器
让我们先看一个最基础的 TCP 服务器:
fn main() -> io::Result<()> {
let listener = TcpListener::bind("0.0.0.0:8000")?;
let mut n = 1;
loop {
let (mut socket, _) = listener.accept()?;
socket.write_all(format!("start {}\n", n).as_bytes())?;
thread::sleep(Duration::from_secs(1));
socket.write_all(format!("end {}\n", n).as_bytes())?;
n += 1;
}
}
这个服务器做了什么?
- 监听 8000 端口
- 接受连接后发送一个开始消息
- 等待 1 秒
- 发送结束消息
- 继续等待下一个连接
看起来很简单对吧?但这里有个严重的问题 - 它一次只能处理一个连接。 当一个客户端连接时,其他客户端必须等待,这就是同步 IO 的局限。
多线程方案
一个直观的改进是使用多线程:
fn main() -> io::Result<()> {
let listener = TcpListener::bind("0.0.0.0:8000")?;
let mut n = 1;
loop {
let (socket, _) = listener.accept()?;
let n = n;
thread::spawn(move || {
handle_connection(socket, n)
});
n += 1;
}
}
这样每个连接都有自己的线程,可以并发处理多个连接。
但线程也有问题:
- 创建和切换线程的开销较大
- 每个线程都占用一定内存
- 线程数量受系统限制
这就是为什么我们需要异步 IO。
非阻塞 IO 基础
在实现异步 IO 之前,我们需要理解几个重要概念:
阻塞与非阻塞
想象你去取快递:
- 阻塞模式:你 在快递点一直等到快递到达
- 非阻塞模式:你留下电话,快递到了给你打电话
在代码中:
// 阻塞模式
let data = socket.read()?; // 会一直等待直到有数据
// 非阻塞模式
socket.set_nonblocking(true)?;
match socket.read() {
Ok(data) => { /* 处理数据 */ }
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 现在没有数据,稍后再试
}
Err(e) => return Err(e)
}
Ready 与 Pending
在异步编程中,我们用 Ready
和 Pending
两个状态来表示操作是否完成:
enum Poll<T> {
Ready(T), // 操作完成,这里是结果
Pending, // 操作未完成,需要等待
}
实现异步 IO
现在我们来实现真正的异步 IO。
我们将分三步:
1. 异步 Accept
首先实现异步接受连接:
async fn accept(listener: &mut TcpListener) -> io::Result<(TcpStream, SocketAddr)> {
poll_fn(|cx| {
match listener.accept() {
Ok((stream, addr)) => {
stream.set_nonblocking(true)?;
Poll::Ready(Ok((stream, addr)))
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 注册唤醒器
register_waker(cx.waker(), listener);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e))
}
}).await
}
这里的关键点是:
- 使用
poll_fn
创建Future
- 处理
WouldBlock
错误 - 注册唤醒器
好的,我继续完成剩余部分:
2. 异步写入
接下来实现异步写入数据:
async fn write_all(buf: &[u8], stream: &mut TcpStream) -> io::Result<()> {
let mut remaining = buf;
poll_fn(|cx| {
while !remaining.is_empty() {
match stream.write(remaining) {
Ok(0) => {
return Poll::Ready(Err(
io::Error::new(io::ErrorKind::WriteZero, "写入零字节")
));
}
Ok(n) => {
remaining = &remaining[n..];
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 注册写就绪事件
register_write_interest(cx.waker(), stream);
return Poll::Pending;
}
Err(e) => return Poll::Ready(Err(e)),
}
}
Poll::Ready(Ok(()))
}).await
}
这个实现有几个重要特点:
- 使用循环确保所有数据都写入
- 处理写入零字节的错误情况
- 支持部分写入
3. 异步读取
最后是异步读取实现:
async fn read_to_string(stream: &mut TcpStream) -> io::Result<String> {
let mut buffer = String::new();
poll_fn(|cx| {
loop {
let mut chunk = [0u8; 1024];
match stream.read(&mut chunk) {
Ok(0) => return Poll::Ready(Ok(buffer)), // EOF
Ok(n) => {
if let Ok(s) = String::from_utf8_lossy(&chunk[..n]).into_owned() {
buffer.push_str(&s);
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 注册读就绪事件
register_read_interest(cx.waker(), stream);
return Poll::Pending;
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}).await
}
事件驱动机制
事件循环
异步 IO 的核心是事件循环(Event Loop):
struct EventLoop {
poll: Poll,
events: Events,
tasks: HashMap<Token, Task>,
}
impl EventLoop {
fn run(&mut self) {
loop {
// 1. 等待事件
self.poll.poll(&mut self.events, Some(Duration::from_secs(1)))?;
// 2. 处理就绪的事件
for event in &self.events {
if let Some(task) = self.tasks.get_mut(&event.token()) {
task.wake();
}
}
// 3. 执行可运行的任务
self.run_ready_tasks();
}
}
}
事件循环的工作流程:
- 等待 IO 事件
- 唤醒对应的任务
- 执行就绪的任务
- 重复以上步骤
注册事件监听
fn register_interest(
registry: &Registry,
stream: &TcpStream,
token: Token,
interests: Interest,
) -> io::Result<()> {
registry.register(
&mut SourceFd(&stream.as_raw_fd()),
token,
interests,
)
}
完整示例
让我们看一个完整的异步 echo 服务器示例:
async fn handle_connection(mut socket: TcpStream) {
let mut buffer = [0; 1024];
loop {
match read(&mut socket, &mut buffer).await {
Ok(0) => break, // 连接关闭
Ok(n) => {
// Echo 收到的数据
if let Err(e) = write_all(&mut socket, &buffer[..n]).await {
eprintln!("写入错误: {}", e);
break;
}
}
Err(e) => {
eprintln!("读取错误: {}", e);
break;
}
}
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器运行在 127.0.0.1:8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("新连接: {}", addr);
tokio::spawn(async move {
handle_connection(socket).await
});
}
}
最佳实践
1. 合理的超时处理
async fn with_timeout<F, T>(future: F, duration: Duration) -> io::Result<T>
where
F: Future<Output = io::Result<T>>,
{
match tokio::time::timeout(duration, future).await {
Ok(result) => result,
Err(_) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"操作超时"
))
}
}
2. 错误处理
#[derive(Debug, Error)]
enum IoError {
#[error("连接断开: {0}")]
Disconnected(String),
#[error("超时: {0}")]
Timeout(String),
#[error("IO错误: {0}")]
Io(#[from] std::io::Error),
}
3. 资源管理
struct Connection {
stream: TcpStream,
timeout: Duration,
buffer_size: usize,
}
impl Connection {
async fn with_timeout<F, T>(&mut self, f: F) -> Result<T, IoError>
where
F: Future<Output = Result<T, IoError>>,
{
tokio::time::timeout(self.timeout, f)
.await
.map_err(|_| IoError::Timeout("操作超时".into()))?
}
}
4. 优雅关闭
async fn shutdown(listener: TcpListener, connections: Arc<Mutex<Vec<Connection>>>) {
// 1. 停止接受新连接
drop(listener);
// 2. 等待现有连接完成
let mut connections = connections.lock().await;
for conn in connections.drain(..) {
conn.shutdown().await;
}
}
性能优化
1. 使用缓冲区
struct BufferedStream {
inner: TcpStream,
read_buf: Vec<u8>,
write_buf: Vec<u8>,
}
impl BufferedStream {
fn with_capacity(stream: TcpStream, capacity: usize) -> Self {
Self {
inner: stream,
read_buf: Vec::with_capacity(capacity),
write_buf: Vec::with_capacity(capacity),
}
}
}
2. 批量处理
async fn batch_write<I>(stream: &mut TcpStream, items: I) -> io::Result<()>
where
I: IntoIterator,
I::Item: AsRef<[u8]>,
{
let mut buffer = Vec::new();
// 收集数据
for item in items {
buffer.extend_from_slice(item.as_ref());
}
// 一次性写入
write_all(stream, &buffer).await
}
3. 连接池
struct ConnectionPool {
idle: Vec<Connection>,
max_size: usize,
}
impl ConnectionPool {
async fn get(&mut self) -> io::Result<Connection> {
if let Some(conn) = self.idle.pop() {
Ok(conn)
} else if self.idle.len() < self.max_size {
Connection::new().await
} else {
// 等待空闲连接
self.wait_for_idle().await
}
}
}