Async programming in Rust: what you need to know
The appeal of async programming comes down to two things:

First, it lets you do more with less. You can handle any number of simultaneous interactions using a single OS-level thread; a single-threaded async server can scale to handle millions of connections.

These days, some operating systems let you run very large numbers of OS threads at relatively low cost, but the overhead is still there. Which brings us to the second benefit of async programming: by making "tasks" essentially free, it unlocks more expressive programming patterns that are impractical in synchronous code.

In short, async programs can be dramatically more efficient, and they open up powerful new styles of programming.
So what's the catch?
Threads are first-class citizens at the OS level, but if you want to juggle different activities within a single thread, it's entirely on you to make that work. Fortunately, Rust is expressive enough that we can build shared, zero-cost abstractions that make "task-level" programming a first-class concept as well.
That said, there are important differences between synchronous and asynchronous programming in Rust, especially on stable Rust. Part of this book's purpose is to help guide you through those differences, and to teach you a set of effective patterns for async programming.
Finally, keep in mind that traditional synchronous programming can be quite effective outside of highly scalable servers. In particular, Rust's advantages in memory footprint and predictability mean that you can get much further with synchronous services than in many other languages. As with any other architectural decision, it's worth considering whether your application would be better served by the simpler synchronous model.
The state of async programming in Rust: present and future
With the `futures` and `tokio` crates, Rust has a solid foundation for asynchronous programming. The two crates provide, respectively, the core concepts for async programming and the basic building blocks for async I/O. On top of them sits an ecosystem of crates for working with various protocols and services: HTTP, SSL, DNS, WebSockets, and so on. This book will cover the major parts of that ecosystem, recommending production-grade libraries along the way.
Work is also under way to improve ergonomics via `async`/`await` notation. This work currently requires nightly Rust, and more flexible borrowing support is expected in the future. Still, if you're willing to work on nightly, the feature is already stable and helpful, and this book will cover its usage as well.
In the short term, expect a number of straightforward ways to connect synchronous and asynchronous code. In the long run, as async becomes a more central part of the language, core libraries may grow async versions that are easy to use from both synchronous and asynchronous code. We'll see examples of this later in the book.
Who this book is for
This book aims to be a complete, up-to-date guide to the async story in Rust, suitable for newcomers and old hands alike:
- The early chapters give a gentle introduction to async programming in general, and to Rust's particular take on it.
- The middle chapters provide more powerful tools, best practices, and larger examples, to help you use async programming with rigor.
- The later chapters cover the broader async ecosystem and cutting-edge features of the core libraries, along with deeper case studies.
Let's dive in!
A crash course
Show how to build a simple web app using Hyper:

- Hello, world!
- Serving files
- Adding caching
- Adding streaming
Tasks and executors
To write efficient async code in Rust, you need to understand the foundation it is built on: the task model. Fortunately, that model consists of just a few simple pieces.

This chapter digs into the task concept, then shows how it all works by building a working task executor and a working event loop, and wiring the two together into a running system.
Background: synchronous vs. asynchronous
The easiest way to understand async programming is by contrast with synchronous programming, so let's start with a simple synchronous example:
```rust
// reads 4096 bytes into `my_vec`
socket.read_exact(&mut my_vec[..4096]);
```
This code uses the standard library's `read_exact` method to read bytes from a `socket`. Its documentation says:

"Read the exact number of bytes required to fill the given buffer."
So if the method returns successfully, we're guaranteed that `my_vec` has been filled, i.e., that we've read 4KB from the `socket`. Great!
But what if the data isn't ready yet? What if it hasn't come in over the socket yet?
To meet its guarantee of filling the buffer, the `read_exact` method must wait. That's where the term "synchronous" comes from: `read_exact` is synchronized with the availability of the data it needs.
More precisely, `read_exact` blocks the thread that calls it, meaning the thread can make no further progress until the needed data has been received. The problem is that threads are, in general, too heavyweight a resource to waste this way. Moreover, while a thread is blocked it does no useful work at all; everything happens at the OS level until the data arrives and the thread is unblocked.
More broadly, if we want to handle lots of connections, and we use thread-blocking methods like `read_exact`, we need a separate thread for each connection; otherwise, handling one connection would get stuck waiting for activity on another. Even if we could cleverly schedule the connections' activity, the overhead of all those threads would still limit the system's scalability.
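To make the contrast concrete, here is a minimal sketch of that thread-per-connection style, using only the standard library (the port and the trivial echo handler are invented for illustration):

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// A trivial blocking handler: echo back one buffer's worth of input.
fn handle_connection(mut stream: TcpStream) {
    let mut buf = [0u8; 1024];
    if let Ok(n) = stream.read(&mut buf) {
        let _ = stream.write_all(&buf[..n]);
    }
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    for stream in listener.incoming() {
        let stream = stream?;
        // One OS thread per connection: simple, but the per-thread cost
        // is exactly what limits how far this design scales.
        thread::spawn(move || handle_connection(stream));
    }
    Ok(())
}
```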
Asynchrony
To scale better, we need to avoid blocking threads while waiting on resources. Most operating systems provide a "non-blocking" (i.e., asynchronous) mode for interacting with objects like sockets. In this mode, operations that cannot complete immediately ("are not ready") return an error instead, leaving the current thread free to do other work.
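Here is a rough sketch of what that manual non-blocking style looks like with the standard library's `TcpStream` (the `try_read` helper is hypothetical):

```rust
use std::io::{self, Read};
use std::net::TcpStream;

// A sketch of the manual non-blocking style: try to read, and treat
// `WouldBlock` as "not ready yet" rather than as a failure.
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    stream.set_nonblocking(true)?;
    match stream.read(buf) {
        // The data was ready; we read `n` bytes.
        Ok(n) => Ok(Some(n)),
        // Not ready yet: the caller is free to do other work and retry later.
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}
```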
Working with resources this way by hand is quite painful, though. You have to figure out how to juggle all of the "in-flight" operations on a single thread, even though most of them belong to completely independent activities (say, two separate connections).
Fortunately, Rust offers a way of doing async programming that feels like working with threads, while under the hood resources are accessed asynchronously and the in-flight operations are multiplexed for you automatically. The core concept is the task, which you can think of as a "lightweight thread" (similar to a goroutine) that is automatically mapped onto a smaller number of OS threads.
Let's see how the task model works!
Async programming with tasks
Rust provides asynchrony through *tasks*, which are:

- Pieces of work that run independently of one another (in other words, concurrently), much like OS threads.
- Lightweight, in that they do not require a full OS thread each. Instead, a single OS thread can juggle any number of independent tasks. This setup is commonly known as "M:N threading" or "user-space threads".
The key idea is that, any time a task would block the thread waiting for some external event to occur, it instead immediately returns control to the executor thread, which can then get on with another task's work.
To see how these ideas play out, over the course of this chapter we'll use the `futures` crate to build a toy version of the task model and an executor. At the end of the chapter, we'll connect these toys to the more refined abstractions used in real systems.
We'll start by defining a simple task trait. A task contains some (probably in-progress) piece of work; you ask the task to try to finish that work by invoking its `poll` method:
```rust
/// An independent, non-blocking computation
trait ToyTask {
    /// Attempt to finish executing the task, returning `Async::Pending`
    /// if the task needs to wait for an event before it can complete.
    fn poll(&mut self, wake: &Waker) -> Async<()>;
}
```
When polled, the task does as much work as it can, but it may hit a point where it needs to wait for an event, such as data becoming available on a socket. At that point, rather than block the thread, it should return `Async::Pending`:
```rust
enum Async<T> {
    /// Work completed with a result of type `T`.
    Ready(T),
    /// Work was blocked, and the task is set to be woken when ready
    /// to continue.
    Pending,
}
```
Returning instead of blocking gives the underlying thread a chance to do other useful work (such as calling `poll` on a different task). But then how do we know when to try `poll`ing the original task again?
Look back at the signature of `ToyTask::poll` and you'll see a `wake` argument. This value is an instance of the `Waker` type, which provides a way to wake the task back up:
```rust
#[derive(Clone)]
struct Waker { .. }

impl Waker {
    /// Signals that the associated task is ready to be `poll`ed again.
    pub fn wake(&self) { ... }
}
```
So **whenever you poll a task, you also give it a handle it can use to wake itself back up later.** If the task is unable to proceed because it's waiting for data on a socket, it can hand that `Waker` off to the socket, so that `wake` is called when the data arrives.
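To make this concrete, here is a sketch of a hypothetical event source written against the toy `Waker` and `Async` types above; a real socket would play the same role at the OS level:

```rust
// A hypothetical event source. A task polls it from within `poll`,
// and whoever produces the event calls `fire`.
struct ReadySource {
    fired: bool,
    waker: Option<Waker>,
}

impl ReadySource {
    // Called from a task's `poll`: report readiness, or stash the
    // waker so the task can be woken later.
    fn check(&mut self, wake: &Waker) -> Async<()> {
        if self.fired {
            Async::Ready(())
        } else {
            self.waker = Some(wake.clone());
            Async::Pending
        }
    }

    // Called by the event producer: mark ready and wake the task.
    fn fire(&mut self) {
        self.fired = true;
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}
```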
The `Waker` type is essentially just a trait object for the `Wake` trait, which allows different executors to use different wakeup strategies:
```rust
trait Wake: Send + Sync {
    /// Signals that the associated task is ready to be `poll`ed again.
    fn wake(arced: &Arc<Self>);
}

impl<T: Wake + 'static> From<Arc<T>> for Waker {
    fn from(wake: Arc<T>) -> Waker { ... }
}
```
That's all pretty abstract, though, so let's walk through the complete story concretely. Over the rest of this chapter we will build:

- A simple task executor that can run any number of tasks on a single OS thread.
- A simple timer event loop that can wake tasks up based on timer events.
- A simple example that plugs the two together.
Once you understand how these pieces work, you'll have the foundation needed for the rest of this book.
A toy task executor
Let's build a task executor! Our goal is to make it possible to run any number of tasks cooperatively on a single OS thread. To keep the example simple, we pick the most primitive data structures throughout. Here are the imports we'll need:
```rust
use std::mem;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, Thread};
```
First, we define a structure to hold the executor's state. The executor owns the tasks themselves, and assigns each one a `usize` ID so that tasks can be referred to externally. In particular, the executor keeps a `ready` set containing the IDs of tasks that should be woken up (because an event they were waiting on has occurred):
```rust
// the internal executor state
struct ExecState {
    // The next available task ID.
    next_id: usize,
    // The complete list of tasks, keyed by ID.
    tasks: HashMap<usize, TaskEntry>,
    // The set of IDs for ready-to-run tasks.
    ready: HashSet<usize>,
    // The actual OS thread running the executor.
    thread: Thread,
}
```
The executor itself just wraps this state in an `Arc` and a `Mutex`, so that it can be used from other threads (even though all of the tasks will run locally on one thread):
```rust
#[derive(Clone)]
pub struct ToyExec {
    state: Arc<Mutex<ExecState>>,
}
```
The `tasks` field of `ExecState` holds `TaskEntry` instances, each of which pairs a concrete task with the `Waker` for waking it:
```rust
struct TaskEntry {
    task: Box<ToyTask + Send>,
    wake: Waker,
}
```
Finally, we need some boilerplate for creating an executor and modifying its state:
```rust
impl ToyExec {
    pub fn new() -> Self {
        ToyExec {
            state: Arc::new(Mutex::new(ExecState {
                next_id: 0,
                tasks: HashMap::new(),
                ready: HashSet::new(),
                thread: thread::current(),
            })),
        }
    }

    // a convenience method for getting our hands on the executor state
    fn state_mut(&self) -> MutexGuard<ExecState> {
        self.state.lock().unwrap()
    }
}
```
With that boilerplate in place, we can focus on the executor's inner workings. It's best to start with the core executor loop. For simplicity, the loop never exits; its only job is to keep running all spawned tasks to completion:
```rust
impl ToyExec {
    pub fn run(&self) {
        loop {
            // Each time around, we grab the *entire* set of ready-to-run task IDs:
            let mut ready = mem::replace(&mut self.state_mut().ready, HashSet::new());

            // Now try to `poll` each ready task:
            for id in ready.drain() {
                // we take *full ownership* of the task; if it completes, it will
                // be dropped.
                let entry = self.state_mut().tasks.remove(&id);
                if let Some(mut entry) = entry {
                    if let Async::Pending = entry.task.poll(&entry.wake) {
                        // The task hasn't completed, so put it back in the table.
                        self.state_mut().tasks.insert(id, entry);
                    }
                }
            }

            // We've processed all the work we acquired on entry; block until more
            // work is available. If new work became available after our `ready`
            // snapshot, this will be a no-op.
            thread::park();
        }
    }
}
```
The subtle part here is that, on each round of the loop, we `poll` everything that's ready, and then "park" the thread. The `std` library's `park`/`unpark` pair makes it easy to block and wake OS threads. In this case, we want the executor's underlying OS thread to block until some more tasks become ready. There's no risk that we might fail to wake up: if another thread calls `unpark` between our initial snapshot of the `ready` set and our call to `park`, then `park` returns immediately.
On the other side, here's how a task gets woken up:
```rust
impl ExecState {
    fn wake_task(&mut self, id: usize) {
        self.ready.insert(id);

        // *after* inserting in the ready set, ensure the executor OS
        // thread is woken up if it's not already running.
        self.thread.unpark();
    }
}

struct ToyWake {
    // A link back to the executor that owns the task we want to wake up.
    exec: ToyExec,
    // The ID for the task we want to wake up.
    id: usize,
}

impl Wake for ToyWake {
    fn wake(arced: &Arc<Self>) {
        arced.exec.state_mut().wake_task(arced.id);
    }
}
```
The rest is pretty straightforward. The `spawn` method is responsible for packaging a task into a `TaskEntry` and installing it:
```rust
impl ToyExec {
    pub fn spawn<T>(&self, task: T)
        where T: ToyTask + Send + 'static
    {
        let mut state = self.state_mut();

        let id = state.next_id;
        state.next_id += 1;

        let wake = ToyWake { id, exec: self.clone() };
        let entry = TaskEntry {
            wake: Waker::from(Arc::new(wake)),
            task: Box::new(task),
        };
        state.tasks.insert(id, entry);

        // A newly-added task is considered immediately ready to run,
        // which will cause a subsequent call to `park` to immediately
        // return.
        state.wake_task(id);
    }
}
```
And with that, we've built a task scheduler! Before celebrating, though, note an important flaw: as written, this implementation leaks tasks, because the `Arc`s form a reference cycle (the executor owns the tasks, each task owns a `Waker`, and each `Waker` points back at the executor). Working out how to fix this is a good exercise.
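As a hint for that exercise, one possible shape of a fix (a sketch, not the only option) is to have `ToyWake` hold a `Weak` reference back to the executor state, so that tasks no longer keep the executor alive:

```rust
use std::sync::{Arc, Mutex, Weak};

// Sketch: a Weak back-reference breaks the Arc cycle between the executor
// state and the wakers owned by its tasks. `spawn` would create it via
// Arc::downgrade(&self.state).
struct ToyWake {
    exec: Weak<Mutex<ExecState>>,
    id: usize,
}

impl Wake for ToyWake {
    fn wake(arced: &Arc<Self>) {
        // If the executor has already been dropped, there is nothing to wake.
        if let Some(state) = arced.exec.upgrade() {
            state.lock().unwrap().wake_task(arced.id);
        }
    }
}
```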
Even with that fixed, though, it would remain a "toy" executor. Now, let's move on and build an event source for tasks to wait on.
A toy event loop
Async programming is most often applied to I/O, but there are many other kinds of event sources. In this section, we'll build a simple event loop for timer events, which lets tasks register to be woken up at some point in the future.
To pull this off, we need a dedicated timer thread whose job is to wake tasks up at the right moments. Consumers of the timer only have to tell it when they want to be woken, and how to wake them:
```rust
use std::collections::BTreeMap;
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::{Duration, Instant};

/// A handle to a timer, used for registering wakeups
#[derive(Clone)]
struct ToyTimer {
    tx: mpsc::Sender<Registration>,
}

/// A wakeup request
struct Registration {
    at: Instant,
    wake: Waker,
}

/// State for the worker thread that processes timer events
struct Worker {
    rx: mpsc::Receiver<Registration>,
    active: BTreeMap<Instant, Waker>,
}

impl ToyTimer {
    fn new() -> ToyTimer {
        let (tx, rx) = mpsc::channel();
        let worker = Worker { rx, active: BTreeMap::new() };
        thread::spawn(move || worker.work());
        ToyTimer { tx }
    }

    // Register a new wakeup with this timer
    fn register(&self, at: Instant, wake: Waker) {
        self.tx.send(Registration { at, wake }).unwrap();
    }
}
```
The event loop itself lives in `Worker::work`. The basic goal is simple: we keep a `BTreeMap` of the wakeups registered so far, and use a channel to receive new registrations. The payoff of this setup is that when no wakeup is due to fire yet but some are registered, we can use the channel's `recv_timeout` method to wait for *either* a new registration to come in *or* the time of the earliest existing registration to arrive:
```rust
impl Worker {
    fn enroll(&mut self, item: Registration) {
        if self.active.insert(item.at, item.wake).is_some() {
            // this simple setup doesn't support multiple registrations for
            // the same instant; we'll revisit that in the next section.
            panic!("Attempted to add to registrations for the same instant")
        }
    }

    fn fire(&mut self, key: Instant) {
        self.active.remove(&key).unwrap().wake();
    }

    fn work(mut self) {
        loop {
            if let Some(first) = self.active.keys().next().cloned() {
                let now = Instant::now();
                if first <= now {
                    self.fire(first);
                } else {
                    // we're not ready to fire off `first` yet, so wait until we are
                    // (or until we get a new registration, which might be for an
                    // earlier time).
                    if let Ok(new_registration) = self.rx.recv_timeout(first - now) {
                        self.enroll(new_registration);
                    }
                }
            } else {
                // no existing registrations, so unconditionally block until
                // we receive one.
                let new_registration = self.rx.recv().unwrap();
                self.enroll(new_registration);
            }
        }
    }
}
```
And we're done! This "event loop" pattern, in which a dedicated thread continuously processes events and registrations (or blocks until a new event fires or a registration arrives), is the bedrock of async programming. Fortunately, for async programming in Rust you can use off-the-shelf event loops to drive the events you care about, as we'll see in the next chapter.
Before that, though, let's plug the executor and the timer together into a simple app.
Putting the executor and event loop together
So far we've built a simple executor that runs multiple tasks on a single thread, and a simple event loop that dispatches timer events, likewise from a single thread. Now we'll put them together to build an app that can run any number of periodically "dinging" tasks using just two OS threads.
To get there, we first define a `Periodic` task:
```rust
struct Periodic {
    // a name for this task
    id: u64,
    // how often to "ding"
    period: Duration,
    // when the next "ding" is scheduled
    next: Instant,
    // a handle back to the timer event loop
    timer: ToyTimer,
}

impl Periodic {
    fn new(id: u64, period: Duration, timer: ToyTimer) -> Periodic {
        Periodic {
            id, period, timer, next: Instant::now() + period,
        }
    }
}
```
The `period` field says how often the task should print its "ding" message. The implementation is direct: the task runs forever, printing a message each time another `period` has elapsed:
```rust
impl ToyTask for Periodic {
    fn poll(&mut self, wake: &Waker) -> Async<()> {
        // are we ready to ding yet?
        let now = Instant::now();
        if now >= self.next {
            self.next = now + self.period;
            println!("Task {} - ding", self.id);
        }

        // make sure we're registered to wake up at the next expected `ding`
        self.timer.register(self.next, wake.clone());
        Async::Pending
    }
}
```
Then we put it all together:
```rust
fn main() {
    let timer = ToyTimer::new();
    let exec = ToyExec::new();

    for i in 1..10 {
        exec.spawn(Periodic::new(i, Duration::from_millis(i * 500), timer.clone()));
    }

    exec.run()
}
```
Running this program produces output along these lines:
```
Task 1 - ding
Task 2 - ding
Task 1 - ding
Task 3 - ding
Task 1 - ding
Task 4 - ding
Task 2 - ding
Task 1 - ding
Task 5 - ding
Task 1 - ding
Task 6 - ding
Task 2 - ding
Task 3 - ding
Task 1 - ding
Task 7 - ding
Task 1 - ding
...
```
Stepping back, what we've done here is a bit magical: the `ToyTask` implementation for `Periodic` directly describes how a *single* task behaves, yet we then interleaved a whole pile of those tasks onto just *two* OS threads! That's the power of asynchrony.
Exercise: multiple registrations at the same instant
The toy timer's event loop contains a panic: "Attempted to add to registrations for the same instant".

- Can this panic actually be triggered by the example program?
- What would happen if we simply removed the panic?
- How could the code be improved to avoid the problem entirely?
Exercise: graceful shutdown
`Periodic` and `ToyExec` are both designed to run forever.

- Modify `Periodic` so that each instance can be set to "ding" only some fixed number of times, after which the corresponding task completes.
- Modify `ToyExec` so that it shuts down once there are no tasks left.
- Test your solution!
The real task system

We've worked through toy definitions of a number of components in this chapter, but now it's time to get acquainted with the more efficient and powerful abstractions that are part of the `futures` crate.
Tasks
The `futures` crate does not define a `Task` trait directly. Instead, it defines the more general concept of *futures*, which we'll explore in detail shortly. For now, let's look at the core definition of a future:
```rust
/// An asynchronous computation that completes with a value or an error.
trait Future {
    type Item;
    type Error;

    /// Attempt to complete the future, yielding `Ok(Async::Pending)`
    /// if the future is blocked waiting for some other event to occur.
    fn poll(&mut self, cx: task::Context) -> Poll<Self::Item, Self::Error>;
}

type Poll<T, E> = Result<Async<T>, E>;
```
Futures are a lot like tasks, except that they return a result (which allows them to be composed). In other words, you can think of a task as any type implementing `Future<Item = (), Error = !>`.
There is one further difference, though: futures don't take a `WakeHandle` argument. In practice, that argument is almost always passed down unchanged from the point where the executor enqueued the handle, so the `futures` crate instead has the executor supply the `WakeHandle` through a thread-local variable. You can retrieve it with the `current_wake` function in `futures::task`:
```rust
/// When called within a task being executed, returns the wakeup handle for
/// that task. Panics if called outside of task execution.
fn current_wake() -> WakeHandle;
```
Connecting `Future` with `ToyTask`
It's helpful to see precisely how `Future` and `ToyTask` relate. To that end, here's a wrapper type that turns a `ToyTask` into a `Future`:
```rust
use futures::task;
use futures::{Future, AsyncResult};

struct ToyTaskToFuture<T>(T);

impl<T: ToyTask> Future for ToyTaskToFuture<T> {
    type Item = ();
    type Error = !;

    fn get(&mut self) -> AsyncResult<(), !> {
        Ok(self.0.poll(&task::current_wake()))
    }
}
```
Executors
First off, in the `futures` crate, executors are objects that can spawn a `Future<Item = (), Error = !>` as a new task. There are two key executors to be familiar with.
The `ThreadPool` executor
The simplest executor is `futures::executor::ThreadPool`, which schedules tasks onto a fixed pool of OS threads. Splitting the tasks across multiple OS threads means that even if a particular `tick` invocation takes a long time, other tasks can continue to make progress on other threads.
Setup and usage are straightforward:
```rust
// Set up the thread pool, which spins up worker threads behind the scenes.
let exec = ThreadPool::new();

// Spawn tasks onto the thread pool.
exec.spawn(my_task);
exec.spawn(other_task);
```
Later on, we'll see a variety of ways to communicate with spawned tasks. Note that, because tasks may be executed on arbitrary threads, they must be `Send` and `'static`.
The `CurrentThread` executor
The `futures` crate also provides a single-threaded executor, `CurrentThread`, resembling the toy version we built. The key difference from the `ThreadPool` executor is that `CurrentThread` can execute non-`Send` and non-`'static` tasks, because the executor is invoked explicitly on the current thread:
```rust
// start up the CurrentThread executor, which by default runs until all spawned
// tasks are complete:
CurrentThread::run(|_| {
    CurrentThread::spawn(my_task);
    CurrentThread::spawn(other_task);
})
```
The tradeoffs between the `ThreadPool` and `CurrentThread` executors are explained in detail later in the book.
Spurious wakeups
In general, executors guarantee that they will call `get` any time a task has been woken up. They may, however, also call `get` at other times. Hence, a task cannot assume that every call to `get` means progress can be made; it should always retry the operation that previously blocked it, and be prepared to wait again.
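As a sketch of that discipline, using the `get`-style future interface and a hypothetical non-blocking source called `Inner`:

```rust
// Sketch: tolerate spurious wakeups by re-attempting the blocking
// operation on every `get`, never assuming that readiness is implied.
struct TakeOne {
    inner: Inner, // hypothetical non-blocking source of values
}

impl Future for TakeOne {
    type Item = u32;
    type Error = !;

    fn get(&mut self) -> AsyncResult<u32, !> {
        match self.inner.try_take() {
            // The value really is ready.
            Some(v) => Ok(Async::Done(v)),
            // Still blocked: this call may have been spurious, so just
            // report that we will be woken again later.
            None => Ok(Async::WillWake),
        }
    }
}
```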
Exercises

- Rewrite the example from the previous sections to use the `ThreadPool` executor.
- Rewrite the example to use the `CurrentThread` executor.
- For the timer example, what are the tradeoffs between the two executors?
Event loops
Tokio: async network I/O
The `tokio` crate complements the `futures` crate by providing a low-level, cross-platform way to do asynchronous network I/O. The crate's API is modeled after `std::net` and provides async versions of the same core functionality, with strong cross-platform support.
This chapter covers both the primary `tokio` networking APIs as well as some important tools in the `futures` crate for doing async I/O in general. It closes by building a proxy server using `tokio` directly that aims for low overhead by minimizing the number of in-flight buffers needed at any time.
Acquiring a socket
Accepting TCP connections with Tokio is much like doing so with `std::net`, except that it works asynchronously. In particular, `tokio::net` contains a `TcpListener` with an API similar to the `std::net` version:
```rust
type AsyncIoResult<T> = AsyncResult<T, io::Error>;

impl TcpListener {
    fn bind(addr: &SocketAddr) -> io::Result<TcpListener>;
    fn accept(&mut self) -> AsyncIoResult<(TcpStream, SocketAddr)>;
}
```
Just like the occurrence of `Result` in a signature tells you that a function may fail with an error, the occurrence of `Async` or `AsyncResult` tells you that the function is intended to be used within the async task system. Thus, looking at the two functions above, we can see that `bind` is a standard synchronous function, while `accept` is an asynchronous method.
To quickly see these APIs in action, let's build a future that will accept connections asynchronously, record the peer address, and then close the connection:
```rust
use tokio::net::TcpListener;

struct LogAndDrop {
    listener: TcpListener,
}

impl LogAndDrop {
    fn new(addr: &SocketAddr) -> io::Result<LogAndDrop> {
        Ok(LogAndDrop {
            listener: TcpListener::bind(addr)?,
        })
    }
}

impl Future for LogAndDrop {
    type Item = ();
    type Error = io::Error;

    fn complete(&mut self) -> AsyncIoResult<()> {
        loop {
            match self.listener.accept() {
                Ok(Async::Done((_, peer))) => {
                    println!("Connected to peer {:?}", peer);
                }
                Ok(Async::WillWake) => {
                    return Ok(Async::WillWake);
                }
                Err(e) => {
                    println!("Error: {:?}; shutting down", e);
                    return Err(e);
                }
            }
        }
    }
}
```
Note that, in the case that we succeed in accepting a connection, after logging it we immediately loop around and try to take another. This is typical for async tasks: the task code does as much work as it possibly can, stopping only when encountering an obstruction (such as the listener returning `WillWake`), at which point it will be descheduled and woken up later, when the obstruction has been cleared.
Reading and writing
The `futures` crate contains an `io` module, which is the async counterpart to `std::io`. That module defines, in particular, the core primitives for doing async reading, writing, and flushing:
```rust
trait AsyncRead {
    fn read(&mut self, buf: &mut [u8]) -> AsyncIoResult<usize>;
}

trait AsyncWrite {
    fn write(&mut self, buf: &[u8]) -> AsyncIoResult<usize>;
    fn flush(&mut self) -> AsyncIoResult<()>;
}
```
These methods work exactly like their counterparts in `std`, except that if the underlying I/O object is not ready to perform the requested action, they return `Ok(Async::WillWake)` and stash the current task's wakeup handle to be used once the I/O is ready. Once more, the fact that their result type involves `Async` is the clear signal that they plug into the async task system.
Example: echoing input
While the `AsyncRead` and `AsyncWrite` traits are simple enough, there are some significant differences in using them, compared to the synchronous versions. Most importantly, async tasks generally have an explicit overall state associated with them (which plays the role usually played by the stack in synchronous programming). To see this concretely, let's write a task for echoing everything sent on a socket. First, the basic setup:
```rust
use tokio::net::TcpStream;

// The task structure -- echoing on a *single* connection
struct Echo {
    // The connection
    io: TcpStream,
    // Buffered data to be echoed back
    buf: Vec<u8>,
    // The current state of the "echo state machine"
    state: EchoState,
}

enum EchoState {
    // The next step is reading into `buf`, from the front
    Reading,
    // The next step is echoing back out `buf`, from the
    // given start and end points.
    Writing(usize, usize),
}

impl Echo {
    fn new(io: TcpStream) -> Echo {
        Echo {
            io,
            state: EchoState::Reading,
            buf: vec![0; 4096],
        }
    }
}
```
The idea then is that the `Echo` task alternates between reading and writing. If at any point it is unable to perform that work, it returns `Async::WillWake`, having been enrolled to be woken when the needed I/O is available. Such state-machine tasks almost always have an outer `loop` that continuously moves through the states until an obstruction is reached:
```rust
impl Future for Echo {
    type Item = ();
    type Error = io::Error;

    fn complete(&mut self) -> AsyncIoResult<()> {
        loop {
            self.state = match self.state {
                EchoState::Reading => {
                    match self.io.read(&mut self.buf)? {
                        Async::WillWake => return Ok(Async::WillWake),
                        Async::Done(len) => EchoState::Writing(0, len),
                    }
                }
                EchoState::Writing(from, to) if from >= to => {
                    EchoState::Reading
                }
                EchoState::Writing(from, to) => {
                    match self.io.write(&self.buf[from..to])? {
                        Async::WillWake => return Ok(Async::WillWake),
                        Async::Done(n) => EchoState::Writing(from + n, to),
                    }
                }
            };
        }
    }
}
```
It's important to note that we can freely "bubble up" `WillWake`: if a function like `read` returns it, that function has already guaranteed to wake up our task when `read`ing is possible. In particular, the `tokio` crate takes care of stashing the `WakeHandle` as necessary whenever we attempt an `AsyncRead::read`, and so on. All we have to do is bubble out the `WillWake` result.
While the code here is not so complicated, it's a bit noisy for something so simple. Much of the rest of this book will cover higher-level abstractions that cut down on the noise. For this kind of low-level programming, though, the `futures` crate provides a `try_done!` macro that works much like the `?` operator, except that it also bubbles out `Async::WillWake`. Using that macro, we can rewrite the code as follows:
```rust
impl Future for Echo {
    type Item = ();
    type Error = io::Error;

    fn complete(&mut self) -> AsyncIoResult<()> {
        loop {
            self.state = match self.state {
                EchoState::Reading => {
                    let len = try_done!(self.io.read(&mut self.buf));
                    EchoState::Writing(0, len)
                }
                EchoState::Writing(from, to) if from >= to => {
                    EchoState::Reading
                }
                EchoState::Writing(from, to) => {
                    let n = try_done!(self.io.write(&self.buf[from..to]));
                    EchoState::Writing(from + n, to)
                }
            };
        }
    }
}
```
As we'll see in the Futures chapter, though, we'll ultimately do better than this, by avoiding writing a state machine at all.
Exercises
- What would happen if we did not include the outer `loop`?
- Use the `CurrentThread` executor and `TcpListener` to turn the above code into a complete, working server.
https://gist.github.com/alexcrichton/da80683060f405d6be0e06b426588886
Transforming at the byte level
Closing down a connection
In the synchronous world, you often don't have to worry too much about flushing. You can freely write to a buffered I/O object and it will periodically flush as you do so. And, most importantly, if at any point you drop the object, the remaining content of the buffer is automatically flushed. Worst case, there is an error trying to perform this flush, which gets swallowed; but generally there's not much you could've done about such an error anyway.
In the async world, flushing is more critical, for a simple reason: the model does not afford us the ability to automatically flush on drop. In particular, forcing a flush means potentially blocking the calling thread until that flush completes. Since async I/O objects are generally floating around within executor tasks, this is a non-starter; blocking an executor can bring the whole system to a halt.
Thus, it's critical to ensure all data is flushed before dropping an async I/O object, using `AsyncWrite::flush`.
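As a sketch of that discipline, here is a hypothetical wrapper future, written against this chapter's `AsyncWrite` and `try_done!` definitions, that drives a flush to completion before handing the I/O object back:

```rust
// Sketch: flush an AsyncWrite to completion before releasing it,
// so that dropping it can never discard buffered data.
struct FlushThenDone<W> {
    io: Option<W>,
}

impl<W: AsyncWrite> Future for FlushThenDone<W> {
    type Item = W;
    type Error = io::Error;

    fn complete(&mut self) -> AsyncIoResult<W> {
        // Keep driving `flush` until the write buffer is fully drained,
        // bubbling out WillWake while the I/O object isn't ready.
        try_done!(self.io.as_mut().unwrap().flush());
        // Only now is it safe to hand the object back (and later drop it).
        Ok(Async::Done(self.io.take().unwrap()))
    }
}
```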
Futures
Up until now, we've been working with the task model and I/O events in a "direct" way, writing code that manually juggles `Async` values and `WakeHandle`s. In this chapter, we'll see how this kind of code can be fit into a general abstraction, *futures*, that provides a number of tools for working at a higher level.
The core definition
In Rust, a future is an asynchronous computation that can be driven to produce a value. It represents a value that may become available in the future, but which requires pushing along the computation to produce it.
We've already seen the core definition of the `Future` trait, which describes such computations:
```rust
/// An asynchronous computation that completes with a value or an error.
trait Future {
    type Item;
    type Error;

    /// Attempt to complete the future, yielding `Ok(Async::WillWake)`
    /// if the future is blocked waiting for some other event to occur.
    fn get(&mut self) -> AsyncResult<Self::Item, Self::Error>;

    // ... and a large number of default methods that we'll meet shortly!
}

type AsyncResult<T, E> = Result<Async<T>, E>;

enum Async<T> {
    /// Work completed with a result of type `T`.
    Done(T),
    /// Work was blocked, and the task is set to be woken when ready
    /// to continue.
    WillWake,
}
```
Just calling `get` once does not guarantee that a final value will be produced, and if the future is blocked waiting on some other event to occur, it is not guaranteed to make progress until `get` is called again. The first part of this chapter will focus on exactly who calls `get`, and when.
What makes futures interesting is that, by abstracting out the very general idea of "computing something asynchronously", we make it possible to combine such computations in expressive and surprising ways. This also informs their relationship to tasks: a task is generally made up of a number of smaller futures that have been stitched together.
Example: ReadExact
Let's get started with a useful example. The standard library's `Read` trait provides a convenient function, `read_exact`, which reads a specific number of bytes from an I/O object (which may require issuing multiple calls to the `read` method).
We want to "port" this functionality to the async world, and futures are a perfect match: we can represent the in-progress read as a future that yields the reader and the filled buffer once complete:
```rust
struct ReadExactData<R> {
    reader: R,
    buf: Vec<u8>,
}

struct ReadExact<R> {
    data: Option<ReadExactData<R>>,
    from: usize,
    to: usize,
}

fn read_exact<R>(reader: R, len: usize) -> ReadExact<R> {
    ReadExact {
        data: Some(ReadExactData {
            reader,
            buf: vec![0; len],
        }),
        from: 0,
        to: len,
    }
}
```
```rust
impl<R: AsyncRead> Future for ReadExact<R> {
    type Item = ReadExactData<R>;
    type Error = io::Error;

    fn get(&mut self) -> AsyncIoResult<Self::Item> {
        while self.from < self.to {
            let data = self.data.as_mut().unwrap();
            let n = try_done!(data.reader.read(&mut data.buf[self.from..self.to]));
            self.from += n;
        }
        Ok(Async::Done(self.data.take().unwrap()))
    }
}
```
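To see how such a future is consumed, here is a sketch of driving `ReadExact` from inside another future's `get`; the `ReadHeader` wrapper is invented for illustration:

```rust
// Sketch: composing ReadExact into a larger future by delegating to
// its `get` and bubbling out WillWake via try_done!.
struct ReadHeader<R> {
    read: ReadExact<R>,
}

impl<R: AsyncRead> Future for ReadHeader<R> {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn get(&mut self) -> AsyncIoResult<Vec<u8>> {
        // Drive the inner read; if it's blocked, so are we.
        let ReadExactData { buf, .. } = try_done!(self.read.get());
        Ok(Async::Done(buf))
    }
}
```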
Example: a timeout wrapper
Push and pull: futures and tasks
This description is hopefully not surprising, given the previous few chapters. However, it's important to realize that this setup is drastically different from futures (aka promises) in other languages:
- Rust futures follow a *pull* model: once a future is blocked by an event, it must be retried through a call to `get` (i.e., by repeatedly trying to "pull" a value out of it).
- Other futures libraries follow a *push* model, where the completion of one event automatically triggers a cascade of follow-up work, pushing the computation forward. That follow-up work is usually given by registering callbacks with a future.
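For contrast, here is a sketch of the push style, written in Rust purely for illustration; this is not how the `futures` crate works:

```rust
// Sketch of a push-style (callback-based) future, as found in many
// other languages' futures/promises libraries.
struct PushFuture<T> {
    callbacks: Vec<Box<dyn FnOnce(&T)>>,
}

impl<T> PushFuture<T> {
    // Follow-up work is registered as a callback...
    fn on_complete(&mut self, f: Box<dyn FnOnce(&T)>) {
        self.callbacks.push(f);
    }

    // ...and completion *pushes* the value through every callback,
    // rather than waiting for anyone to pull it out.
    fn complete(self, value: T) {
        for cb in self.callbacks {
            cb(&value);
        }
    }
}
```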