Как добавить специальную логику NotReady в Tokio-Io? - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь создать Stream, который будет ждать, пока определенный символ не окажется в буфере.Я знаю, что read_until() на BufRead, но на самом деле мне нужно специальное решение, так как это шаг вперед для реализации ожидания, пока в буфере не появится определенная строка (или, например, не найдено совпадение с регулярным выражением).

В моем проекте, где я впервые столкнулся с проблемой, проблема заключалась в том, что будущая обработка просто зависала, когда я получал Ready(_) из внутреннего будущего и возвращал NotReady из своей функции.Я обнаружил, что не должен этого делать за документы (последний абзац).Однако то, что я не получил, - это то, что является реальной альтернативой, которая обещана в этом параграфе.Я прочитал всю опубликованную документацию на сайте Tokio, и на данный момент для меня это не имеет смысла.

Итак, мой текущий код следующий.К сожалению, я не мог сделать его проще и меньше, так как он уже сломан.Текущий результат таков:

Err(Custom { kind: Other, error: Error(Shutdown) })
Err(Custom { kind: Other, error: Error(Shutdown) })
Err(Custom { kind: Other, error: Error(Shutdown) })
<ad infinum>

Ожидаемый результат - получение Ok(Ready(_)) из него при печати W и W' и ожидание определенного символа в буфере.

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_io_timeout;
extern crate tokio_process;

use futures::stream::poll_fn;
use futures::{Async, Poll, Stream};
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
use tokio_io_timeout::TimeoutReader;
use tokio_process::CommandExt;

use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct Process {
    child: tokio_process::Child,
    stdout: Arc<Mutex<tokio_io_timeout::TimeoutReader<tokio_process::ChildStdout>>>,
}

impl Process {
    fn new(
        command: &str,
        reader_timeout: Option<Duration>,
        core: &tokio_core::reactor::Core,
    ) -> Self {
        let mut cmd = Command::new(command);
        let cat = cmd.stdout(Stdio::piped());
        let mut child = cat.spawn_async(&core.handle()).unwrap();

        let stdout = child.stdout().take().unwrap();
        let mut timeout_reader = TimeoutReader::new(stdout);
        timeout_reader.set_timeout(reader_timeout);
        let timeout_reader = Arc::new(Mutex::new(timeout_reader));

        Self {
            child,
            stdout: timeout_reader,
        }
    }
}

fn work() -> Result<(), ()> {
    let window = Arc::new(Mutex::new(Vec::new()));

    let mut core = Core::new().unwrap();
    let process = Process::new("cat", Some(Duration::from_secs(20)), &core);

    let mark = Arc::new(Mutex::new(b'c'));

    let read_until_stream = poll_fn({
        let window = window.clone();
        let timeout_reader = process.stdout.clone();
        move || -> Poll<Option<u8>, std::io::Error> {
            let mut buf = [0; 8];
            let poll;
            {
                let mut timeout_reader = timeout_reader.lock().unwrap();
                poll = timeout_reader.poll_read(&mut buf);
            }
            match poll {
                Ok(Async::Ready(0)) => Ok(Async::Ready(None)),
                Ok(Async::Ready(x)) => {
                    {
                        let mut window = window.lock().unwrap();
                        println!("W: {:?}", *window);
                        println!("buf: {:?}", &buf[0..x]);
                        window.extend(buf[0..x].into_iter().map(|x| *x));
                        println!("W': {:?}", *window);
                        if let Some(_) = window.iter().find(|c| **c == *mark.lock().unwrap()) {
                            Ok(Async::Ready(Some(1)))
                        } else {
                            Ok(Async::NotReady)
                        }
                    }
                }
                Ok(Async::NotReady) => Ok(Async::NotReady),
                Err(e) => Err(e),
            }
        }
    });

    let _stream_thread = thread::spawn(move || {
        for o in read_until_stream.wait() {
            println!("{:?}", o);
        }
    });

    match core.run(process.child) {
        Ok(_) => {}
        Err(e) => {
            println!("Child error: {:?}", e);
        }
    }

    Ok(())
}

fn main() {
    work().unwrap();
}

Это полный пример проекта .

1 Ответ

0 голосов
/ 17 мая 2018

Если вам нужно больше данных, вам нужно снова позвонить poll_read, пока вы не найдете то, что искали, или poll_read вернет NotReady.

Возможно, вы захотите избежать зацикливания в одной задаче дляслишком долго, так что вы можете создать себе yield_task функцию для вызова вместо нее, если poll_read не вернул NotReady;он гарантирует, что ваша задача будет вызвана снова как можно скорее после запуска других ожидающих задач.

Чтобы использовать ее, просто запустите return yield_task();.

fn yield_inner() {
    use futures::task;
    task::current().notify();
}

#[inline(always)]
pub fn yield_task<T, E>() -> Poll<T, E> {
    yield_inner();
    Ok(Async::NotReady)
}

Также см. futures-rs # 354: Обращайтесь с долго работающими, всегда готовыми фьючерсами справедливо # 354 .


С новым API асинхронного / ожидающего выхода futures::task::current больше нет;вместо этого вам понадобится ссылка std::task::Context, которая предоставляется в качестве параметра для нового метода std::future::Future::poll.

Если вы уже реализуете вручнуюstd::future::Future черта, которую вы можете просто вставить:

context.waker().wake_by_ref();
return std::task::Poll::Pending;

Или создать себе Future -всполняющий тип, который дает ровно один раз:

pub struct Yield {
    ready: bool,
}

impl core::future::Future for Yield {
    type Output = ();

    fn poll(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Self::Output> {
        let this = self.get_mut();
        if this.ready {
            core::task::Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            this.ready = true; // ready next round
            core::task::Poll::Pending
        }
    }
}

pub fn yield_task() -> Yield {
    Yield { ready: false }
}

А затем используйте его в async коде так:

yield_task().await;
...