select!
futures::select 巨集可以同時執行多個 future,並在任何一個 future 完成是迅速回應使用者。
# #![allow(unused_variables)] #fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } #}
上面這個函式將並行執行 t1 與 t2。當 t1 或 t2 其中一個完成了,對應的處理程序就會呼叫 println!,這個函式就不會完成剩下的任務,而會直接結束。
select 基本的語法是 <pattern> = <expression> => <code>,你想要從多少個 future 中做 select 就重複多少次。
default => ... 與 complete => ...
select 也支援 default 與 complete 的流程分支。
default 分支會在所有被 select 的 future 都尚未完成時。一個包含 default 分支的 select 總是會立刻返回,因為開始執行時不會有任何 future 準備就緒。
complete 分支則負責處理這個狀況:當所有被 select 的 future 都完成且無法再有更多進展時。這有個不斷循環 select 的小撇步。
# #![allow(unused_variables)] #fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } #}
與 Unpin 及 FusedFuture 的互動
你可能會發現,第一個例子中我們會對兩個 async fn 回傳的 future 呼叫 .fuse(),且用了 pin_mut 來固定它們。這兩個都是必要操作,因為在 select 中的 future 必須實作 Unpin trait 與 FusedFuture trait。
Unpin 必要的原因是用在 select 的 future 不是直接取其值,而是取其可變引用(mutable reference)。由於沒有取走所有權,尚未完成的 future 可以在 select 之後再度使用。
FusedFuturue 列為必須是因為當一個 future 完成時,select 就不該再度輪詢它。FusedFuture 的原理是追蹤其 future 是否已完成。實作了 FusedFuture 使得 select 可以用在迴圈中,只輪詢尚未完成的 future。這可以從上面的範例中看到,a_fut 或 b_fut 將會在第二次迴圈迭代中完成,由於 future::ready 有實作 FusedFuture,所以可以告知 select 不要再輪詢它。
注意,stream 也有對應的 FusedStream trait。有實作這個 trait 或使用 .fuse() 封裝的 stream,都會從它們的組合子 .next() / .try_next() 產生 FusedFuture。
# #![allow(unused_variables)] #fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } #}
在 select 迴圈中使用 Fuse 與 FuturesUnordered 的並行任務
有個難以發現但很方便的函式是 Fuse::terminated(),它可以建立一個空的且已經終止的 future,並在往後填上實際需要執行的 future。
Fuse::termianted() 在遇到需要在 select 迴圈內執行一個任務,但這任務卻在在該迴圈內才產生的情況,顯得十分方便順手。
請注意使用 .select_next_some() 函式。這個函式可以與 select 配合,只執行會從 steam 產生 Some(_) 的分支,而忽略產生 None 的分支。
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}
當同一個 future 的多個副本需要同時執行,請使用 FuturesUnordered 型別。接下來的範例和上一個很相似,但會執行每個 run_on_new_num_fut 的副本到其完成,而非當新的產生就中止舊的。它同時會印出 run_on_new_num_fut 的返回值。
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } // Runs `run_on_new_num` with the latest number // retrieved from `get_new_num`. // // `get_new_num` is re-run every time a timer elapses, // immediately cancelling the currently running // `run_on_new_num` and replacing it with the newly // returned value. async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}