Как конвертировать будущее в поток? - PullRequest
0 голосов
/ 30 января 2020

Я пытаюсь использовать async_std для получения дейтаграмм UDP из сети.

Существует UdpSocket, который реализует async recv_from, этот метод возвращает будущее, но Мне нужен async_std::stream::Stream, который дает поток дейтаграмм UDP, потому что это лучшая абстракция.

Я нашел tokio::net::UdpFramed, который делает именно то, что мне нужно, но он недоступен в текущих версиях Tokio.

В общем вопрос в том, как мне преобразовать Future s из заданной асинхронной c функции в Stream?

Ответы [ 2 ]

1 голос
/ 30 января 2020

Вообще говоря, вопрос в том, как преобразовать Futures из заданной асинхронной c функции в Stream?

Существует FutureExt::into_stream, но не позволяйте имени обмануть вас; это не очень подходит для вашей ситуации.

Существует UdpSocket, который реализует asyn c recv_from, этот метод возвращает будущее, но мне нужен async_std::stream::Stream, который дает поток дейтаграмм UDP, потому что это лучшая абстракция.

Это не , здесь обязательно лучшая абстракция.

В частности, async-std UdpSocket::recv_from возвращает будущее, которое имеет тип вывода (usize, SocketAddr) - размер полученных данных и адрес партнера. Если бы вы использовали into_stream для преобразования его в поток, это дало бы вам именно это, а не полученные данные.

Я нашел tokio::net::UdpFramed, который делает именно то, что мне нужно, но он недоступен в текущих версиях Tokio.

Он был перемещен в tokio-util ящик. К сожалению, вы не можете (легко) использовать это либо. Для этого требуется tokio::net::UdpSocket, который не совпадает с async_std::net::UdpSocket.

Вы, конечно, можете использовать функции утилит фьючерсов, такие как futures::stream::poll_fn или futures::stream::unfold, чтобы придать UdpSocket::recv_from futures::stream::Stream фасад, но что тогда вы будете делать с этим? Если вы в конечном итоге вызовете StreamExt::next, чтобы получить из него значение, вы могли бы использовать recv_from напрямую.

Необходимо достичь значения Stream, если какой-то API Для использования требуется ввод Stream, например rusoto:

1 голос
/ 30 января 2020

Для одного элемента используйте FutureExt::into_stream:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

Для потока из числа фьючерсов, сгенерированных замыканием, используйте stream::unfold:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}

В вашем случае вы можете использовать stream::unfold:

use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}
...