Iteration and Concurrency
Similar to synchronous Iterator
s, there are many different ways to iterate
over and process the values in a Stream
. There are combinator-style methods
such as map
, filter
, and fold
, and their early-exit-on-error cousins
try_map
, try_filter
, and try_fold
.
Unfortunately, for
loops are not usable with Stream
s, but for
imperative-style code, while let
and the next
/try_next
functions can
be used:
# #![allow(unused_variables)] #fn main() { async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 { use futures::stream::StreamExt; // for `next` let mut sum = 0; while let Some(item) = stream.next().await { sum += item; } sum } async fn sum_with_try_next( mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>, ) -> Result<i32, io::Error> { use futures::stream::TryStreamExt; // for `try_next` let mut sum = 0; while let Some(item) = stream.try_next().await? { sum += item; } Ok(sum) } #}
However, if we're just processing one element at a time, we're potentially
leaving behind opportunity for concurrency, which is, after all, why we're
writing async code in the first place. To process multiple items from a stream
concurrently, use the for_each_concurrent
and try_for_each_concurrent
methods:
# #![allow(unused_variables)] #fn main() { async fn jump_around( mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>, ) -> Result<(), io::Error> { use futures::stream::TryStreamExt; // for `try_for_each_concurrent` const MAX_CONCURRENT_JUMPERS: usize = 100; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } #}