Совместное выполнение Rust с Futures и Tokio - PullRequest
0 голосов
/ 18 ноября 2018

У меня есть некоторый код Rust, который в настоящее время выглядит следующим образом:

fn read_stdin(mut tx: mpsc::Sender<String>) {
    loop {
        // read from stdin and send value over tx.
    }
}

fn sleep_for(n: u64) -> impl Future<Item = (), Error = ()> {
    thread::sleep(time::Duration::from_millis(n));
    println!("[{}] slept for {} ms", Local::now().format("%T%.3f"), n);
    future::ok(())
}

fn main() {
    let (stdin_tx, stdin_rx) = mpsc::channel(0);
    thread::spawn(move || read_stdin(stdin_tx));

    let server = stdin_rx
        .map(|data| data.trim().parse::<u64>().unwrap_or(0))
        .for_each(|n| tokio::spawn(sleep_for(n * 100)));
    tokio::run(server);
}

Он использует токио и фьючерсы, чтобы выполнить некоторую "тяжелую" работу процессора (эмулируется функцией sleep_for)и затем выводит некоторые вещи в stdout.

Когда я его запускаю, все работает нормально, и я получаю этот вывод

2
[00:00:00.800] slept for 200 ms
10
1
[00:00:01.800] slept for 1000 ms
[00:00:01.900] slept for 100 ms

Первый вывод со значением 2точно так, как ожидалось, и я вижу метку времени, напечатанную после 200 мс.Но для следующих входов становится ясно, что функция sleep_for выполняется последовательно, а не одновременно.

Вывод, который я хочу увидеть, является

2
[00:00:00.800] slept for 200 ms
10
1
[00:00:00.900] slept for 100 ms
[00:00:01.900] slept for 1000 ms

Кажется, чточтобы получить вывод, который я ищу, я хочу выполнить sleep_for(10) и sleep_for(1) одновременно.Как мне поступить в Rust с фьючерсами и токио?

(Примечание: фактические значения временных меток не важны, я использую их больше, чтобы показать порядок выполнения в программе)

1 Ответ

0 голосов
/ 18 ноября 2018

Нашел решение с использованием ящика futures-timer.

use chrono::Local;
use futures::{future, sync::mpsc, Future, Sink, Stream};
use futures_timer::Delay;
use std::{io::stdin, thread, time::Duration};

fn read_stdin(mut tx: mpsc::Sender<String>) {
    let stdin = stdin();
    loop {
        let mut buf = String::new();
        stdin.read_line(&mut buf).unwrap();
        tx = tx.send(buf).wait().unwrap()
    }
}

fn main() {
    let (stdin_tx, stdin_rx) = mpsc::channel(0);
    thread::spawn(move || read_stdin(stdin_tx));

    let server = stdin_rx
        .map(|data| data.trim().parse::<u64>().unwrap_or(0) * 100)
        .for_each(|delay| {
            println!("[{}] {} ms -> start", Local::now().format("%T%.3f"), delay);
            tokio::spawn({
                Delay::new(Duration::from_millis(delay))
                    .and_then(move |_| {
                        println!("[{}] {} ms -> done", Local::now().format("%T%.3f"), delay);
                        future::ok(())
                    })
                    .map_err(|e| panic!(e))
            })
        });

    tokio::run(server);
}

Проблема заключается в том, что вместо того, чтобы позволить будущему становиться припаркованным, а затем уведомлять о текущей задаче, код, представленный в вопросе, просто спал в потоке, и поэтому никакого прогресса достичь не удалось.

Обновление: теперь я только что натолкнулся на tokio-timer, который кажется стандартным способом сделать это.

...