Возможно, это не совсем то, что вы хотели, но это похоже на то, что вы просили.
use {
futures::{
prelude::*,
Async, Poll,
stream::{
Stream, Fuse,
},
sync::mpsc,
},
std::{
thread,
time::{Duration, Instant},
}
};
/// Item type produced by the `Debounce` stream adapter.
///
/// The extra derives (`Clone`, `Copy`, `PartialEq`, `Eq`) let callers compare
/// and duplicate collected events; each is only available when `T` itself
/// implements the corresponding trait.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event<T> {
    /// An item forwarded as soon as the inner stream yielded it.
    Short(T),
    /// The most recent distinct item, reported once after it stayed unchanged
    /// for at least the debounce interval.
    Long(T),
}
/// Stream adapter that forwards every item of an inner stream as
/// `Event::Short` and additionally reports `Event::Long` for a value that has
/// stayed unchanged for at least `debouncetime`.
#[derive(Debug)]
struct Debounce<S: Stream> {
// Inner stream, wrapped in `Fuse` by the constructor.
stream: Fuse<S>,
// Minimum time the last distinct item must remain unchanged before a
// `Long` event is produced for it.
debouncetime: Duration,
// Moment the current `last` value was recorded (construction time initially).
timestamp: Instant,
// Most recent distinct item seen; starts as `Default::default()`.
last: S::Item,
// `true` when no `Long` event is pending for `last` (already emitted, or
// nothing new has arrived since construction).
emitted: bool,
}
impl<S> Debounce<S>
where
    S: Stream,
    S::Item: Copy + Default + PartialEq,
{
    /// Wraps `s` using the default debounce interval of 2300 ms.
    ///
    /// Equivalent to `with_debounce_time(s, Duration::from_millis(2300))`.
    fn new(s: S) -> Self {
        Self::with_debounce_time(s, Duration::from_millis(2300))
    }

    /// Wraps `s` with a caller-chosen debounce interval.
    ///
    /// The adapter starts with `last` set to the item type's default value and
    /// `emitted` set to `true`, so no `Long` event is pending until the inner
    /// stream yields an item that differs from that default.
    fn with_debounce_time(s: S, debouncetime: Duration) -> Self {
        Self {
            stream: s.fuse(),
            debouncetime,
            timestamp: Instant::now(),
            last: Default::default(),
            emitted: true,
        }
    }
}
impl<S> Stream for Debounce<S>
where S: Stream, S::Item: Copy + Default + PartialEq
{
type Item = Event<S::Item>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Event<S::Item>>, S::Error> {
if !self.emitted && self.timestamp.elapsed() >= self.debouncetime {
self.emitted = true;
return Ok(Some(Event::Long(self.last)).into())
}
match self.stream.poll()? {
Async::Ready(Some(item)) => {
if item != self.last {
self.last = item;
self.timestamp = Instant::now();
self.emitted = false;
}
Ok(Some(Event::Short(item)).into())
}
Async::Ready(None) => Ok(None.into()),
Async::NotReady => Ok(Async::NotReady)
}
}
}
/// Demo driver: a background thread feeds a fixed sequence of integers into a
/// channel, pausing one second after each send, while the main thread collects
/// the debounced events and prints them with `dbg!`.
fn main() {
    let (sender, receiver) = mpsc::channel(1);

    thread::spawn(move || {
        let pause = Duration::from_secs(1);
        // `send` consumes the sender and hands it back once the item is accepted.
        let mut sender = sender;
        for &value in [1, 2, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7].iter() {
            sender = sender.send(value).wait().unwrap();
            thread::sleep(pause);
        }
    });

    let debounced = Debounce::new(receiver);
    let result = debounced.collect().wait();
    dbg!(result.unwrap());
}