Вы можете передать атомное c логическое значение, которое вы затем используете для пометки задачи как требующей отмены. (Я не уверен, что использую соответствующий Ordering
для вызовов load
/ store
, что, вероятно, требует дополнительного рассмотрения)
Вот модифицированная версия вашего кода, которая выводит
0
1
Sum timed out (expected)
2
Interrupted...
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
if !flag.load(Ordering::Relaxed) {
eprintln!("Interrupted...");
break;
}
}
sum
}
#[tokio::main]
async fn main() {
let some_bool = Arc::new(AtomicBool::new(true));
let some_bool_clone = some_bool.clone();
let handle =
task::spawn_blocking(move || long_running_complicated_calculation(&some_bool_clone));
let guarded = time::timeout(Duration::from_millis(250), handle);
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => {
eprintln!("Sum timed out (expected)");
some_bool.store(false, Ordering::Relaxed);
}
}
}
детская площадка
Невозможно добиться того, чтобы это происходило автоматически при сбросе фьючерсов / ручек с текущим Tokio. Некоторая работа в этом направлении ведется в http://github.com/tokio-rs/tokio/issues/1830 и http://github.com/tokio-rs/tokio/issues/1879.
Однако вы можете получить что-то подобное, обернув фьючерсы в пользовательский тип.
Вот пример, который выглядит почти так же, как исходный код, но с добавлением простого типа оболочки в модуле. Было бы еще более эргономично c, если бы я реализовал Future<T>
для типа обертки, который просто перенаправляет на завернутую ручку, но это оказалось утомительным.
mod blocking_cancelable_task {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task;
pub struct BlockingCancelableTask<T> {
pub h: Option<tokio::task::JoinHandle<T>>,
flag: Arc<AtomicBool>,
}
impl<T> Drop for BlockingCancelableTask<T> {
fn drop(&mut self) {
eprintln!("Dropping...");
self.flag.store(false, Ordering::Relaxed);
}
}
impl<T> BlockingCancelableTask<T>
where
T: Send + 'static,
{
pub fn new<F>(f: F) -> BlockingCancelableTask<T>
where
F: FnOnce(&AtomicBool) -> T + Send + 'static,
{
let flag = Arc::new(AtomicBool::new(true));
let flag_clone = flag.clone();
let h = task::spawn_blocking(move || f(&flag_clone));
BlockingCancelableTask { h: Some(h), flag }
}
}
pub fn spawn<F, T>(f: F) -> BlockingCancelableTask<T>
where
T: Send + 'static,
F: FnOnce(&AtomicBool) -> T + Send + 'static,
{
BlockingCancelableTask::new(f)
}
}
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time::Duration};
use tokio::time; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
if !flag.load(Ordering::Relaxed) {
eprintln!("Interrupted...");
break;
}
}
sum
}
#[tokio::main]
async fn main() {
let mut h = blocking_cancelable_task::spawn(long_running_complicated_calculation);
let guarded = time::timeout(Duration::from_millis(250), h.h.take().unwrap());
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => {
eprintln!("Sum timed out (expected)");
}
}
}
детская площадка