Начало работы

Добро пожаловать в асинхронное программирование в Rust! Если вы собираетесь начать писать асинхронный код на Rust, вы находитесь в правильном месте. Строите ли вы web-сервер, базу данных или операционную систему, эта книга покажет вам как использовать инструменты асинхронного программирования, чтобы получить максимальную отдачу от вашего оборудования.

Что эта книга охватывает?

Эта книга призвана стать исчерпывающим, современным руководством по использованию асинхронных языковых возможностей и библиотек Rust, подходящим как новичкам, так и опытным.

  • Ранние главы содержат введение в асинхронное программирование в целом, а также его особенности в Rust.

  • В средних главах обсуждаются ключевые утилиты и инструменты управления потоком, которые вы можете использовать, когда пишете асинхронный код. Также здесь описаны лучшие практики структурирования библиотек и приложений для получения максимальной производительности и повторного использования кода.

  • Последняя глава книги покрывает асинхронную экосистему и предоставляет ряд примеров того, как можно выполнить общие задачи.

Итак, давайте исследуем захватывающий мир асинхронного программирования в Rust!

Для чего нужна асинхронность?

Все мы любим то, что Rust позволяет нам писать быстрые и безопасные приложения. Но для чего писать асинхронный код?

Асинхронный код позволяет нам запускать несколько задач параллельно в одном потоке ОС. Если вы ходите одновременно загрузить две разных web-страницы в обычном приложении, вы должны разделить работу между двумя разным потоками, как тут:


# #![allow(unused_variables)]
#fn main() {
fn get_two_sites() {
    // Spawn two threads to do work.
    let thread_one = thread::spawn(|| download("https:://www.foo.com"));
    let thread_two = thread::spawn(|| download("https:://www.bar.com"));

    // Wait for both threads to complete.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}
#}

Для многих приложений это замечательно работает - в конце концов, потоки были разработаны именно для этого: запускать несколько разных задач одновременно. Однако, они имеют некоторые ограничения. В процессе переключения между разными потоками и обменом данными между ними возникает много накладных расходов. Даже поток, который сидит и ничего не делает, использует ценные системные ресурсы. Асинхронный код предназначен для устранения этих проблем. Мы можем переписать функции выше используя Rust нотацию async/.await, которая позволяет нам запустить несколько задач одновременно, не создавая несколько потоков:


# #![allow(unused_variables)]
#fn main() {
async fn get_two_sites_async() {
    // Create a two different "futures" which, when run to completion,
    // will asynchronously download the webpages.
    let future_one = download_async("https:://www.foo.com");
    let future_two = download_async("https:://www.bar.com");

    // Run both futures to completion at the same time.
    join!(future_one, future_two);
}
#}

В целом, асинхронные приложения могут быть намного быстрее и использовать меньше ресурсов, чем соответствующая многопоточная реализация. Однако, есть и обратная сторона. Потоки изначально поддерживаются операционной системой и их использование не требует какой-либо специальной модели программирования - любая функция может создать поток и вызов функции, использующей поток, обычно так же прост, как вызов обычной функции. Тем не менее, асинхронные функции требует специальной поддержки со стороны языка или библиотек. В Rust, async fn создаёт асинхронную функцию, которая возвращает Future. Для выполнения тела функции, возвращённая Future должна быть завершена.

Важно помнить, что традиционные приложения с потоками могут быть вполне эффективными и предсказуемость Rust и небольшой объём памяти могут значить, что вы можете далеко продвинуться без использования async. Повышенная сложность асинхронной модели программирования не всегда стоит этого и важно понимать, когда ваше приложения будет лучше работать с использованием просто поточной модели.

Состояние асинхронности в Rust

Асинхронная экосистема Rust претерпела большую эволюцию с течением времени, поэтому может быть трудно понять, какие инструменты использовать, в какие библиотеки инвестировать, или какую документацию читать. Однако типаж Future внутри стандартной библиотеке и async/await в языке последнее время стабилизировалась. Таким образом, экосистема в целом находится в процессе миграции к недавно стабилизированному API, после чего точка оттока будет значительно уменьшена.

Тем не менее, сейчас экосистема всё ещё находится в стадии активной разработки и асинхронный опыт в Rust не отполирован. Многие библиотеки до сих пор используют пакет futures версии 0.1, а это значит, что для взаимодействия с ними разработчикам часто требуется функциональность compat из пакета futures версии 0.3. async/await до сих пор новы. Важное расширение синтаксиса, как async fn, для методов типажей до сих пор не реализовано, и текущие сообщения компилятора об ошибках могут быть сложны для восприятия.

Это говорит о том, что Rust на пути к более эффективной и эргономичной поддержке асинхронного программирования и если вы не боитесь изменений, наслаждайтесь погружением в мир асинхронного программирования в Rust!

Пример async/.await

async/.await - это встроенные в Rust инструменты для написания асинхронного кода. async преобразует блок кода в конечный автомат, который реализует типаж, зовущийся Future. В то время как вызов блокирующей функции в синхронном методе заблокирует весь поток, блокировка Future вернёт контроль над потоком, позволяя работать другим Future.

Для создания асинхронной функции, вы можете использовать синтаксис async fn:


# #![allow(unused_variables)]
#fn main() {
async fn do_something() { ... }
#}

Значение, возвращённоеasync fn - Future. Что бы ни произошло, Future должна быть запущена в исполнителе.

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

Внутри async fn вы можете использовать .await для ожидания завершения другого типа, реализующего типаж Future (например, полученного из другой async fn). В отличие от block_on, .await не блокирует текущий поток, но асинхронно ждёт завершения футуры, позволяя другим задачам выполняться, если в данный момент футура не может добиться прогресса.

Например, представим что у нас есть три async fn: learn_song, sing_song и dance:


# #![allow(unused_variables)]
#fn main() {
async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }
#}

Один из путей учиться, петь и танцевать - останавливаться на каждом из них:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

Тем не менее, в этом случае мы не получаем наилучшей производительности - мы одновременно делаем только одно дело! Очевидно, что мы должны выучить песню до того, как петь её, но мы можем танцевать в то же время, пока учим песню и поём её. Чтобы сделать это, мы создадим две отдельные async fn, которые могут запуститься параллельно:

async fn learn_and_sing() {
    // Wait until the song has been learned before singing it.
    // We use `.await` here rather than `block_on` to prevent blocking the
    // thread, which makes it possible to `dance` at the same time.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` is like `.await` but can wait for multiple futures concurrently.
    // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
    // future will take over the current thread. If `dance` becomes blocked,
    // `learn_and_sing` can take back over. If both futures are blocked, then
    // `async_main` is blocked and will yield to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

В этом примере, запоминание песни должно быть сделано до пения песни, но и запоминание и пение могут завершиться одновременно с танцем. Если мы используем block_on(learn_song()) вместо learn_song().await в learn_and_sing, поток не может делать ничего другого, пока запущена learn_song. Из-за этого мы одновременно с этим не можем танцевать. Пока ожидается (.await) футура learn_song, мы разрешаем другим задачам захватить текущий поток, если learn_song заблокирована. Это делаем возможным запуск нескольких футур, завершающихся параллельно в одном потоке.

Теперь мы изучили основы async/await, давайте посмотрим их в работе.

Применение: HTTP сервер

Давайте используем async/.await для создания echo-сервера!

Для начала, запустите rustup update nightly чтобы быть уверенным, что используете последнюю версию Rust - мы работаем с передовыми функциями, так что надо быть обновлённым. Когда вы закончите это, создайте новый проект с помощью cargo +nightly new async-await-echo и откройте созданную директорию async-await-echo.

Добавим некоторые зависимости в файл Cargo.toml:

[dependencies]
# The latest version of the "futures" library, which has lots of utilities
# for writing async code. Enable the "compat" feature to include the
# functions for using futures 0.3 and async/await with the Hyper library,
# which use futures 0.1.
futures-preview = { version = "=0.3.0-alpha.17", features = ["compat"] }

# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.12.9"

Теперь, когда у нас есть свои зависимости, давайте начнём писать код. Вот список зависимостей, которые необходимо добавить:


# #![allow(unused_variables)]
#fn main() {
use {
    hyper::{
        // Miscellaneous types from Hyper for working with HTTP.
        Body, Client, Request, Response, Server, Uri,

        // This function turns a closure which returns a future into an
        // implementation of the the Hyper `Service` trait, which is an
        // asynchronous function from a generic `Request` to a `Response`.
        service::service_fn,

        // A function which runs a future to completion using the Hyper runtime.
        rt::run,
    },
    futures::{
        // Extension trait for futures 0.1 futures, adding the `.compat()` method
        // which allows us to use `.await` on 0.1 futures.
        compat::Future01CompatExt,
        // Extension traits providing additional methods on futures.
        // `FutureExt` adds methods that work for all futures, whereas
        // `TryFutureExt` adds methods to futures that return `Result` types.
        future::{FutureExt, TryFutureExt},
    },
    std::net::SocketAddr,
};
#}

Как только закончим с импортами, мы можем собрать вместе весь шаблонный код, который позволит обрабатывать запросы:

async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // Always return successfully with a response containing a body with
    // a friendly greeting ;)
    Ok(Response::new(Body::from("hello, world!")))
}

async fn run_server(addr: SocketAddr) {
    println!("Listening on http://{}", addr);

    // Create a server bound on the provided address
    let serve_future = Server::bind(&addr)
        // Serve requests using our `async serve_req` function.
        // `serve` takes a closure which returns a type implementing the
        // `Service` trait. `service_fn` returns a value implementing the
        // `Service` trait, and accepts a closure which goes from request
        // to a future of the response. To use our `serve_req` function with
        // Hyper, we have to box it and put it in a compatability
        // wrapper to go from a futures 0.3 future (the kind returned by
        // `async fn`) to a futures 0.1 future (the kind used by Hyper).
        .serve(|| service_fn(|req| serve_req(req).boxed().compat()));

    // Wait for the server to complete serving or exit with an error.
    // If an error occurred, print it to stderr.
    if let Err(e) = serve_future.compat().await {
        eprintln!("server error: {}", e);
    }
}

fn main() {
    // Set the address to run our socket on.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    // Call our `run_server` function, which returns a future.
    // As with every `async fn`, for `run_server` to do anything,
    // the returned future needs to be run. Additionally,
    // we need to convert the returned future from a futures 0.3 future into a
    // futures 0.1 future.
    let futures_03_future = run_server(addr);
    let futures_01_future = futures_03_future.unit_error().boxed().compat();

    // Finally, we can run the future to completion using the `run` function
    // provided by Hyper.
    run(futures_01_future);
}

Если вы сейчас запустите cargo run, в консоли вы увидите сообщение "Listening on http://127.0.0.1:3000". Если вы откроете URL в вашем любимом браузере, вы увидите как в нём отобразится "hello, world!". Поздравляем! Вы только что написали свой первый асинхронный web-сервер на Rust.

Вы также можете посмотреть сам запрос, который содержит такую информацию, как URI, версию HTTP, заголовки и другие метаданные. Например, мы можем вывести URI запроса следующим образом:


# #![allow(unused_variables)]
#fn main() {
println!("Got request at {:?}", req.uri());
#}

Вы могли заметить, что мы до сих пор не делали ничего асинхронного для обработки запроса - мы только незамедлительно ответили на него, мы не пользуемся гибкостью, которую нам даёт async fn. Вместо этого, мы только возвращаем статическое сообщение. Давайте попробуем проксировать пользовательский запрос на другой web-сайт используя HTTP-клиент Hyper'а.

Мы начнём с парсинга URL, который мы хотим запросить:


# #![allow(unused_variables)]
#fn main() {
let url_str = "http://www.rust-lang.org/en-US/";
let url = url_str.parse::<Uri>().expect("failed to parse URL");
#}

Затем мы создадим новый hyper::Client и используем его для создания GET запроса, который вернём пользователю ответ:


# #![allow(unused_variables)]
#fn main() {
let res = Client::new().get(url).compat().await;
// Return the result of the request directly to the user
println!("request finished-- returning response");
res
#}

Client::get возвращает hyper::client::FutureResponse, который реализует Future<Output = Result<Response, Error>> (или Future<Item = Response, Error = Error> в терминах futures 0.1). Когда мы разрешаем (.await) футуру, отправляется HTTP-запрос, текущая задача приостанавливается и становится в очередь, чтобы продолжить работу после получения ответа.

Если вы сейчас запустите cargo run и откроете http://127.0.0.1:3000/foo в браузере, вы увидите домашнюю страницу Rust, а в консоли следующий вывод:

Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response

Поздравляем! Вы только что проксировали HTTP запрос.

Под капотом: выполнение Future и задач

В этом разделе мы рассмотрим как планируются Future и асинхронные задачи. Если вам только интересно изучить как писать высокоуровневый код, который использует существующие типы Future, и не интересуетесь как Future работает, вы можете сразу перейти к главе async/await. Тем не менее, некоторые темы, которые обсуждаются в этой главе, полезны для понимания работы async/await кода и построения новых асинхронных примитивов. Если сейчас вы решили пропустить этот раздел, вы можете добавить его в закладки, чтобы вернуться к нему в будущем.

Теперь давайте рассмотрим типаж Future.

Типаж Future

Типаж Future является центральным для асинхронного программирования в Rust. Future - это асинхронное вычисление, которое может производить значение (хотя значение может быть и пустым, например ()). Упрощённый вариант этого типажа может выглядеть как-то так:


# #![allow(unused_variables)]
#fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
#}

Футуры могут быть продвинуты(?) при помощи функции poll, которая продвигает их так далеко, на сколько это возможно. Если футура завершается, она возвращает Poll::Ready(result). Если же она до сих пор не готова завершиться, то - Poll::Pending и предоставляет функцию wake(), которая будет вызвана, когда Future будет готова совершить прогресс(?). Когда wake() вызван, исполнитель снова вызывает у Future метод poll, чтобы она смогла продвинуться(?).

Без wake(), исполнитель не имеет возможности узнать, когда какая-либо future может продвинуться, и был бы должен постоянно опрашивать каждую future. С wake() точно знает какие futures можно опросить.

Например, представим ситуацию, когда мы хотим прочитать из сокета, который может иметь, а может и не иметь данных. Если данные есть, мы можем прочитать их и вернуть Poll::Ready(data), но если данных ещё нет, наша future блокируется и не может прогрессировать. Когда данных нет, мы должны зарегистрировать вызов wake, когда данные появятся в сокете


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

Такая модель future позволяет держать вместе несколько асинхронных операций без лишних промежуточных выделений памяти. Одновременный запуск нескольких futures или соединение их в цепочку может быть реализовано при помощи машины состояний, не делающий выделений памяти, например так:


# #![allow(unused_variables)]
#fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
#}

Здесь показано, как несколько футур могут быть запущены одновременно без необходимости раздельной аллокации, позволяя асинхронным программам быть более эффективными. Аналогично, несколько последовательных футур могут быть запущены одна за другой, как тут:


# #![allow(unused_variables)]
#fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
#}

Этот пример показывает, как типаж Future может использоваться для выражения асинхронного управления потоком без необходимости множественной аллокации объектов и глубоко вложенных замыканий. Оставим базовое управление потоком в стороне и давайте поговорим о реальном типаже Future и чем он отличается.


# #![allow(unused_variables)]
#fn main() {
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
#}

Первое, что вы могли заметить, что наш тип self больше не &mut self, а заменён на Pin<&mut Self>. Мы поговорим о закреплении в следующей секции, но пока что знайте, что оно позволяет нам создавать неперемещаемые futures. Неперемещаемые объекты могут сохранять указатели на собственные поля, например struct MyFut { a: i32, ptr_to_a: *const i32 }. Прикрепление необходимо для async/await.

Второе, wake: fn() была изменена на &mut Context<'_>. В SimpleFuture мы использовали вызов указателя на функцию (fn()) чтобы сказать исполнителю, что future должна быть опрошена. Однако, так как fn() имеют нулевой тип, они не могут сохранить информацию о том какая future вызвала wake.

В примере из реального мира, сложное приложение, как web-сервер может иметь тысячи различных подключений, все пробуждения которых должны обрабатываться отдельно. Тип Context решает это предоставляя доступ к значению типа Waker, который может быть использован для пробуждения конкретной задачи.

Вызовы задачи при помощи Waker

Обычно футуры не могут завершиться сразу же, как их опросили (вызвали метод poll). Когда это случается, футура должна быть уверена, что, когда она будет готова прогрессировать,
она будет снова опрошена. Это решается при помощи типа Waker.

Каждый раз, когда футура опрашивается, она бывает частью "задачи". Задачи - это высокоуровневые футуры, с которыми работает исполнитель.

Waker обеспечивает метод wake(), который может быть использован, чтобы сказать исполнителю, что соответствующая задача должна быть пробуждена. Когда вызывается wake(), исполнитель знает, что задача, связанная с Waker, готова к выполнению, и в будущем должна быть опрошена снова.

Waker так же реализует clone(). Так вы можете его копировать где необходимо и хранить.

Давайте попробуем реализовать простой таймер с использованием Waker.

Применение: Создание таймера

В качестве примера, мы просто раскручиваем новый поток при создании таймера, спим в течение необходимого времени, а затем сообщаем о том когда время таймера истечёт.

Вот список зависимостей, которые нам понадобятся для начала:


# #![allow(unused_variables)]
#fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
#}

Давайте определим тип нашей future. Нашей future необходим канал связи, чтобы сообщить о том что время таймера истекло и future должна завершиться. В качестве канала связи между таймером и future мы будем использовать разделяемое значение Arc<Mutex<..>>.


# #![allow(unused_variables)]
#fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
#}

Теперь давайте напишем реализацию Future!


# #![allow(unused_variables)]
#fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#}

Просто, не так ли? Если поток установит shared_state.completed = true, мы закончили! В противном случае мы клонируем Waker для текущей задачи и сохраняем его в shared_state.waker. Так поток может разбудить задачу позже.

Важно отметить, что мы должны обновлять Waker каждый раз, когда future опрашивается, потому что future может быть перемещена в другую задачу с другим Waker. Это может произойти когда футуры передаются между задачами после опроса.

Наконец, нам нужен API, чтобы фактически построить таймер и запустить поток:


# #![allow(unused_variables)]
#fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
#}

Это всё, что нам нужно для того, чтобы построить простой таймер на future. Теперь нам нужен исполнитель, чтобы запустить future на исполнение.

Применение: создание исполнителя

Футуры Rust'a ленивы: они ничего не будут делать, если не будут активно выполняться. Один из способов довести future до завершения - это .await и функция async внутри него, но это просто подталкивает проблему на один уровень вверх: кто будет запускать future, возвращённые из async функций верхнего уровня? Ответ в том, что нам нужен исполнитель для Future.

Исполнители берут набор future верхнего уровня и запускают их через вызов метода poll, до тех пока они не завершатся. Как правило, исполнитель будет вызывать метод poll у future один раз, чтобы запустить. Когда future сообщают, что готовы продолжить вычисления при вызове метода wake(), они помещаются обратно в очередь и вызов poll повторяется до тех пор, пока Future не будут завершены.

В этом разделе мы напишем нашего собственного простого исполнителя, способного одновременно запускать большое количество future верхнего уровня.

В этом примере мы зависим от пакета futures, в котором определён типаж ArcWake. Данный типаж предоставляет простой способ для создания Waker.

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.17"

Дальше, мы должны в верней части файла src/main.rs разместить следующий список зависимостей:


# #![allow(unused_variables)]
#fn main() {
use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};
#}

Наш исполнитель будет работать, посылая задачи для запуска по каналу. Исполнитель извлечёт события из канала и запустит их. Когда задача готова выполнить больше работы (будет пробуждена), она может запланировать повторный опрос самой себя, отправив себя обратно в канал.

В этом проекте самому исполнителю просто необходим получатель для канала задачи. Пользователь получит экземпляр отправителя, чтобы он мог создавать новые future. Сами задачи - это просто future, которые могут перепланировать самих себя, поэтому мы сохраним их как future в сочетании с отправителем, который задача может использовать, чтобы запросить себя.


# #![allow(unused_variables)]
#fn main() {
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender})
}
#}

Давайте также добавим метод к spawner, чтобы было легко создавать новые futures. Этот метод возьмёт future, упакует и поместит его в FutureObj и создаст новую Arc<Task> с ней внутри, которая может быть поставлена в очередь исполнителя.


# #![allow(unused_variables)]
#fn main() {
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}
#}

Чтобы опросить futures, нам нужно создать Waker. Как описано в разделе задачи пробуждения, Wakers отвечают за планирование задач, которые будут опрошены снова после вызова wake. Wakers сообщают исполнителю, какая именно задача завершилась, позволяя опрашивать как раз те futures, которые готовы к продолжению выполнения. Простой способ создать новый Waker, необходимо реализовать типаж ArcWake, а затем использовать waker_ref или .into_waker() функции для преобразования Arc<impl ArcWake> в Waker. Давайте реализуем ArcWake для наших задач, чтобы они были превращены в Wakers и могли пробуждаться:


# #![allow(unused_variables)]
#fn main() {
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}
#}

Когда Waker создаётся на основе Arc<Task>, вызывая wake(), это вызовет отправку копии Arc в канал задач. Тогда нашему исполнителю нужно подобрать задание и опросить его. Давайте реализуем это:


# #![allow(unused_variables)]
#fn main() {
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}
#}

Поздравляю! Теперь у нас есть работающий исполнитель futures. Мы даже можем использовать его для запуска async/.await кода и пользовательских futures, таких как TimerFuture которую мы описали ранее:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

Исполнители и системный ввод/вывод

В главе "Типаж Future", мы обсуждали такой пример future, которая выполняет асинхронное чтение сокета:


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

Эта future читает доступные данные из сокета и если доступных данных нет, то она будет передана исполнителю с запросом активирования задачи, если сокет снова станет читаемым. Однако, из текущего примера не ясна реализация типа Socket и, в частности, не совсем очевидно как работает функция set_readable_callback. Как мы можем сделать так, чтобы lw.wake() был вызван, когда сокет станет читаемым? Один из вариантов - иметь поток, который постоянно проверяет стал ли socket читаемым, вызывая при необходимости метод wake(). Тем не менее, такой подход будет весьма не эффективным, так как он требует отдельного потока для каждой блокирующей IO future. Это значительно снизит эффективность нашего асинхронного кода.

На практике эта проблема решается при помощи интеграции с IO - зависимыми системными блокирующими примитивами такими, как epoll в Linux, kqueue во FreeBSD и Mac OS, IOCP в Windows и port в Fuchsia (все они предоставляются при помощи кроссплатформенного Rust-пакета mio). Все эти примитивы позволяют потоку заблокироваться с несколькими асинхронными IO-событиями, возвращая одно из завершённых событий. На практике эти API выглядят примерно так:


# #![allow(unused_variables)]
#fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // ID уникально идентифицирующее событие, которое уже произошло и на которое мы подписались.
    id: usize,

    // Набор сигналов для ожидания или которые произошли.
    signals: Signals,
}

impl IoBlocker {
    /// Создаём новую коллекцию асинхронных IO-событий для блокировки.
    fn new() -> Self { ... }

    /// Проявим интерес к определённому IO-событию.
    fn add_io_event_interest(
        &self,

        /// Объект, на котором происходит событие
        io_object: &IoObject,

        /// Набор сигналов, которые могут применяться к `io_object`,
        /// для которых должно быть инициировано событие, в паре с
        /// ID, которые передадутся событиям, получившимся в результате нашего интереса.
        event: Event,
    ) { ... }

    /// Заблокируется до появления одного из событий.
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// выведет что-то похожее на "Socket 1 is now READABLE", если сокет станет доступным для чтения.
println!("Socket {:?} is now {:?}", event.id, event.signals);
#}

Исполнители Future могут использовать эти примитивы для предоставления асинхронных объектов ввода-вывода, таких как сокеты, которые могут настроить обратные вызовы для запуска при определённом событии ввода-вывода. В случае нашего примера SocketRead выше, Socket::set_readable_callback функция может выглядеть следующим псевдокодом:


# #![allow(unused_variables)]
#fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` это ссылка на локальный исполнитель.
        // это может быть предусмотрено при создании сокета, 
        // большинство реализаций исполнителей делают это через локальный поток, так удобнее.
        let local_executor = self.local_executor;

        // Уникальный ID для объекта ввода вывода.
        let id = self.id;

        // Сохраним `waker` в данных исполнителя,
        // чтобы его можно было вызвать после того как будет получено событие.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
#}

Теперь у нас может быть только один поток исполнителя, который может принимать и отправлять любые события ввода-вывода в соответствующий Waker, который разбудит соответствующую задачу, позволяющая исполнителю довести больше задач до завершения перед возвратом, чтобы проверить больше событий ввода-вывода (и цикл продолжается...).

async/await

В первой главе мы бросили беглый взгляд на async/.await и использовали это чтобы построить простой сервер. В этой главе мы обсудим async/.await более подробно, объясняя, как это работает и как async код отличается от традиционных программ на Rust.

async/.await - это специальный синтаксис Rust, который позволяет передавать контроль выполнения в потоке другому коду, пока ожидается окончание завершения, а не блокировать поток.

Существует два основных способа использования async: async fn и async блоки. Каждый возвращает значение, реализующее типаж Future :


# #![allow(unused_variables)]

#fn main() {
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}
#}

Как мы видели в первой главе, async блоки и другие futures ленивы: они ничего не делают, пока их не запустят. Наиболее распространённый способ запуска Future - это .await. Когда .await вызывается на Future, он пытается завершить выполнение до конца. Если Future заблокирована, то контроль будет передан текущему потоку. Чтобы добиться большего прогресса, будет выбрана верхняя Future исполнителя, позволяя .await продолжить работу.

Времена жизни async

В отличие от традиционных функций, async fn, которые принимают ссылки или другие не-'static аргументы, возвращают Future, которая ограничена временем жизни аргумента:


# #![allow(unused_variables)]
#fn main() {
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}
#}

Это означает, что future, возвращаемая из async fn, должен быть вызван .await до тех пор пока её не-'static аргументы все ещё действительны. В общем случае, вызов .await у future сразу после вызова функции (как в foo(&x).await) это не проблема. Однако, если сохранить future или отправить её в другую задачу или поток, это может быть проблемой.

Один общий обходной путь для включения async fn со ссылками в аргументах в 'static future состоит в том, чтобы связать аргументы с вызовом async fn внутри async блока:


# #![allow(unused_variables)]
#fn main() {
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}
#}

Перемещая аргумент в async блок, мы продлеваем его время жизни до времени жизни Future, которая возвращается при вызове foo.

async move

async блоки и замыкания позволяют использовать ключевое слово move, как обычные замыкания. async move блок получает владение переменными со ссылками, позволяя им пережить текущую область, но отказывая им в возможности делиться этими переменными с другим кодом:


# #![allow(unused_variables)]
#fn main() {
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{}", my_string);
    };

    let future_two = async {
        // ...
        println!("{}", my_string);
    };

    // Run both futures to completion, printing "foo" twice:
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{}", my_string);
    }
}
#}

.await в многопоточном исполнителе

Обратите внимание, что при использовании Future в многопоточном исполнителе, Future может перемещаться между потоками, поэтому любые переменные, используемые в телах async, должны иметь возможность перемещаться между потоками, как и любой .await потенциально может привести к переключению на новый поток.

Это означает, что не безопасно использовать Rc, &RefCell или любые другие типы, не реализующие типаж Send (включая ссылки на типы, которые не реализуют типаж Sync).

(Предостережение: можно использовать эти типы до тех пор, пока они не находятся в области действия вызова .await.)

Точно так же не очень хорошая идея держать традиционную non-futures-aware блокировку через .await, так как это может привести к блокировке пула потоков: одна задача может получить объект блокировки, вызвать .await и передать управление исполнителю, разрешив другой задаче совершить попытку взять блокировку, что и вызовет взаимоблокировку. Чтобы избежать этого, используйте Mutex из futures::lock, а не из std::sync.

Закрепление (pinning)

Чтобы опросить futures, они должны быть закреплены с помощью специального типа под названием Pin<T>. Если Вы прочитаете описание типажа Future в предыдущем разделе "выполнение Futures и задач", вы узнаете о Pin из self: Pin<&mut Self> в методеFuture:poll. Но что это значит, и зачем нам это нужно?

Для чего перемещение

Закрепление даёт гарантию того, что объект не будет перемещён. Чтобы понять почему это важно, нам надо помнить как работает async/.await. Рассмотрим следующий код:


# #![allow(unused_variables)]
#fn main() {
let fut_one = ...;
let fut_two = ...;
async move {
    fut_one.await;
    fut_two.await;
}
#}

Под капотом, он создаёт два анонимных типа, которые реализуют типаж Future, предоставляющий метод poll, выглядящий примерно так:


# #![allow(unused_variables)]
#fn main() {
// Тип `Future`, созданный нашим `async { ... }` блоком
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// Список возможных состояний нашего `async` блока
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}
#}

Когда poll вызывается первый раз, он опрашивает fut_one. Если fut_one не завершена, возвращается AsyncFuture::poll. Следующие вызовы poll будут начинаться там, где завершился предыдущий вызов. Этот процесс продолжается до тех пор, пока future не сможет завершиться.

Однако, что будет, если async блок использует ссылки? Например:


# #![allow(unused_variables)]
#fn main() {
async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}
#}

Во что скомпилируется эта структура?


# #![allow(unused_variables)]
#fn main() {
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // указывает на `x` далее
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'?>, // какое тут время жизни?
}
#}

Здесь future ReadIntoBuf содержит ссылку на другое поле нашей структуры, x. Однако, если AsyncFuture будет перемещена, положение x тоже будет изменено, что сделает указатель, сохранённый в read_into_buf_fut.buf, недействительным.

Закрепление future в определённом месте памяти, предотвращает эту проблему, делая безопасным создание ссылок на данные за пределами async блока.

Как использовать закрепление

Тип Pin оборачивает указатель на другие типы, гарантируя, что значение за указателем не будет перемещено. Например, Pin<&mut T>, Pin<&T>, Pin<Box<T>> - все гарантируют, что положение T останется неизменным.

У большинства типов нет проблем с перемещением. Эти типы реализуют типаж Unpin. Указатели на Unpin-типы могут свободно помещаться в Pin или извлекаться из него. Например, тип u8 реализует Unpin, таким образом Pin<&mut T> ведёт себя также, как и &mut T.

Некоторые функции требуют, чтобы future, с которыми они работают, были Unpin. Для использования Future или Stream, не реализующего Unpin, с функцией, требующей Unpin-типа, вы сначала должны закрепить значение при помощи Box::pin (для создания Pin<Box<T>>) или макроса pin_utils::pin_mut! (для создания Pin<&mut T>). Оба Pin<Box<Fut>> и Pin<&mut Fut> могут использоваться как future, и оба реализуют Unpin.

Например:


# #![allow(unused_variables)]
#fn main() {
use pin_utils::pin_mut; // `pin_utils` - удобный пакет, доступный на crates.io

// Функция, принимающая `Future`, которая реализует `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... }

let fut = async { ... };
execute_unpin_future(fut); // Ошибка: `fut` не реализует типаж `Unpin`

// Закрепление с помощью `Box`:
let fut = async { ... };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Закрепление с помощью `pin_mut!`:
let fut = async { ... };
pin_mut!(fut);
execute_unpin_future(fut); // OK
#}

Типаж Stream

Типаж Stream похож на Future, но может давать несколько значений до завершения, а также похож на типаж Iterator из стандартной библиотеки:


# #![allow(unused_variables)]
#fn main() {
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
#}

Одним из распространённых примеров Stream является Receiver для типа канала из пакета futures. Это даёт Some(val) каждый раз, когда значение отправляется от Sender, и даст None после того, как Sender был удалён из памяти и все ожидающие сообщения были получены:


# #![allow(unused_variables)]
#fn main() {
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
#}

Итерирование и параллелизм

Подобно синхронным итераторам, существует множество различных способов итерации и обработки значений в Stream. Существуют методы комбинаторного стиля например, map, filter и fold и их раннего выхода из-за ошибки try_map, try_filter и try_fold.

К сожалению, цикл for не может использоваться со Stream, но для императивного стиля написания кода, while let и функции next/try_next могут быть использованы:


# #![allow(unused_variables)]
#fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
#}

Однако, если мы просто обрабатываем один элемент за раз, мы потенциально оставляем возможность для параллелизма, который, в конце концов, стоит на первом месте при написании асинхронного кода. Для обработки нескольких элементов из потока одновременно, используйте методы for_each_concurrent и try_for_each_concurrent:


# #![allow(unused_variables)]
#fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
#}

Одновременное выполнение нескольких Future

До этого времени, мы в основном выполняли future используя .await, который блокирует текущую задачу до тех пор, пока отдельная Future не завершится. Однако, настоящие асинхронные приложения чаще всего должны выполнять несколько различных операций одновременно.

В этой главе мы рассмотрим разные способы одновременного выполнения нескольких асинхронных операций:

  • join!: ждёт завершения всех futures
  • select!: ждёт завершения одной из future
  • Порождение: создание задач верхнего уровня, которые запускают future до их завершения
  • FuturesUnordered: группа future, которые возвращают результат каждой subfuture

join!

Макрос futures::join позволяет дождаться завершения нескольких разных futures при одновременном их выполнении.

При выполнении нескольких асинхронных операций возникает соблазн просто сделать несколько .await последовательно:


# #![allow(unused_variables)]
#fn main() {
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
#}

Однако это будет медленнее, чем необходимо, так как он не начнёт пытаться выполнять get_music до завершения get_book. В некоторых других языках, futures выполняются до завершения, поэтому две операции могут быть запущены одновременно сначала вызовом для каждой async fn, чтобы их запустить, а потом ожиданием обеих:


# #![allow(unused_variables)]
#fn main() {
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
#}

Однако futures на Rust не будут работать, пока для них не будет вызван .await. Это означает, что оба приведённых выше фрагмента кода запустят book_future и music_future последовательно, вместо того, чтобы запустить их одновременно. Чтобы правильно запустить две futures одновременно, используйте futures::join!:


# #![allow(unused_variables)]
#fn main() {
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}
#}

Значение, возвращаемое join! - это кортеж, содержащий выходные данные каждой из переданных Future.

try_join!

Для futures, которые возвращают Result, рассмотрите возможность использования try_join! а не join!. Так как join! завершается только после завершения всех subfutures, он будет продолжать обрабатывать другие futures даже после одного из своих subfutures или вернёт ошибку Err.

В отличие отjoin!, try_join! завершится немедленно, если одна из subfutures вернёт ошибку.


# #![allow(unused_variables)]
#fn main() {
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
#}

Обратите внимание, что все futures, переданные в try_join!, должны иметь один и тот же тип ошибки. Рассмотрите возможность использования функций .map_err(|e| ...) и .err_into() из futures::future::TryFutureExt для консолидации типов ошибок:


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
#}

select!

Макрос futures::select запускает несколько future одновременно, позволяя пользователю ответить как только любая из future завершится.


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
#}

Функция выше запустит обе t1 и t2 параллельно. Когда t1 или t2 закончится, соответствующий дескриптор вызовет println! и функция завершится без выполнения оставшейся задачи.

Базовый синтаксис для select: <pattern> = <expression> => <code>,, повторяемый столько раз, из скольких future вам надо сделать select.

default => ... и complete => ...

Также select поддерживает ветки default и complete.

Ветка default выполнится, если ни одна из future, переданная в select, не завершится. Поэтому, select с веткой default, всегда будет незамедлительно завершаться, так как default будет запущена, когда ещё ни одна future не готова.

Ветка complete может быть использована для обработки случая, когда все futures, бывшие в select, завершились и уже не могут прогрессировать дальше. Это бывает удобно, при использовании select! в цикле.


# #![allow(unused_variables)]
#fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
#}

Взаимодействие с Unpin и FusedFuture

Одна вещь, на которую вы могли обратить внимание в первом примере, это то, что мы вызвали .fuse() на futures, возвращённых из двух async fn, а потом закрепили их с помощью pin_mut. Оба этих вызова важны, потому что future, используемая в select, должна реализовывать и типаж Unpin, и типаж FusedFuture.

Unpin важен по той причине, что future, используемые в select, берутся не по значению, а по изменяемой ссылке. Так как владение future никому не передано, незавершённые futures могут быть снова использованы после вызова select.

Аналогично, типаж FusedFuture необходим, так как select не должен опрашивать future после их завершения. FusedFuture реализуется future, которые отслеживают, завершены ли они или нет. Это делает возможным использование select в цикле, опрашивая только future, которые до сих пор не завершились. Это можно увидеть в примере выше, где a_fut или b_fut будут завершены во второй раз за цикл. Так как future, возвращённая future::ready, реализует FusedFuture, она может сообщить select, что её не надо снова опрашивать.

Заметьте, что у stream есть соответствующий типаж FusedStream. Stream, реализующие этот типаж или имеющие обёртку, созданную .fuse(), возвращают FusedFuture из их комбинаторов .next() и .try_next().


# #![allow(unused_variables)]
#fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
#}

Распараллеливание задач в цикле с select с помощью Fuse и FuturesUnordered

Одна довольно труднодоступная, но удобная функция - Fuse::terminated(), которая позволяет создавать уже прекращённые пустые future, которые в последствии могут быть заполнены другой future, которую надо запустить.

Это может быть удобно, когда есть задача, которую надо запустить в цикле в select, но которая создана вне этого цикла.

Обратите внимание на функцию .select_next_some(). Она может использоваться с select для запуска тех ветвей, которые получили от потока Some(_), а не None.


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
#}

Когда надо одновременно запустить много копий какой-либо future, используйте тип FuturesUnordered. Следующий пример похож на один из тех, что выше, но дождётся завершения каждой выполненной копии run_on_new_num_fut, а не остановит её при создании новой. Она также отобразит значение, возвращённое run_on_new_num_fut.


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

#}