Как вернуть ошибку из FuturesUnordered? - PullRequest
0 голосов
/ 13 мая 2019

У меня есть набор фьючерсов, которые должны быть запущены параллельно, и в случае сбоя я бы хотел, чтобы ошибка возвращалась вызывающей стороне.

Вот что я тестировал до сих пор:

use futures::prelude::*;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::{future, Future};

fn main() {
    let tasks: FuturesUnordered<_> = (1..10).map(|_| async_func(false)).collect();

    let mut runtime = tokio::runtime::Runtime::new().expect("Unable to start runtime");
    let res = runtime.block_on(tasks.into_future());

    if let Err(_) = res {
        println!("err");
    }
}

fn async_func(success: bool) -> impl Future<Item = (), Error = String> {
    if success {
        future::ok(())
    } else {
        future::err("Error".to_string())
    }
}

Как я могу получить ошибку от любого неудачного фьючерса? Еще лучше было бы прекратить запуск любых ожидающих фьючерсов в случае неудачи одного будущего.

1 Ответ

1 голос
/ 13 мая 2019

Ваш код уже возвращает и обрабатывает ошибку. Если вы попытались использовать ошибку, компилятор быстро направит вас к решению:

if let Err(e) = res {
    println!("err: {}", e);
}
error[E0277]: `(std::string::String, futures::stream::futures_unordered::FuturesUnordered<impl futures::future::Future>)` doesn't implement `std::fmt::Display`
  --> src/main.rs:12:29
   |
12 |         println!("err: {}", e);
   |                             ^ `(std::string::String, futures::stream::futures_unordered::FuturesUnordered<impl futures::future::Future>)` cannot be formatted with the default formatter
   |
   = help: the trait `std::fmt::Display` is not implemented for `(std::string::String, futures::stream::futures_unordered::FuturesUnordered<impl futures::future::Future>)`
   = note: in format strings you may be able to use `{:?}` (or {:#?} for pretty-print) instead
   = note: required by `std::fmt::Display::fmt`

Значение Err - это кортеж вашей ошибки, и исходный поток будет продолжать вытягивать после того, как вы исправите ошибку. Это то, что Stream::into_future / StreamFuture делает.

Получите доступ к первому значению в кортеже, чтобы получить ошибку:

if let Err((e, _)) = res {
    println!("err: {}", e);
}

Если вы хотите увидеть все значения, вы можете продолжать опрашивать поток снова и снова (но не делайте этого, потому что он, вероятно, неэффективен):

let mut f = tasks.into_future();
loop {
    match runtime.block_on(f) {
        Ok((None, _)) => {
            println!("Stream complete");
            break;
        }
        Ok((Some(v), next)) => {
            println!("Success: {:?}", v);
            f = next.into_future();
        }
        Err((e, next)) => {
            println!("Error: {:?}", e);
            f = next.into_future();
        }
    }
}
...