select!
Макрос futures::select
запускает несколько future
одновременно, позволяя пользователю ответить как только любая
из future
завершится.
# #![allow(unused_variables)] #fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } #}
Функция выше запустит обе t1
и t2
параллельно. Когда t1
или t2
закончится, соответствующий дескриптор вызовет
println!
и функция завершится без выполнения
оставшейся задачи.
Базовый синтаксис для select
: <pattern> = <expression> => <code>,
,
повторяемый столько раз, из скольких future
вам надо сделать select
.
default => ...
и complete => ...
Также select
поддерживает ветки default
и complete
.
Ветка default
выполнится, если ни одна из future
,
переданная в select
, не завершится. Поэтому,
select
с веткой default
, всегда будет
незамедлительно завершаться, так как default
будет
запущена, когда ещё ни одна future
не готова.
Ветка complete
может быть использована для
обработки случая, когда все futures
, бывшие в
select
, завершились и уже не могут прогрессировать
дальше. Это бывает удобно, при использовании select!
в цикле.
# #![allow(unused_variables)] #fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } #}
Взаимодействие с Unpin
и FusedFuture
Одна вещь, на которую вы могли обратить внимание в первом
примере, это то, что мы вызвали .fuse()
на futures
,
возвращённых из двух async fn
, а потом закрепили
их с помощью pin_mut
. Оба этих вызова важны,
потому что future
, используемая в select
, должна
реализовывать и типаж Unpin
, и типаж
FusedFuture
.
Unpin
важен по той причине, что future
, используемые в select
, берутся не по значению, а по изменяемой ссылке. Так как владение future
никому не передано, незавершённые futures
могут быть снова использованы после вызова select
.
Аналогично, типаж FusedFuture
необходим, так как select
не должен опрашивать future
после их
завершения. FusedFuture
реализуется future
, которые отслеживают, завершены ли они или нет. Это делает
возможным использование select
в цикле, опрашивая только future
, которые до сих пор не завершились.
Это можно увидеть в примере выше, где a_fut
или b_fut
будут завершены во второй раз за цикл. Так
как future
, возвращённая future::ready
, реализует FusedFuture
, она может сообщить
select
, что её не надо снова опрашивать.
Заметьте, что у stream
есть соответствующий типаж FusedStream
. Stream
, реализующие этот типаж
или имеющие обёртку, созданную .fuse()
, возвращают FusedFuture
из их комбинаторов
.next()
и .try_next()
.
# #![allow(unused_variables)] #fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } #}
Распараллеливание задач в цикле с select
с помощью Fuse
и FuturesUnordered
Одна довольно труднодоступная, но удобная функция - Fuse::terminated()
, которая позволяет создавать уже
прекращённые пустые future
, которые в последствии могут быть заполнены другой future
, которую надо запустить.
Это может быть удобно, когда есть задача, которую надо запустить в цикле в select
, но которая
создана вне этого цикла.
Обратите внимание на функцию .select_next_some()
. Она может использоваться с select
для запуска тех
ветвей, которые получили от потока Some(_)
, а не None
.
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}
Когда надо одновременно запустить много копий какой-либо future
, используйте тип FuturesUnordered
.
Следующий пример похож на один из тех, что выше, но дождётся завершения каждой выполненной копии
run_on_new_num_fut
, а не остановит её при создании новой. Она также отобразит значение, возвращённое
run_on_new_num_fut
.
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } // Runs `run_on_new_num` with the latest number // retrieved from `get_new_num`. // // `get_new_num` is re-run every time a timer elapses, // immediately cancelling the currently running // `run_on_new_num` and replacing it with the newly // returned value. async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}