Stream::map_err
- при наличии значений ошибок он может преобразовывать тип, но оставляет его как ошибку.
Stream::or_else
- при наличии значений ошибки он может преобразовать ошибку в успех, оставив значения успеха без изменений.
Stream::then
-предоставляется как со значениями успешности, так и со значениями ошибок и может делать все, что вы захотите.
Stream::map
не дает вам возможности преобразовывать ошибки в успех, поэтому это бесполезно.Stream::or_else
дает возможность, но она используется, когда вы можете преобразовать тип ошибки в тип успеха.Только Stream::then
дает вам возможность конвертировать оба типа одновременно.
Stream::flatten
можно использовать для преобразования потока потоков в один поток.
Объедините это с тем фактом, что Result
можно рассматривать как итератор, и вы можете создать это:
stream
.then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
.flatten()
Независимо от того, является ли элемент потока Ok
или Err
, мы конвертируем его витератор и создать поток из него.Затем мы сглаживаем поток потоков.
Если вы хотите распечатать ошибки, я бы использовал Stream::inspect_err
:
stream.inspect_err(|err| println!("Error on {:?}", err))
Полный код:
use futures::{
future,
stream::{self, Stream},
}; // 0.1.25;
use tokio; // 0.1.14
fn main() {
let stream = stream::iter_ok({
(0..10).map(|num| {
println!("Started {}", num);
match num % 3 {
0 => future::ok(num),
_ => future::err(num),
}
})
})
.buffer_unordered(2);
let stream = stream
.inspect_err(|err| println!("Error on {:?}", err))
.then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
.flatten();
tokio::run({
stream.for_each(|n| {
println!("Success on {:?}", n);
Ok(())
})
});
}
Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9