Применение: создание исполнителя
Футуры Rust'a ленивы: они ничего не будут делать, если не будут активно выполняться. Один из способов довести future до завершения - это .await
и функция async
внутри него, но это просто подталкивает проблему на один уровень вверх: кто будет
запускать future, возвращённые из async
функций верхнего уровня? Ответ в том,
что нам нужен исполнитель для Future
.
Исполнители берут набор future верхнего уровня и запускают их через вызов метода poll
, до тех пока они не завершатся. Как правило, исполнитель будет вызывать метод
poll
у future один раз, чтобы запустить. Когда future сообщают, что готовы продолжить вычисления при вызове метода wake()
, они помещаются обратно в очередь и вызов poll
повторяется до тех пор, пока Future
не будут завершены.
В этом разделе мы напишем нашего собственного простого исполнителя, способного одновременно запускать большое количество future верхнего уровня.
В этом примере мы зависим от пакета futures
, в котором определён типаж ArcWake
. Данный типаж предоставляет простой способ для создания Waker
.
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures-preview = "=0.3.0-alpha.17"
Дальше, мы должны в верней части файла src/main.rs
разместить следующий список зависимостей:
# #![allow(unused_variables)] #fn main() { use { futures::{ future::{FutureExt, BoxFuture}, task::{ArcWake, waker_ref}, }, std::{ future::Future, sync::{Arc, Mutex}, sync::mpsc::{sync_channel, SyncSender, Receiver}, task::{Context, Poll}, time::Duration, }, // The timer we wrote in the previous section: timer_future::TimerFuture, }; #}
Наш исполнитель будет работать, посылая задачи для запуска по каналу. Исполнитель извлечёт события из канала и запустит их. Когда задача готова выполнить больше работы (будет пробуждена), она может запланировать повторный опрос самой себя, отправив себя обратно в канал.
В этом проекте самому исполнителю просто необходим получатель для канала задачи. Пользователь получит экземпляр отправителя, чтобы он мог создавать новые future. Сами задачи - это просто future, которые могут перепланировать самих себя, поэтому мы сохраним их как future в сочетании с отправителем, который задача может использовать, чтобы запросить себя.
# #![allow(unused_variables)] #fn main() { /// Task executor that receives tasks off of a channel and runs them. struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner` spawns new futures onto the task channel. #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// A future that can reschedule itself to be polled by an `Executor`. struct Task { /// In-progress future that should be pushed to completion. /// /// The `Mutex` is not necessary for correctness, since we only have /// one thread executing tasks at once. However, Rust isn't smart /// enough to know that `future` is only mutated from one thread, /// so we need use the `Mutex` to prove thread-safety. A production /// executor would not need this, and could use `UnsafeCell` instead. future: Mutex<Option<BoxFuture<'static, ()>>>, /// Handle to place the task itself back onto the task queue. task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // Maximum number of tasks to allow queueing in the channel at once. // This is just to make `sync_channel` happy, and wouldn't be present in // a real executor. const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender}) } #}
Давайте также добавим метод к spawner
, чтобы было легко создавать новые futures
.
Этот метод возьмёт future, упакует и поместит его в FutureObj
и создаст новую Arc<Task>
с ней внутри, которая может быть поставлена в очередь
исполнителя.
# #![allow(unused_variables)] #fn main() { impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("too many tasks queued"); } } #}
Чтобы опросить futures
, нам нужно создать Waker
.
Как описано в разделе задачи пробуждения, Waker
s отвечают
за планирование задач, которые будут опрошены снова после вызова wake
. Waker
s сообщают исполнителю, какая именно задача завершилась, позволяя
опрашивать как раз те futures
, которые готовы к продолжению выполнения. Простой способ
создать новый Waker
, необходимо реализовать типаж ArcWake
, а затем использовать
waker_ref
или .into_waker()
функции для преобразования Arc<impl ArcWake>
в Waker
. Давайте реализуем ArcWake
для наших задач, чтобы они были
превращены в Waker
s и могли пробуждаться:
# #![allow(unused_variables)] #fn main() { impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // Implement `wake` by sending this task back onto the task channel // so that it will be polled again by the executor. let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); } } #}
Когда Waker
создаётся на основе Arc<Task>
, вызывая wake()
, это
вызовет отправку копии Arc
в канал задач. Тогда нашему исполнителю
нужно подобрать задание и опросить его. Давайте реализуем это:
# #![allow(unused_variables)] #fn main() { impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // Take the future, and if it has not yet completed (is still Some), // poll it in an attempt to complete it. let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // Create a `LocalWaker` from the task itself let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>` is a type alias for // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`. // We can get a `Pin<&mut dyn Future + Send + 'static>` // from it by calling the `Pin::as_mut` method. if let Poll::Pending = future.as_mut().poll(context) { // We're not done processing the future, so put it // back in its task to be run again in the future. *future_slot = Some(future); } } } } } #}
Поздравляю! Теперь у нас есть работающий исполнитель futures
. Мы даже можем использовать его
для запуска async/.await
кода и пользовательских futures
, таких как TimerFuture
которую мы описали ранее:
fn main() { let (executor, spawner) = new_executor_and_spawner(); // Spawn a task to print before and after waiting on a timer. spawner.spawn(async { println!("howdy!"); // Wait for our timer future to complete after two seconds. TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // Drop the spawner so that our executor knows it is finished and won't // receive more incoming tasks to run. drop(spawner); // Run the executor until the task queue is empty. // This will print "howdy!", pause, and then print "done!". executor.run(); }