Как преобразовать итератор в поток при успешном завершении или пустой поток при ошибке? - PullRequest
0 голосов
/ 30 июня 2018

Я хотел бы взять обычный итератор и превратить его в поток , чтобы я мог выполнять дальнейшую обработку потока. Проблема в том, что у меня может быть итератор или ошибка, чтобы иметь дело с. Я думаю, что я довольно близок с этим:

#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;

use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};

fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
            Some(a) => Some(future::ok((a, iter))),
            None => None,
        }),
        Err(e) => {
            error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
            stream::empty()
        }
    }
}

fn main() {
    let task = resolve("1.2.3.4:12345")
        .map_err(|e| error!("{:?}", e))
        .for_each(|addr| info!("{:?}", addr))
        .fold();
    tokio::run(task);
}

площадка

error[E0308]: match arms have incompatible types
  --> src/main.rs:12:5
   |
12 | /     match addrs.to_socket_addrs() {
13 | |         Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | |             Some(a) => Some(future::ok((a, iter))),
15 | |             None => None,
...  |
20 | |         }
21 | |     }
   | |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
   |
   = note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
              found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
  --> src/main.rs:17:19
   |
17 |           Err(e) => {
   |  ___________________^
18 | |             error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | |             stream::empty()
20 | |         }
   | |_________^

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:27:10
   |
27 |         .for_each(|addr| info!("{:?}", addr))
   |          ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
  --> src/main.rs:28:10
   |
28 |         .fold();
   |          ^^^^
   |
   = note: the method `fold` exists but the following trait bounds were not satisfied:
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`

Намек довольно очевиден. Два Result s, которые я возвращаю из match, отличаются и должны быть одинаковыми. Теперь, как я могу сделать это, чтобы я возвращал поток?

1 Ответ

0 голосов
/ 30 июня 2018

Rust - это статически типизированный язык, который означает, что возвращаемый тип функции должен быть одного типа, известного во время компиляции. Вы пытаетесь вернуть несколько типов, определенных во время выполнения.

Самое близкое решение к вашему оригиналу - всегда возвращать поток Unfold:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::unfold(addrs.to_socket_addrs(), |r| {
        match r {
            Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
            Err(_) => None,
        }
    })
}

Но зачем изобретать велосипед?

futures::stream::iter_ok

Преобразует Iterator в Stream, который всегда готов выдать следующее значение.

Последующие версии ящика для фьючерсов реализуют Stream для Either, что делает его очень элегантным:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::iter_ok(iter).left_stream(),
        Err(_) => stream::empty().right_stream(),
    }
}

Просто перенести эту функцию в фьючерсы 0.1 (возможно, кто-то должен представить ее в качестве пиара для тех, кто застрял на 0.1 ...):

enum MyEither<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Stream for MyEither<L, R>
where
    L: Stream,
    R: Stream<Item = L::Item, Error = L::Error>,
{
    type Item = L::Item;
    type Error = L::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self {
            MyEither::Left(l) => l.poll(),
            MyEither::Right(r) => r.poll(),
        }
    }
}

trait EitherStreamExt {
    fn left_stream<R>(self) -> MyEither<Self, R>
    where
        Self: Sized;
    fn right_stream<L>(self) -> MyEither<L, Self>
    where
        Self: Sized;
}

impl<S: Stream> EitherStreamExt for S {
    fn left_stream<R>(self) -> MyEither<Self, R> {
        MyEither::Left(self)
    }
    fn right_stream<L>(self) -> MyEither<L, Self> {
        MyEither::Right(self)
    }
}

Еще лучше, используйте тот факт, что Result является итератором и Stream::flatten существует:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::iter_ok(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .flatten()
}

Или, если вы действительно хотите напечатать ошибки:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::once(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .map_err(|e| eprintln!("err: {}", e))
        .flatten()
}

Смотри также:

...