Есть ли способ отключить `tokio :: runtime :: current_thread :: Runtime`? - PullRequest
0 голосов
/ 13 декабря 2018

Я использую tokio::runtime::current_thread::Runtime и хочу иметь возможность запустить будущее и остановить реактор в том же потоке.Пример на странице не показывает, как остановить время выполнения.Есть ли способ, которым я могу это сделать?

1 Ответ

0 голосов
/ 10 февраля 2019

Среда выполнения автоматически отключится, когда по завершении будущего, если вы используете block_on:

use std::time::{Duration, Instant};
use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15

fn main() {
    let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");

    let two_seconds_later = Instant::now() + Duration::from_secs(2);

    runtime
        .block_on({
            Delay::new(two_seconds_later)
                .inspect(|_| eprintln!("future complete"))
        })
        .expect("Unable to run future");
}

Если вам нужно отменить будущее, вы можете создать что-то, что вызовет будущее pollчтобы добиться успеха.Вот очень простая (и, вероятно, не очень производительная) версия такой оболочки:

use std::{
    sync::{Arc, Mutex},
    thread,
    time::{Duration, Instant},
};
use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15

fn main() {
    let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");

    let a_long_time = Instant::now() + Duration::from_secs(3600);
    let future = Delay::new(a_long_time).inspect(|_| eprintln!("future complete"));
    let (future, cancel) = Cancelable::new(future);

    let another_thread = thread::spawn(|| {
        eprintln!("Another thread started");
        thread::sleep(Duration::from_secs(2));
        eprintln!("Another thread canceling the future");
        cancel();
        eprintln!("Another thread exiting");
    });

    runtime.block_on(future).expect("Unable to run future");

    another_thread.join().expect("The other thread panicked");
}

#[derive(Debug)]
struct Cancelable<F> {
    inner: F,
    info: Arc<Mutex<CancelInfo>>,
}

#[derive(Debug, Default)]
struct CancelInfo {
    cancelled: bool,
    task: Option<task::Task>,
}

impl<F> Cancelable<F> {
    fn new(inner: F) -> (Self, impl FnOnce()) {
        let info = Arc::new(Mutex::new(CancelInfo::default()));
        let cancel = {
            let info = info.clone();
            move || {
                let mut info = info.lock().unwrap();
                info.cancelled = true;
                if let Some(task) = &info.task {
                    task.notify();
                }
            }
        };
        let me = Cancelable { inner, info };
        (me, cancel)
    }
}

impl<F> Future for Cancelable<F>
where
    F: Future<Item = ()>,
{
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut info = self.info.lock().unwrap();

        if info.cancelled {
            Ok(Async::Ready(()))
        } else {
            let r = self.inner.poll();

            if let Ok(Async::NotReady) = r {
                info.task = Some(task::current());
            }

            r
        }
    }
}
...