Как я могу запускать набор функций с повторяющимся интервалом без одновременного запуска одной и той же функции, используя только стандартную библиотеку Rust? - PullRequest
1 голос
/ 22 мая 2019

Я хотел бы использовать Rust для создания простого планировщика, чтобы запускать несколько одновременных функций в определенное время, но не запускать больше, если они еще не завершены.

Например, если заданный интервал равен одной секунде, планировщик должен запускать функции и не запускать больше, если предыдущие функции не вернулись. Цель состоит в том, чтобы не запускать одну и ту же функцию несколько раз.

Я создал рабочий пример с Go следующим образом:

package main

import (
    "fmt"
    "sync"
    "time"
)

func myFunc(wg *sync.WaitGroup) {
    fmt.Printf("now: %+s\n", time.Now())
    time.Sleep(3 * time.Second)
    wg.Done()
}

func main() {
    quit := make(chan bool)

    t := time.NewTicker(time.Second)
    go func() {
        for {
            select {
            case <-t.C:
                var wg sync.WaitGroup
                for i := 0; i <= 4; i++ {
                    wg.Add(1)
                    go myFunc(&wg)
                }
                wg.Wait()
                fmt.Printf("--- done ---\n\n")
            case <-quit:
                return
            }
        }
    }()

    <-time.After(time.Minute)
    close(quit)
}

Поскольку в стандартной библиотеке Rust я не нашел что-то вроде NewTicker Go, я использовал Tokio и придумали

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("I am i: {}", i);
                    thread::sleep(time::Duration::from_secs(3));
                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

Проблема, с которой я сталкиваюсь при таком подходе, состоит в том, что задачи не ждут вызова предыдущих функций, поэтому функции запускаются снова, независимо от того, выполнялись ли они ранее, я здесь упускаю что-то вроде sync.WaitGroup в Go. Что можно использовать для достижения тех же результатов, что и в рабочем примере?

Можно ли добиться этого, используя только стандартную библиотеку? Это в основном для целей обучения, вероятно, есть довольно простой способ сделать это, и я мог бы избежать дополнительной сложности.

В конце я хотел бы периодически отслеживать некоторые сайты по HTTP (получить только возвращенный код состояния), но не запрашивать их все снова, пока у меня не появятся все ответы.

1 Ответ

1 голос
/ 22 мая 2019

Так как вы хотите параллелизма и будете использовать только стандартную библиотеку, тогда вы в основном должны использовать потоки.

Здесь мы запускаем поток для каждой функции для каждой итерации цикла планировщика, позволяя им запускаться впараллельно.Затем мы ждем завершения всех функций, предотвращая одновременное выполнение одной и той же функции дважды одновременно.

use std::{
    thread,
    time::{Duration, Instant},
};

fn main() {
    let scheduler = thread::spawn(|| {
        let wait_time = Duration::from_millis(500);

        // Make this an infinite loop
        // Or some control path to exit the loop
        for _ in 0..5 {
            let start = Instant::now();
            eprintln!("Scheduler starting at {:?}", start);

            let thread_a = thread::spawn(a);
            let thread_b = thread::spawn(b);

            thread_a.join().expect("Thread A panicked");
            thread_b.join().expect("Thread B panicked");

            let runtime = start.elapsed();

            if let Some(remaining) = wait_time.checked_sub(runtime) {
                eprintln!(
                    "schedule slice has time left over; sleeping for {:?}",
                    remaining
                );
                thread::sleep(remaining);
            }
        }
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}

Вы также можете использовать Barrier для запуска каждой функции в потокеодин раз, а затем синхронизировать их все в конце выполнения:

use std::{
    sync::{Arc, Barrier},
    thread,
    time::Duration,
};

fn main() {
    let scheduler = thread::spawn(|| {
        let barrier = Arc::new(Barrier::new(2));

        fn with_barrier(barrier: Arc<Barrier>, f: impl Fn()) -> impl Fn() {
            move || {
                // Make this an infinite loop
                // Or some control path to exit the loop
                for _ in 0..5 {
                    f();
                    barrier.wait();
                }
            }
        }

        let thread_a = thread::spawn(with_barrier(barrier.clone(), a));
        let thread_b = thread::spawn(with_barrier(barrier.clone(), b));

        thread_a.join().expect("Thread A panicked");
        thread_b.join().expect("Thread B panicked");
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}

Я бы не стал использовать или этих решений лично.Я бы нашел ящик , где кто-то другой написал и протестировал нужный мне код.

См. Также:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...