Почему эта задержка в будущем poll () не работает в моем пользовательском типе потока? - PullRequest
1 голос
/ 10 апреля 2019

Я хочу печатать «Hello» раз в секунду.

Цитирование документа:

Фьючерсы используют модель, основанную на опросе.Потребитель будущего неоднократно вызывает функцию опроса.Будущее затем пытается завершить.Если будущее может завершиться, оно возвращает Async :: Ready (значение).Если будущее не может быть завершено из-за блокировки внутреннего ресурса (например, сокета TCP), он возвращает Async :: NotReady.

Функция My poll возвращает NotReady if Delay s возвращает NotReady, но на стандартный вывод ничего не выводится.

use futures::{Async, Future, Stream}; // 0.1.25
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.15

struct SomeStream;

impl Stream for SomeStream {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let when = Instant::now() + Duration::from_millis(1000);
        let mut task = Delay::new(when).map_err(|e| eprintln!("{:?}", e));
        match task.poll() {
            Ok(Async::Ready(value)) => {}
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(()),
        }
        Ok(Async::Ready(Some("Hello".to_string())))
    }
}

fn main() {
    let s = SomeStream;
    let future = s
        .for_each(|item| {
            println!("{:?}", item);
            Ok(())
        })
        .map_err(|e| {});
    tokio::run(future);
}

1 Ответ

2 голосов
/ 10 апреля 2019

Основная проблема заключается в том, что управление состоянием отсутствует.Вы создаете новое будущее Delay каждый раз, когда поток опрашивается, вместо того, чтобы удерживать его до тех пор, пока оно не будет разрешено.Это приведет к тому, что вы никогда не увидите никаких элементов, выходящих из потока, так как эти фьючерсы опрашиваются только один раз, вероятно, каждый раз получая NotReady.

Вам необходимо отслеживать будущее задержки в вашем типе SomeStream.В этом случае можно использовать опцию, чтобы также определить, нужно ли нам создавать новую задержку.

#[derive(Debug, Default)]
struct SomeStream {
    delay: Option<Delay>,
}

Последующий код для SomeStream::poll, с лучшей обработкой ошибок и более идиоматическими конструкциями,может выглядеть примерно так:

impl Stream for SomeStream {
    type Item = String;
    type Error = Box<dyn std::error::Error + Send + Sync>; // generic error

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let delay = self.delay.get_or_insert_with(|| {
            let when = Instant::now() + Duration::from_millis(1000);
            Delay::new(when)
        });

        match delay.poll() {
            Ok(Async::Ready(value)) => {
                self.delay = None;
                Ok(Async::Ready(Some("Hello".to_string())))
            },
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => Err(err.into()),
        }
    }
}

Или, что еще лучше, с помощью макроса try_ready!, который возвращает ошибки и сигналы NotReady с меньшим количеством шаблонов.

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
    let delay = self.delay.get_or_insert_with(|| {
        let when = Instant::now() + Duration::from_millis(1000);
        Delay::new(when)
    });

    try_ready!(delay.poll());

    // tick!
    self.delay = None;
    Ok(Async::Ready(Some("Hello".to_string())))
}

( Детская площадка )

...