Как написать len методы для фьючерсов :: Stream? - PullRequest
0 голосов
/ 25 января 2019

Вот код, который я пробовал (использовал fold() для реализации len() для потоков, но есть некоторые странные ошибки типа, которые я не могу понять (do(row: Row) возвращает Result<(), Error>):

let rows_count = rows
    .for_each(|row| parse_row(&row))
    .fold(0, |acc, a| futures::future::ok(acc + 1))
    .wait()
    .unwrap();

Спасибо!

1 Ответ

0 голосов
/ 25 января 2019

По умолчанию fold при Streams возвращает Future. Если вы хотите заблокировать Stream и получить необходимое вам количество элементов wait.

Кроме того, если какой-либо элемент в потоке является ошибкой, он будет паниковать.

Возможная реализация блокировки:

use futures::stream::Stream;
use futures::future::{Future, ok};

trait StreamExt {
    fn len(self) -> usize;
}

impl<T: Stream> StreamExt for T {
    fn len(self) -> usize {
        self.fold(0, |a, _| ok(a + 1))
            .wait()
            .map_err(|_| ())
            .unwrap()
    }
}

Детская площадка

...