Чтение всех доступных сообщений из mpsc UnboundedReceiver без ненужной блокировки - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть futures::sync::mpsc::unbounded канал.Я могу отправлять сообщения на UnboundedSender<T>, но у меня возникают проблемы при получении их с UnboundedReciever<T>.
. Я использую канал для отправки сообщений в поток пользовательского интерфейса, и у меня есть функция, которая вызывается каждый кадр, и я бынравится читать все доступные сообщения из канала в каждом кадре, не блокируя поток, когда нет доступных сообщений.

Из того, что я прочитал, метод Future::poll является своего рода тем, что мне нужно, япросто опрос, и если я получаю Async :: Ready, я что-то делаю с сообщением, а если нет, я просто возвращаюсь из функции.
Проблема в том, что poll вызывает панику, когда нет контекста задачи (яя не уверен, что это значит или что с этим делать).

То, что я пытался:

let (sender, receiver) = unbounded(); // somewhere in the code, doesn't matter
// ...
let fut = match receiver.by_ref().collect().poll() {
    Async::Ready(items_vec) => // do something on UI with items,
    _ => return None
}

это паникует, потому что у меня нет контекста задачи.

Также попробовал:

let (sender, receiver) = unbounded(); // somewhere in the code, doesn't matter
// ...
let fut = receiver.by_ref().collect(); // how do I run the future?
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut); // this blocks the thread when there are no items in the receiver

Я хотел бы помочь с чтением UnboundedReceiver<T> без блокировки потока, когда в потоке нет элементов (просто ничего не делать).

Спасибо!

1 Ответ

0 голосов
/ 11 января 2019

Вы используете фьючерсы неправильно - вам нужно Runtime и немного больше шаблонов, чтобы это заработало:


    extern crate tokio;
    extern crate futures;

    use tokio::prelude::*;
    use futures::future::{lazy, ok};
    use futures::sync::mpsc::unbounded;
    use tokio::runtime::Runtime;

    fn main() {
        let (sender, receiver) = unbounded::<i64>();
        let receiver = receiver.for_each(|result| {
            println!("Got: {}", result);
            Ok(())
        });

        let rt = Runtime::new().unwrap();
        rt.executor().spawn(receiver);

        let lazy_future = lazy(move || {
            sender.unbounded_send(1).unwrap();
            sender.unbounded_send(2).unwrap();
            sender.unbounded_send(3).unwrap();
            ok::<(), ()>(())
        });

        rt.block_on_all(lazy_future).unwrap();
    }

Дальнейшее чтение, из Модель времени исполнения Токио :

[...] Чтобы использовать Tokio и успешно выполнять задачи, приложение должно запустить исполнителя и необходимые драйверы для ресурсов, от которых зависят задачи приложения.Это требует значительного шаблона.Чтобы управлять образцом, Tokio предлагает несколько вариантов исполнения.Среда выполнения - это исполнитель в комплекте со всеми необходимыми драйверами для питания ресурсов Tokio.Вместо того чтобы управлять всеми различными компонентами Tokio по отдельности, среда выполнения создается и запускается за один вызов.

Tokio предлагает одновременную среду исполнения и однопоточную среду исполнения ,Параллельная среда выполнения поддерживается многопоточным исполнителем, выполняющим кражу работы.Однопоточная среда выполнения выполняет все задачи и драйверы в текущем потоке.Пользователь может выбрать среду выполнения с характеристиками, наиболее подходящими для приложения.

...