Как я могу удалить или иным образом игнорировать ошибки при обработке потока? - PullRequest
0 голосов
/ 08 февраля 2019

У меня есть длинный список фьючерсов, которые я хотел бы запустить, используя Stream::buffer_unordered / Stream::buffered.Я объединяю этот поток в одно будущее с for_each, а затем выполняю все это с Tokio.Довольно часто один из фьючерсов возвращает ошибку.Согласно документации, for_each остановится, когда будет возвращена ошибка.

Как я могу игнорировать или просто напечатать сообщение, когда эти ошибки будут возвращены, и продолжать выполнение последующих фьючерсов?

ВотОбщий код, похожий на мою ситуацию:

use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;

fn main() {
    let queries: Vec<u32> = (0..10).collect();
    let futures = queries.into_iter().map(move |num| {
        println!("Started {}", num);
        // Maybe throw error
        let future = match num % 3 {
            0 => ok::<u32, u32>(num),
            _ => err::<u32, u32>(num)
        };
        future
    });

    let stream = stream::iter_ok(futures);
    let num_workers = 8;
    let future = stream
        .buffer_unordered(num_workers)
        .map_err(|err| {
            println!("Error on {:?}", err);
        })
        .for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        });

    tokio::runtime::run(future);
}

Rust Playground

Если вы попробуете этот пример, очередь фьючерсов прекратит работу рано, когда Errвыброшены.

1 Ответ

0 голосов
/ 08 февраля 2019
  • Stream::map_err - при наличии значений ошибок он может преобразовывать тип, но оставляет его как ошибку.

  • Stream::or_else - при наличии значений ошибки он может преобразовать ошибку в успех, оставив значения успеха без изменений.

  • Stream::then -предоставляется как со значениями успешности, так и со значениями ошибок и может делать все, что вы захотите.

Stream::map не дает вам возможности преобразовывать ошибки в успех, поэтому это бесполезно.Stream::or_else дает возможность, но она используется, когда вы можете преобразовать тип ошибки в тип успеха.Только Stream::then дает вам возможность конвертировать оба типа одновременно.

Stream::flatten можно использовать для преобразования потока потоков в один поток.

Объедините это с тем фактом, что Result можно рассматривать как итератор, и вы можете создать это:

stream
    .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
    .flatten()

Независимо от того, является ли элемент потока Ok или Err, мы конвертируем его витератор и создать поток из него.Затем мы сглаживаем поток потоков.

Если вы хотите распечатать ошибки, я бы использовал Stream::inspect_err:

stream.inspect_err(|err| println!("Error on {:?}", err))

Полный код:

use futures::{
    future,
    stream::{self, Stream},
}; // 0.1.25;
use tokio; // 0.1.14

fn main() {
    let stream = stream::iter_ok({
        (0..10).map(|num| {
            println!("Started {}", num);
            match num % 3 {
                0 => future::ok(num),
                _ => future::err(num),
            }
        })
    })
    .buffer_unordered(2);

    let stream = stream
        .inspect_err(|err| println!("Error on {:?}", err))
        .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
        .flatten();

    tokio::run({
        stream.for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        })
    });
}
Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9
...