Как я могу запустить набор функций одновременно с повторяющимся интервалом без одновременного запуска одной и той же функции с использованием Tokio? - PullRequest
0 голосов
/ 23 мая 2019

Моя цель - запустить N функций одновременно, но я не хочу больше порождать их, пока все они не завершатся. Это что у меня есть :

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
    .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

Каждую секунду я порождаю 5 функций, но теперь я хотел бы подождать, пока все функции не закончатся, прежде чем создавать больше.

Исходя из моего понимания (я, вероятно, неправильно понял идею), я возвращаю Future в другом будущем

task (Interval ----------------------+ (outer future)
    for i in 0..5 {                  |
        tokio::spawn(  ----+         | 
            // my function | (inner) |
            Ok(())         |         |
        )              ----+         |
    }                                |
    Ok(()) --------------------------+

Я застрял, пытаясь дождаться окончания внутреннего будущего.

Ответы [ 2 ]

2 голосов
/ 23 мая 2019

Вы можете достичь этого, объединив свои рабочие фьючерсы так, чтобы они все работали параллельно, но должны все закончить вместе. Затем вы можете присоединиться к этому с задержкой в ​​1 секунду для того же обоснования. Оберните это в цикл, чтобы запустить его навсегда (или 5 итераций для демонстрации).

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay};  // 0.1.18

fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Run all this again
        wait.map(move |_| {
            if let Some(0) = repeat_count {
                Loop::Break(())
            } else {
                Loop::Continue(repeat_count.map(|c| c - 1))
            }
        })
    });

    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    future::ok(())
        .inspect(move |_| eprintln!("page {} starting", name))
        .and_then(move |_| ez_delay_ms(time_ms))
        .inspect(move |_| eprintln!("page {} done", name))
}

fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    Delay::new(Instant::now() + Duration::from_millis(ms)).map_err(drop)
}
Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
page a starting
page b starting
page a done
page b done

Смотри также:

1 голос
/ 23 мая 2019

Из моего понимания (я, наверное, неправильно понимаю идею) я возвращая Future в другое будущее

Вы не ошиблись, но в коде, который вы указали, единственным возвращаемым будущим является Ok(()), который реализует IntoFuture. tokio::spawn просто порождает новое задание в DefaultExecutor Токио.

Если я понимаю из вашего вопроса, вы хотите порождать следующую партию , когда предыдущая одна завершена, но если предыдущая сделана до 1 секунды, вы хотите закончить 1 секунда перед порождением следующей партии.

Лучше было бы реализовать собственное будущее и самостоятельно провести опрос, но это можно сделать примерно так:

  • Используя join_all для сбора пакетных заданий. Это новое будущее, которое ожидает завершения собранных фьючерсов.
  • Для ожидания 1 секунды вы можете использовать атомарное состояние. Если он заблокирован для галочки, он ожидает освобождения состояния.

Вот код ( Детская площадка ):

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::time::{self, Duration, Instant};

use tokio::prelude::*;
use tokio::timer::{Delay, Interval};

use futures::future::join_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

fn main() {
    let locker = Arc::new(AtomicBool::new(false));

    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .map_err(|e| panic!("interval errored; err={:?}", e))
        .for_each(move |interval| {
            let is_locked = locker.load(Ordering::SeqCst);
            println!("Interval: {:?} --- {:?}", interval, is_locked);

            if !is_locked {
                locker.store(true, Ordering::SeqCst);
                println!("locked");

                let futures: Vec<_> = (0..5)
                    .map(|i| {
                        lazy(move || {
                            println!("Running Task-{}", i);
                            // mock delay
                            Delay::new(Instant::now() + Duration::from_millis(100 - i))
                                .then(|_| Ok(()))
                        })
                        .and_then(move |_| {
                            println!("Task-{} is done", i);
                            Ok(())
                        })
                    })
                    .collect();

                let unlocker = locker.clone();
                tokio::spawn(join_all(futures).and_then(move |_| {
                    unlocker.store(false, Ordering::SeqCst);
                    println!("unlocked");

                    Ok(())
                }));
            }

            Ok(())
        });

    tokio::run(task.then(|_| Ok(())));
}

Выход:

Interval: Instant { tv_sec: 4036783, tv_nsec: 211837425 } --- false
locked
Running Task-0
Running Task-1
Running Task-2
Running Task-3
Running Task-4
Task-4 is done
Task-3 is done
Task-2 is done
Task-1 is done
Task-0 is done
unlocked
Interval: Instant { tv_sec: 4036784, tv_nsec: 211837425 } --- false
locked
Running Task-0
Running Task-1
Running Task-2
Running Task-3
Running Task-4
Task-3 is done
Task-4 is done
Task-0 is done
Task-1 is done
Task-2 is done
unlocked

Внимание! : Пожалуйста, проверьте Комментарий Шепмастера

Даже для демонстрации, вы не должны использовать thread: sleep в фьючерсах. Есть лучшие альтернативы

...