Как извлекать сообщения из неограниченной очереди каждые N секунд и порождать их в обработчике Tokio? - PullRequest
0 голосов
/ 12 октября 2018

Я пытаюсь извлекать сообщения (которые сами являются фьючерсами) из неограниченной очереди каждые N секунд и порождает их в обработчик Tokio.

Я пробовал десятки вариантов, но не могу найти правильный подход.Похоже, что это возможно, но я всегда сталкиваюсь с несоответствием типов в будущем или сталкиваюсь с проблемами заимствования.

Это код, который показывает более или менее то, что я хочу:

let fut = Interval::new_interval(Duration::from_secs(1))
        .for_each(|num| vantage_dequeuer.into_future() )
        .for_each(|message:VantageMessage |{
            handle.spawn(message);
            return Ok(());
        })
        .map_err(|e| panic!("delay errored; err={:?}", e));

core.run(fut);

Полный код:

extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::future::ok;
use futures::sync::mpsc;
use futures::{Future, Stream};
use std::thread;
use std::time::Duration;
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

fn main() {
    let (enqueuer, dequeuer) = mpsc::unbounded();
    let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
        println!("Message!");
        return Ok(());
    }));
    enqueuer.unbounded_send(new_fut);
    let joinHandle = worker(Some(dequeuer));
    joinHandle.join();
}

/*
    Every second extract one message from dequeuer (or wait if not available)
    and spawn it in the core
*/
fn worker(
    mut vantage_dequeuer: Option<mpsc::UnboundedReceiver<VantageMessage>>,
) -> thread::JoinHandle<()> {
    let dequeuer = dequeuer.take().unwrap();
    let joinHandle = thread::spawn(|| {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let fut = Interval::new_interval(Duration::from_secs(1))
            .for_each(|num| vantage_dequeuer.into_future())
            .for_each(|message: VantageMessage| {
                handle.spawn(message);
                return Ok(());
            })
            .map_err(|e| panic!("delay errored; err={:?}", e));

        core.run(fut);
        println!("Returned!");
    });
    return joinHandle;
}

Детская площадка

error[E0425]: cannot find value `dequeuer` in this scope
  --> src/main.rs:33:20
   |
33 |     let dequeuer = dequeuer.take().unwrap();
   |                    ^^^^^^^^ not found in this scope

error[E0599]: no method named `into_future` found for type `std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>>` in the current scope
  --> src/main.rs:38:46
   |
38 |             .for_each(|num| vantage_dequeuer.into_future())
   |                                              ^^^^^^^^^^^
   |
   = note: the method `into_future` exists but the following trait bounds were not satisfied:
           `&mut std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>> : futures::Stream`

1 Ответ

0 голосов
/ 12 октября 2018

Interval и UnboundedReceiver оба являются потоками, поэтому я бы использовал Stream::zip для их объединения:

Сжатый поток ожидает, пока оба потока произведутэлемент, а затем возвращает эту пару.Если произойдет ошибка, эта ошибка будет немедленно возвращена.Если какой-либо поток заканчивается, то сжатый поток также завершится.

extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::{
    future::ok,
    sync::mpsc,
    {Future, Stream},
};
use std::{thread, time::Duration};
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

pub fn main() {
    let (tx, rx) = mpsc::unbounded();

    let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
        println!("Message!");
        Ok(())
    }));
    tx.unbounded_send(new_fut).expect("Unable to send");
    drop(tx); // Close the sending side

    worker(rx).join().expect("Thread had a panic");
}

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
    thread::spawn(|| {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        core.run({
            Interval::new_interval(Duration::from_secs(1))
                .map_err(|e| panic!("delay errored; err={}", e))
                .zip(queue)
                .for_each(|(_, message)| {
                    handle.spawn(message);
                    Ok(())
                })
        })
        .expect("Unable to run reactor");
        println!("Returned!");
    })
}

Обратите внимание, что на самом деле не ждет завершения какого-либо из порожденных фьючерсов до закрытия реактора.вниз.Если вы хотите, я бы переключился на tokio::run и tokio::spawn:

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
    thread::spawn(|| {
        tokio::run({
            Interval::new_interval(Duration::from_secs(1))
                .map_err(|e| panic!("delay errored; err={}", e))
                .zip(queue)
                .for_each(|(_, message)| {
                    tokio::spawn(message);
                    Ok(())
                })
        });
        println!("Returned!");
    })
}
...