Как реализовать `Future` /` Stream`, который опрашивает `asyn c fn (& mut self)`? - PullRequest
2 голосов
/ 18 апреля 2020

У меня есть следующая структура

struct Test;

impl Test {
    async fn function(&mut self) {}
}

Я хочу реализовать std::future::Future (ну, на самом деле futures::Stream, но это в основном то же самое) на Test, который будет опрашивать function. Моя первая попытка выглядела примерно так

impl Future for Test {
    type Output = ();
    fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.function() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

Очевидно, это не сработало. Я понял, что function должен быть вызван один раз, возвращенный Future должен быть сохранен где-то в структуре, а затем сохраненное будущее должно быть опрошено. Итак, я попробовал это

struct Test(Option<Box<Pin<dyn Future<Output = ()>>>);
impl Test {
    async fn function(&mut self) {}
    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(Box::pin(s.function()));
        s
    }
}

И, ну, не случайно это тоже не сработало

Проблема в том, что после того, как я позвонил function() - я взял &mut ссылка на Test, из-за этого я не могу изменить переменную Test, и поэтому - не могу сохранить возвращенные Future внутри Test.


Обновление 1:

Я пришел к самому проклятому и небезопасному решению, о котором я могу подумать (вдохновленный this )

struct Test<'a>(Option<BoxFuture<'a, ()>>);

impl Test<'_> {
    async fn function(&mut self) {
        println!("I'm alive!");
    }

    fn new() -> Self {
        let mut s = Self(None);
        // fuck the police
        s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
        s
    }
}

impl Future for Test<'_> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().unwrap().poll_unpin(cx)
    }
}

Я ДЕЙСТВИТЕЛЬНО надеюсь, что есть другой способ

Ответы [ 2 ]

1 голос
/ 25 апреля 2020

Хотя бывают случаи, когда вы можете захотеть сделать что-то похожее на то, что вы пытаетесь совершить sh здесь, это редкость. Поэтому большинство людей, читающих это, может быть, даже OP, могут sh реструктурировать таким образом, чтобы состояние структуры и данные, используемые для одного асинхронного c выполнения, были разными объектами.

Но, чтобы ответить на ваш вопрос, да, это несколько возможно. Если вы не хотите прибегать к небезопасному коду, вам нужно будет использовать Mutex и Arc. Все поля, которыми вы будете sh манипулировать внутри async fn function, должны быть заключены в Mutex, а сама функция примет Arc<Self>. Пример этого приведен в блоке кода ниже.

Однако я должен подчеркнуть, что это не очень хорошее решение, и вы, вероятно, не хотите этого делать. В зависимости от вашего конкретного случая c ваше решение может отличаться, но мое предположение о том, что OP пытается достичь sh при использовании Stream s, было бы лучше решено чем-то похожим на эту суть, которую я также написал: https://gist.github.com/TimLuq/83a35453405f4c6e0f63fb2a0caa9f6e.

use core::future::Future;
use core::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;

struct Test {
    state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // if available use your async library's Mutex to `.await` locks on `buffer` instead
    buffer: Mutex<Vec<u8>>,
}
impl Test {
    async fn function(self: Arc<Self>) {
        for i in 0..16u8 {
            let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
            let mut buflock = self.buffer.lock().unwrap();
            buflock.extend_from_slice(&data);
        }
    }
    pub fn new() -> Arc<Self> {
        let s = Arc::new(Self {
            state: Default::default(),
            buffer: Default::default(),
        });

        {
            // start by trying to aquire a lock to the Mutex of the Box
            let mut lock = s.state.lock().unwrap();
            // create boxed future
            let b = Box::pin(s.clone().function());
            // insert value into the mutex
            *lock = Some(b);
        } // block causes the lock to be released

        s
    }
}
impl Future for Test {
    type Output = ();
    fn poll(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> std::task::Poll<<Self as std::future::Future>::Output> {
        let mut lock = self.state.lock().unwrap();
        let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
        Future::poll(fut.as_mut(), ctx)
    }
}
1 голос
/ 21 апреля 2020

Я не уверен, чего вы хотите достичь и почему, но я подозреваю, что вы пытаетесь реализовать Future for Test, основываясь на каком-то древнем учебнике или недоразумении и просто усложняя вещи.

Вы не Необходимо реализовать Future вручную. async функция

async fn function(...) {...}

- это на самом деле просто синтаксический сахар, переведенный за кулисы во что-то вроде

fn function(...) -> Future<()> {...}

Все, что вам нужно сделать, это использовать результат функции одинаково как любое будущее, например, используйте await или вызовите блокировку реактора до его завершения. Например, основываясь на вашей первой версии, вы можете просто позвонить:

let mut test = Test{};
test.function().await;

UPDATE1

Исходя из ваших описаний, я все еще думаю, что вы пытаетесь переосмыслить этот минимальный рабочий фрагмент без необходимости вручную реализовать Future для чего угодно:

async fn asyncio() { println!("Doing async IO"); }

struct Test {
    count: u32,
}

impl Test {
    async fn function(&mut self) {
        asyncio().await;
        self.count += 1;
    }
}

#[tokio::main]
async fn main() {
    let mut test = Test{count: 0};
    test.function().await;
    println!("Count: {}", test.count);
}
...