Нерестовые задания с нестатическим временем жизни в Токио - PullRequest
0 голосов
/ 20 декабря 2018

У меня есть ядро ​​Tokio, основной задачей которого является запуск веб-сокета (клиента).Когда я получаю некоторые сообщения от сервера, я хочу выполнить новую задачу, которая обновит некоторые данные.Ниже приведен минимальный ошибочный пример:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}

, который вызывает эту ошибку:

error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
  --> src/main.rs:13:21                                                                                                                                                                
   |                                                                                                                                                                                   
13 |         self.handle.spawn(future::ok(()).and_then(|x| {                                                                                                                           
   |                     ^^^^^                                                                                                                                                         
   |                                                                                                                                                                                   
   = note: type must satisfy the static lifetime      

Проблема заключается в том, что новые задачи, которые создаются через дескриптор, должны быть статическими.Та же проблема описана здесь .К сожалению, мне неясно, как я могу решить эту проблему.Даже некоторые попытки с и Arc и Mutex (которые на самом деле не нужны для однопоточного приложения) не увенчались успехом.

Поскольку события происходят довольно быстро в ландшафте Tokio, ямне интересно, каково текущее лучшее решение.Есть ли у вас какие-либо предложения?

edit

Решение Peter Hall работает для приведенного выше примера.К сожалению, когда я построил неудачный пример, я сменил реактор Tokio, думая, что они будут похожи.Используя tokio::runtime::current_thread

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        let mut data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

fn main() {
    // let mut runtime = Core::new().unwrap();

    let mut runtime = Builder::new().build().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: Rc::new(Cell::new(1)),
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.block_on(task).unwrap();
}

, я получаю:

error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21                                                         
|                                                                            
17 |         self.handle.spawn(future::ok(()).and_then(move |_x| {              
|                     ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|                                                                            
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`

Так что в этом случае мне нужны Arc и Mutex, даже если весь кододнопоточный

1 Ответ

0 голосов
/ 20 декабря 2018

В однопоточной программе вам не нужно использовать Arc;Rc достаточно:

use std::{rc::Rc, cell::Cell};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

Дело в том, что вам больше не нужно беспокоиться о времени жизни, поскольку каждый клон Rc действует так, как будто он владеет данными, а не обращается к ним черезссылка на self.Внутренний Cell (или RefCell для не Copy типов) необходим, потому что Rc не может быть разыменованным, поскольку он был клонирован.


spawn метод tokio::runtime::current_thread::Handle требует, чтобы будущее было Send, что и является причиной проблемы в обновлении вашего вопроса.Существует объяснение (в некотором роде), почему это так в этой проблеме Tokio Github .

Вы можете использовать tokio::runtime::current_thread::spawn вместо метода Handle, который всегда будетзапустить будущее в текущем потоке, и не требует, чтобы будущее было Send.Вы можете заменить self.handle.spawn в приведенном выше коде, и он будет отлично работать.

Если вам нужно использовать метод на Handle, вам также придется прибегнуть к Arc и Mutex (или RwLock), чтобы удовлетворить требование Send:

use std::sync::{Mutex, Arc};

struct Client {
    handle: Handle,
    data: Arc<Mutex<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Arc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            *data.lock().unwrap() += 1;
            future::ok(())
        }));
    }
}

Если ваши данные действительно usize, вы также можете использовать AtomicUsize вместо Mutex<usize>, но я лично считаю, что работать с ним так же неудобно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...