Канал связи между задачами - PullRequest
0 голосов
/ 08 марта 2019

Я пытаюсь установить канал связи между одним гипер сервисом и одним Токио потоком.Проблема в том, что компилятор работает со следующей ошибкой:

закрытие равно FnOnce, поскольку оно перемещает переменную tx_queue из своего окружения.

После чтенияобъяснение, предоставленное rustc --explain E0525, показывает, что tokio :: sync :: mpsc :: Sender реализует Clone, но не реализует Copy (если я что-то не заметил).

Так что я немного застрял.Как я могу заставить мою службу отправлять сообщения в поток Tokio через канал tokio::sync::mpsc?Я уверен, что упускаю что-то очевидное, но не вижу, что: /

Выдержка из проблемного кода (изменена, чтобы сделать его короче по запросу @ E_net4):

    extern crate hyper;
    extern crate tokio;
    extern crate tokio_signal;

    use futures::Stream;
    use hyper::rt::Future;
    use hyper::service::service_fn_ok;
    use hyper::{Body, Request, Response, Server};

    use futures::sink::Sink;
    use futures::sync::{mpsc, oneshot};
    use futures::{future, stream};

    fn main() {
        let mut runtime = tokio::runtime::Runtime::new().unwrap();

        let (tx1, rx1) = oneshot::channel::<()>();

        let (tx_queue, rx_queue) = mpsc::channel(10);

        // ----

        runtime.spawn(start_queue(rx_queue));

        // ----

        let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
            service_fn_ok(move |_: Request<Body>| {
                tx_queue.send(1);
                Response::new(Body::from("Hello World!"))
            })
        });

        let graceful = http_server
            .with_graceful_shutdown(rx1)
            .map_err(|err| eprintln!("server error: {}", err))
            .and_then(|_| {
                dbg!("stopped");
                // TODO: stop order queue listener
                Ok(())
            });

        dbg!("HTTP server listening ...");

        runtime.spawn(graceful);

        // ----

        tx1.send(()).unwrap();

        dbg!("exited");
    }

    pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
        #[derive(Eq, PartialEq)]
        enum Item {
            Value(usize),
            Tick,
            Done,
        }

        let items = rx
            .map(Item::Value)
            .chain(stream::once(Ok(Item::Done)))
            .take_while(|item| future::ok(*item != Item::Done));

        items
            .fold(0, |num, _item| {
                dbg!("x");
                future::ok(num)
            })
            .map(|_| ())
    }

Весь коддоступно здесь: https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e

Спасибо:)

1 Ответ

2 голосов
/ 08 марта 2019

futures::sync::mpsc::Sender::send потребляет Sender и создает объект Send, который является будущим, который необходимо запустить до завершения, чтобы фактически отправить данные. Если канал заполнен, он будет блокироваться, пока кто-то еще не получит от канала. По завершении он возвращает вам Sender, который вы могли бы использовать для отправки дополнительных данных.

В этом случае я не думаю, что вы можете структурировать код только с одним экземпляром Sender. Вам необходимо его клонировать, чтобы при каждом вызове сервисной функции создавался новый клон. Обратите внимание, что теперь оба закрытия move:

    let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
        // This closure has one instance of tx_queue that was moved-in here.
        // Now we make a copy to be moved into the closure below.
        let tx_queue = tx_queue.clone();
        service_fn_ok(move |_: Request<Body>| {
            // This closure has one instance of tx_queue, but it will be called
            // multiple times, so it can not consume it. It must make a copy
            // before consuming it.
            tx_queue.clone().send(111);
            Response::new(Body::from("Hello World!"))
        })
    });

Но это даст вам следующее предупреждение:

warning: unused `futures::sink::send::Send` that must be used

Как я уже сказал, send просто дает вам будущее, которое необходимо запустить, чтобы фактически выполнить отправку. Если вы проигнорируете возвращаемое значение, ничего не произойдет. В этом случае было бы лучше, если бы spawn было выполнено как отдельная задача (чтобы он не блокировал ответ клиенту). Чтобы вызвать его, вам нужен исполнитель из среды выполнения, который также должен быть клонирован для внутреннего закрытия:

    let executor = runtime.executor();
    let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
        let tx_queue = tx_queue.clone();
        let executor = executor.clone();
        service_fn_ok(move |_: Request<Body>| {
            executor.spawn(tx_queue.clone().send(111).map(|_| ()).map_err(|err| {
                // TODO: Handle the error differenty!
                panic!("Error in mpsc {:?}", err);
            }));
            Response::new(Body::from("Hello World!"))
        })
    });
...