Как я могу прекратить запуск синхронного кода, если в будущем его обертка будет отброшена? - PullRequest
3 голосов
/ 30 января 2020

У меня есть асинхронный код, который вызывает синхронный код, выполнение которого занимает некоторое время, поэтому я следовал советам, изложенным в Каков наилучший подход для инкапсуляции блокирующего ввода-вывода в future-rs? . Однако у моего асинхронного кода есть тайм-аут, после которого меня больше не интересует результат синхронного вычисления:

use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation() -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
    }
    sum
}

#[tokio::main]
async fn main() {
    let handle = task::spawn_blocking(long_running_complicated_calculation);
    let guarded = time::timeout(Duration::from_millis(250), handle);

    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => eprintln!("Sum timed out (expected)"),
    }
}

Запуск этого кода показывает, что тайм-аут срабатывает, но синхронный код также продолжает работать:

0
1
Sum timed out (expected)
2
3
4
5
6
7
8
9

Как я могу прекратить запуск синхронного кода, когда в будущем его обертка будет отброшена?

Я не ожидаю, что компилятор волшебным образом сможет чтобы остановить мой синхронный код. Я аннотировал строку с «точкой прерывания», где мне нужно было бы вручную поставить какую-то проверку, чтобы рано выйти из моей функции, но я не знаю, как легко получить уведомление о том, что результат spawn_blocking (или ThreadPool::spawn_with_handle, для чистого кода на основе фьючерсов) был удален.

1 Ответ

4 голосов
/ 30 января 2020

Вы можете передать атомное c логическое значение, которое вы затем используете для пометки задачи как требующей отмены. (Я не уверен, что использую соответствующий Ordering для вызовов load / store, что, вероятно, требует дополнительного рассмотрения)

Вот модифицированная версия вашего кода, которая выводит

0
1
Sum timed out (expected)
2
Interrupted...
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
        if !flag.load(Ordering::Relaxed) {
            eprintln!("Interrupted...");
            break;
        }
    }
    sum
}

#[tokio::main]
async fn main() {
    let some_bool = Arc::new(AtomicBool::new(true));

    let some_bool_clone = some_bool.clone();
    let handle =
        task::spawn_blocking(move || long_running_complicated_calculation(&some_bool_clone));
    let guarded = time::timeout(Duration::from_millis(250), handle);

    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => {
            eprintln!("Sum timed out (expected)");
            some_bool.store(false, Ordering::Relaxed);
        }
    }
}

детская площадка


Невозможно добиться того, чтобы это происходило автоматически при сбросе фьючерсов / ручек с текущим Tokio. Некоторая работа в этом направлении ведется в http://github.com/tokio-rs/tokio/issues/1830 и http://github.com/tokio-rs/tokio/issues/1879.

Однако вы можете получить что-то подобное, обернув фьючерсы в пользовательский тип.

Вот пример, который выглядит почти так же, как исходный код, но с добавлением простого типа оболочки в модуле. Было бы еще более эргономично c, если бы я реализовал Future<T> для типа обертки, который просто перенаправляет на завернутую ручку, но это оказалось утомительным.

mod blocking_cancelable_task {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use tokio::task;

    pub struct BlockingCancelableTask<T> {
        pub h: Option<tokio::task::JoinHandle<T>>,
        flag: Arc<AtomicBool>,
    }

    impl<T> Drop for BlockingCancelableTask<T> {
        fn drop(&mut self) {
            eprintln!("Dropping...");
            self.flag.store(false, Ordering::Relaxed);
        }
    }

    impl<T> BlockingCancelableTask<T>
    where
        T: Send + 'static,
    {
        pub fn new<F>(f: F) -> BlockingCancelableTask<T>
        where
            F: FnOnce(&AtomicBool) -> T + Send + 'static,
        {
            let flag = Arc::new(AtomicBool::new(true));
            let flag_clone = flag.clone();
            let h = task::spawn_blocking(move || f(&flag_clone));
            BlockingCancelableTask { h: Some(h), flag }
        }
    }

    pub fn spawn<F, T>(f: F) -> BlockingCancelableTask<T>
    where
        T: Send + 'static,
        F: FnOnce(&AtomicBool) -> T + Send + 'static,
    {
        BlockingCancelableTask::new(f)
    }
}

use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time::Duration};
use tokio::time; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
        if !flag.load(Ordering::Relaxed) {
            eprintln!("Interrupted...");
            break;
        }
    }
    sum
}

#[tokio::main]
async fn main() {
    let mut h = blocking_cancelable_task::spawn(long_running_complicated_calculation);
    let guarded = time::timeout(Duration::from_millis(250), h.h.take().unwrap());
    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => {
            eprintln!("Sum timed out (expected)");
        }
    }
}

детская площадка

...