Как я могу разделить, отменить и присоединиться к потокам? - PullRequest
3 голосов
/ 01 ноября 2019

Я немного изо всех сил пытаюсь понять инфраструктуру вокруг futures::streams.

Предположим, у меня есть поток u8 предметов. Я хотел бы преобразовать его в поток Event элементов, в котором:

enum Event {
  Short(u8),
  Long(u8),
}

Мой результирующий поток должен выдавать Event::Short(x) всякий раз, когда основной поток испускает x,Он должен также излучать Event::Long(x) всякий раз, когда базовый поток не изменяет свое значение x в течение N секунд.

Концептуальные шаги будут:

  1. Разделить поток на поток a и b
  2. Применить отладку к потоку b, заставляя его излучать x, только когда x не изменился в течение N секунд.
  3. Присоединяйтесь к двум потокам.

Я полагаю, что присоединение будет обработано select. Но как я могу разделить исходный поток на два, учитывая, что его элементы Copy? И есть ли такая утилита, как механизм устранения неполадок, в futures или tokio?

1 Ответ

0 голосов
/ 03 ноября 2019

Возможно, это не совсем то, что вы хотели, но это похоже на то, что вы просили.

use {
    futures::{
        prelude::*,
        Async, Poll,
        stream::{
            Stream, Fuse,
        },
        sync::mpsc,
    },
    std::{
        thread,
        time::{Duration, Instant},
    }
};

#[derive(Debug)]
enum Event<T> {
    Short(T),
    Long(T),
}


#[derive(Debug)]
struct Debounce<S: Stream> {
    stream: Fuse<S>,
    debouncetime: Duration,
    timestamp: Instant,
    last: S::Item,
    emitted: bool,
}

impl<S> Debounce<S>
    where S: Stream, S::Item: Copy + Default + PartialEq {
    fn new(s: S) -> Self {
        Self{
            stream: s.fuse(),
            debouncetime: Duration::from_millis(2300),
            timestamp: Instant::now(),
            last: Default::default(),
            emitted: true,
        }
    }
}

impl<S> Stream for Debounce<S>
    where S: Stream, S::Item: Copy + Default + PartialEq
{
    type Item = Event<S::Item>;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Event<S::Item>>, S::Error> {
        if !self.emitted && self.timestamp.elapsed() >= self.debouncetime {
            self.emitted = true;
            return Ok(Some(Event::Long(self.last)).into())
        }

        match self.stream.poll()? {
            Async::Ready(Some(item)) => {
                if item != self.last {
                    self.last = item;
                    self.timestamp = Instant::now();
                    self.emitted = false;
                }

                Ok(Some(Event::Short(item)).into())
            }
            Async::Ready(None) => Ok(None.into()),
            Async::NotReady => Ok(Async::NotReady)
        }
    }
}

fn main() {
    let (mut tx, rx) = mpsc::channel(1);

    thread::spawn(move || {
        for i in vec![1, 2, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7] {
            tx = tx.send(i).wait().unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    let result = Debounce::new(rx).collect().wait();

    dbg!(result.unwrap());
}
...