время жизни около асин c и поток - PullRequest
1 голос
/ 03 мая 2020

Я пытаюсь создать функцию, которая потребляет Stream, и обрезать ее при последовательных сбоях max_consecutive_fails. Однако, вещи не работали хорошо (E0495). Я изменил Stream s на Iterator s (и удалил async s), и это просто сработало. Почему это происходит? Как я могу рефакторинг этого кода (для работы)?

use futures::stream::Stream;
pub fn max_fail<'a, T>(stream : impl Stream<Item = Option<T>> +'a , max_consecutive_fails: usize) -> impl Stream +'a where T : 'a
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| async {
        if x.is_some(){
            consecutive_fails = 0;
            true
        }
        else{
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        }
    })
}

Ниже приведен свернутый пример, который я попытался указать, в чем проблема, но все еще не смог понять сообщение об ошибке rust c.

use futures::stream::Stream;
pub fn minified_example<'a>(stream: impl Stream<Item = bool> + 'a) -> impl Stream + 'a
{
    use futures::stream::StreamExt;
    stream.take_while( |x| async { *x })
}

Ответы [ 2 ]

1 голос
/ 03 мая 2020
Блоки

Asyn c (async { ... }) похожи на замыкания в том, как они захватывают свою среду. По умолчанию каждое использование переменной из другой области видимости осуществляется по ссылке, что означает, что impl core::future::Future, созданный блоком, не может пережить захваченные им переменные.

Вам необходимо переместить x в блок с async move { ... } (так же, как с замыканиями)

0 голосов
/ 04 мая 2020

Таким образом, Future захватывает переменную, и компилятор не достаточно умен, чтобы удалить ненужные захваты, и что нужно сделать, это явно распутать захваты с отдельным асиновым c блоком.

use futures::stream::Stream;
pub fn max_fail<'a, T>(
    stream: impl Stream<Item = Option<T>> + 'a,
    max_consecutive_fails: usize,
) -> impl Stream + 'a
where
    T: 'a,
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| {
        let t = if x.is_some() {
            consecutive_fails = 0;
            true
        } else {
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        };
        return async move { t };
    })
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...