Создание потока значений при вызове async fns? - PullRequest
0 голосов
/ 23 июня 2019

Я не могу понять, как заставить struct (или нет) предоставить Stream , где I await, выполняет асинхронные функции для получения данных, необходимых для значений потока.

Я пытался реализовать черту Stream напрямую, но у меня возникают проблемы, потому что я хотел бы использовать асинхронные вещи, такие как await ing, компилятор не хочет, чтобы я вызывать асинхронные функции.

Я предполагаю, что мне не хватает какой-то предыстории о том, какова цель этой библиотеки Stream, и я просто атакую ​​это неправильно, и, возможно, мне вообще не следует смотреть на Stream, но я не знаю, куда еще обратиться. Я видел другие функции в модуле Stream, которые могли бы быть полезны, но я не уверен, как я могу сохранить любое состояние и использовать эти функции.

В качестве слегка упрощенной версии моей действительной цели, предположим, что я хочу предоставить поток 64-байтовых векторов из объекта AsyncRead (т. Е. Потока tcp), но также сохранить небольшое состояние внутри любой логики, которая в итоге выдает значения для Поток, в этом примере, счетчик.

pub struct Receiver<T> where T: AsyncRead + Unpin {
    readme: T,
    num: u64,
}

// ..code for a simple `new() -> Self` function..

impl<T> Stream for Receiver<T> where T: AsyncRead + Unpin {
    type Item = Result<Vec<u8>, io::Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut buf : [u8; 64] = [0; 64];
        match self.readme.read_exact(&mut buf).await {
            Ok(()) => {
                self.num += 1;
                Poll::Ready(Some(Ok(buf.to_vec())))
            }
            Err(e) => Poll::Ready(Some(Err(e)))
        }
    }
}

Это не удается построить, сказав

error[E0728]: `await` is only allowed inside `async` functions and blocks

Спасибо за вашу помощь!


Edit:

Я использую rustc 1.36.0-nightly (d35181ad8 2019-05-20), и мой Cargo.toml выглядит так:

[dependencies]
futures-preview = { version = "0.3.0-alpha.16", features = ["compat", "io-compat"] }
pin-utils = "0.1.0-alpha.4"

Ответы [ 2 ]

0 голосов
/ 23 июня 2019

Вы можете сделать это с gen-stream crate:

#![feature(generators, generator_trait, gen_future)]

use {
    futures::prelude::*,
    gen_stream::{gen_await, GenTryStream},
    pin_utils::unsafe_pinned,
    std::{
        io,
        marker::PhantomData,
        pin::Pin,
        sync::{
            atomic::{AtomicU64, Ordering},
            Arc,
        },
        task::{Context, Poll},
    },
};

pub type Inner = Pin<Box<dyn Stream<Item = Result<Vec<u8>, io::Error>> + Send>>;

pub struct Receiver<T> {
    inner: Inner,
    pub num: Arc<AtomicU64>,
    _marker: PhantomData<T>,
}

impl<T> Receiver<T> {
    unsafe_pinned!(inner: Inner);
}

impl<T> From<T> for Receiver<T>
where
    T: AsyncRead + Unpin + Send + 'static,
{
    fn from(mut readme: T) -> Self {
        let num = Arc::new(AtomicU64::new(0));

        Self {
            inner: Box::pin(GenTryStream::from({
                let num = num.clone();
                static move || loop {
                    let mut buf: [u8; 64] = [0; 64];
                    match gen_await!(readme.read_exact(&mut buf)) {
                        Ok(()) => {
                            num.fetch_add(1, Ordering::Relaxed);
                            yield Poll::Ready(buf.to_vec())
                        }
                        Err(e) => return Err(e),
                    }
                }
            })),
            num,
            _marker: PhantomData,
        }
    }
}

impl<T> Stream for Receiver<T>
where
    T: AsyncRead + Unpin,
{
    type Item = Result<Vec<u8>, io::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner().poll_next(cx)
    }
}
0 голосов
/ 23 июня 2019

Копия ответа / вставлена ​​из reddit сообщения пользователем Matthias247 :

К сожалению, в настоящее время это невозможно - потоки должны быть реализованырука и не может использовать асинхронный FN.Возможно ли изменить это в будущем, неясно.

Вы можете обойти это, определив другую черту Stream, которая использует Futures, например:

trait Stream<T> { 
   type NextFuture: Future<Output=T>;

   fn next(&mut self) -> Self::NextFuture; 
}

Эта статья и эта проблема с фьючерсами содержит дополнительную информацию.

...