Выпускать комбинацию потоков, когда один из нижележащих потоков исчерпан - PullRequest
0 голосов
/ 14 декабря 2018

Если я хочу объединить несколько однотипных потоков в один, я бы использовал Stream::select:

let combined = first_stream.select(second_stream)

Однако, когда один из потоков исчерпан, другойвсе еще может дать результаты для объединенного потока.Что я могу использовать, чтобы исчерпать объединенный поток, когда один из нижележащих потоков исчерпан?

1 Ответ

0 голосов
/ 14 декабря 2018

Напишите свой собственный потоковый комбинатор:

use futures::{Async, Poll, Stream}; // 0.1.25

struct WhileBoth<S1, S2>(S1, S2)
where
    S1: Stream,
    S2: Stream<Item = S1::Item, Error = S1::Error>;

impl<S1, S2> Stream for WhileBoth<S1, S2>
where
    S1: Stream,
    S2: Stream<Item = S1::Item, Error = S1::Error>,
{
    type Item = S1::Item;
    type Error = S1::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.0.poll() {
            // Return errors or ready values (including the `None`
            // that indicates the stream is empty) immediately.
            r @ Err(_) | r @ Ok(Async::Ready(_)) => r,
            // If the first stream is not ready, try the second one.
            Ok(Async::NotReady) => self.1.poll(),
        }
    }
}

См. Также:

...