Rust async-await: проверить, разрешается ли какое-либо будущее в списке одновременно? - PullRequest
1 голос
/ 04 октября 2019

Я пытаюсь запустить список фьючерсов одновременно (а не по порядку) в Rust async-await (который скоро стабилизируется), пока какой-либо из них не разрешится до true.

Представьте, что у вас естьVec<File> и будущее для каждого файла, дающего bool (может быть неупорядоченным). Вот простая последовательная реализация.

async fn my_function(files: Vec<File>) -> bool {
    // Run the future on each file, return early if we received true
    for file in files {
        if long_future(file).await {
            return true;
        }
    }

    false
}

async fn long_future(file: File) -> bool {
    // Some long-running task here...
}

Это работает, но я бы хотел запустить несколько из этих вариантов одновременно, чтобы ускорить процесс. Я сталкивался с buffer_unordered() (на Stream), но не мог понять, как это реализовать.

Насколько я понимаю, что-то вроде join также можно использовать для одновременного запуска фьючерсов, учитывая, что вы дали многопоточный пул. Но я не понимаю, как это можно эффективно использовать здесь.

Я пытался сделать что-то подобное, но не смог заставить его работать:

let any_true = futures::stream::iter(files)
    .buffer_unordered(4) // Run up to 4 concurrently
    .map(|file| long_future(file).await)
    .filter(|stop| stop) // Only propagate true values
    .next() // Return early on first true
    .is_some();

Наряду с этим, я 'я ищу что-то вроде any, используемое в итераторах, чтобы заменить оператор if или комбинацию filter().next().is_some().

Как мне поступить?

1 Ответ

2 голосов
/ 04 октября 2019

Я думаю, что вы должны быть в состоянии использовать select_ok, как упомянуто Some Guy. Пример, в котором я заменил файлы набором u32 для иллюстрации:

use futures::future::FutureExt;

async fn long_future(file: u32) -> bool {
    true
}

async fn handle_file(file: u32) -> Result<(), ()> {
    let should_stop = long_future(file).await;
    // Would be better if there were something more descriptive here
    if should_stop {
        Ok(())
    } else {
        Err(())
    }
}

async fn tims_answer(files: Vec<u32>) -> bool {
    let waits = files.into_iter().map(|f| handle_file(f).boxed());

    let any_true = futures::future::select_ok(waits).await.is_ok();

    any_true
}
...