токио с multiqueue иногда висит, иногда работает - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь сопоставить ящик multiqueue с Tokio, чтобы реализовать что-то в духе издателя / подписчика, делая Stream s, которые можно повторять.Я не уверен в эффективности (мне могут понадобиться десятки или сотни слушателей, которые будут фильтровать элементы, и один издатель будет публиковать где-то около 10 сообщений в миллисекунду), поэтому я хотел бы сравнить подход перед тем, как взять на себя обязательствоЭто.Однако прямо сейчас я сталкиваюсь со странной ошибкой, когда иногда tokio::timer::Interval, похоже, вообще не срабатывает.

Полный код приведен ниже:

#![feature(test)]

extern crate futures;
extern crate multiqueue;
extern crate test;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::lazy;
    use futures::sync::mpsc::{channel, Receiver, Sender};
    use futures::{Async, Poll, Stream};
    use futures::{Future, Sink};
    use test::Bencher;
    use tokio::timer::Interval;

    #[bench]
    fn bench_many(b: &mut Bencher) {
        tokio::run(lazy(|| {
            let (tx, rx) = multiqueue::mpmc_fut_queue(1000);
            tokio::spawn(
                Interval::new_interval(std::time::Duration::from_micros(100))
                    .take(100)
                    .map(|_| 100)
                    .map_err(|e| {
                        eprintln!("Got interval error = {:?}", e);
                    })
                    .fold(tx, |tx, num| {
                        println!("Sending {}", num);
                        tx.send(num).map_err(|e| println!("send err = {:?}", e))
                    })
                    .map(|_| ()),
            );

            for i in 0..3 {
                println!("Starting");
                let rx = rx.clone();
                tokio::spawn(rx.for_each(move |num| {
                    println!("{} Got a num! {}", i, num);
                    Ok(())
                }));
            }

            Ok(())
        }));
    }
}

иЯ запускаю его с cargo bench.futures для версии "0.1", tokio для версии "0.1" и multiqueue для версии "0.3".

Иногда весь тест завершается многими сообщениями "[0-2] Получил num! 100 "и" Sending 100 ", но иногда он зависает либо посередине (после нескольких сообщений" Sending "и" Got a "), либо зависает только с 3 сообщениями" Starting ".

Я подозреваю, что это может быть связано с количеством задач, которые я могу запустить одновременно с tokio, но я не совсем понимаю, почему это ограничение, с которым я столкнусь, поскольку оба типа задачЯ порождаю время, необходимое для исполнения исполнителю.

Как я могу сделать это более надежным?

...