Основная проблема заключается в том, что управление состоянием отсутствует.Вы создаете новое будущее Delay
каждый раз, когда поток опрашивается, вместо того, чтобы удерживать его до тех пор, пока оно не будет разрешено.Это приведет к тому, что вы никогда не увидите никаких элементов, выходящих из потока, так как эти фьючерсы опрашиваются только один раз, вероятно, каждый раз получая NotReady
.
Вам необходимо отслеживать будущее задержки в вашем типе SomeStream
.В этом случае можно использовать опцию, чтобы также определить, нужно ли нам создавать новую задержку.
#[derive(Debug, Default)]
struct SomeStream {
delay: Option<Delay>,
}
Последующий код для SomeStream::poll
, с лучшей обработкой ошибок и более идиоматическими конструкциями,может выглядеть примерно так:
impl Stream for SomeStream {
type Item = String;
type Error = Box<dyn std::error::Error + Send + Sync>; // generic error
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let delay = self.delay.get_or_insert_with(|| {
let when = Instant::now() + Duration::from_millis(1000);
Delay::new(when)
});
match delay.poll() {
Ok(Async::Ready(value)) => {
self.delay = None;
Ok(Async::Ready(Some("Hello".to_string())))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
}
}
Или, что еще лучше, с помощью макроса try_ready!
, который возвращает ошибки и сигналы NotReady
с меньшим количеством шаблонов.
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let delay = self.delay.get_or_insert_with(|| {
let when = Instant::now() + Duration::from_millis(1000);
Delay::new(when)
});
try_ready!(delay.poll());
// tick!
self.delay = None;
Ok(Async::Ready(Some("Hello".to_string())))
}
( Детская площадка )