[tokio-rs] [документация] Несколько асинхронных «подпрограмм» с примером общего состояния? - PullRequest
0 голосов
/ 26 апреля 2020

Обычным шаблоном для Node.js приложений является разделение их на множество «подпрограмм», которые разделяют некоторые состояния. Конечно, все «вспомогательные приложения» должны обрабатываться асинхронно.

Вот простой пример такого приложения Node с тремя «вспомогательными приложениями»:

  1. Интервал timer => Каждые 10 секунд общий itv_counter увеличивается
  2. TCP-сервер => Для каждого полученного TCP-сообщения общий tcp_counter увеличивается
  3. UDP-сервер => Для Каждое полученное UDP-сообщение увеличивает общий udp_counter

Каждый раз, когда увеличивается один из счетчиков, должны быть напечатаны все три счетчика (следовательно, почему «подпрограммам» необходимо совместно использовать состояние) .

Вот реализация в Node. Приятной особенностью Node является то, что вы можете предположить, что почти все операции ввода-вывода по умолчанию обрабатываются асинхронно. Для разработчика нет когнитивных накладных расходов.

const dgram = require('dgram');
const net = require('net');
const tcp_port = 3000;
const udp_port = 3001;
const tcp_listener = net.createServer();
const udp_listener = dgram.createSocket('udp4');

// state shared by the 3 asynchronous applications
const shared_state = {
    itv_counter: 0,
    tcp_counter: 0,
    udp_counter: 0,
};

// itv async app: increment itv_counter every 10 seconds and print shared state
setInterval(() => {
    shared_state.itv_counter += 1;
    console.log(`itv async app: ${JSON.stringify(shared_state)}`);
}, 10_000);

// tcp async app: increment tcp_counter every time a TCP message is received and print shared state
tcp_listener.on('connection', (client) => {
    client.on('data', (_data) => {
        shared_state.tcp_counter += 1;
        console.log(`tcp async app: ${JSON.stringify(shared_state)}`);
    });
});
tcp_listener.listen(tcp_port, () => {
    console.log(`TCP listener on port ${tcp_port}`);
});

// udp async app: increment udp_counter every time a UDP message is received and print shared state
udp_listener.on('message', (_message, _client) => {
    shared_state.udp_counter += 1;
    console.log(`udp async app: ${JSON.stringify(shared_state)}`);
});
udp_listener.on('listening', () => {
    console.log(`UDP listener on port ${udp_port}`);
});
udp_listener.bind(udp_port);

Теперь вот реализация в Rust с Tokio в качестве асинхронной среды выполнения.

use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::{TcpListener, UdpSocket};
use tokio::prelude::*;

// state shared by the 3 asynchronous applications
#[derive(Clone, Debug)]
struct SharedState {
    state: Arc<Mutex<State>>,
}

#[derive(Debug)]
struct State {
    itv_counter: usize,
    tcp_counter: usize,
    udp_counter: usize,
}

impl SharedState {
    fn new() -> SharedState {
        SharedState {
            state: Arc::new(Mutex::new(State {
                itv_counter: 0,
                tcp_counter: 0,
                udp_counter: 0,
            })),
        }
    }
}

#[tokio::main]
async fn main() {
    let shared_state = SharedState::new();
    // itv async app: increment itv_counter every 10 seconds and print shared state
    let itv_shared_state = shared_state.clone();
    let itv_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(10));
        interval.tick().await;
        loop {
            interval.tick().await;
            let mut state = itv_shared_state.state.lock().unwrap();
            state.itv_counter += 1;
            println!("itv async app: {:?}", state);
        }
    });
    // tcp async app: increment tcp_counter every time a TCP message is received and print shared state
    let tcp_shared_state = shared_state.clone();
    let tcp_handle = tokio::spawn(async move {
        let mut tcp_listener = TcpListener::bind("127.0.0.1:3000").await.unwrap();
        println!("TCP listener on port 3000");
        while let Ok((mut tcp_stream, _)) = tcp_listener.accept().await {
            let tcp_shared_state = tcp_shared_state.clone();
            tokio::spawn(async move {
                let mut buffer = [0; 1024];
                while let Ok(byte_count) = tcp_stream.read(&mut buffer).await {
                    if byte_count == 0 {
                        break;
                    }
                    let mut state = tcp_shared_state.state.lock().unwrap();
                    state.tcp_counter += 1;
                    println!("tcp async app: {:?}", state);
                }
            });
        }
    });
    // udp async app: increment udp_counter every time a UDP message is received and print shared state
    let udp_shared_state = shared_state.clone();
    let udp_handle = tokio::spawn(async move {
        let mut udp_listener = UdpSocket::bind("127.0.0.1:3001").await.unwrap();
        println!("UDP listener on port 3001");
        let mut buffer = [0; 1024];
        while let Ok(_byte_count) = udp_listener.recv(&mut buffer).await {
            let mut state = udp_shared_state.state.lock().unwrap();
            state.udp_counter += 1;
            println!("udp async app: {:?}", state);
        }
    });
    itv_handle.await.unwrap();
    tcp_handle.await.unwrap();
    udp_handle.await.unwrap();
}

Прежде всего, поскольку я не супер комфортно с Tokio и asyn c Rust, однако, в этой реализации могут быть вещи, которые совершенно неверны, или плохая практика. Пожалуйста, дайте мне знать, если это так (например, я понятия не имею, нужны ли три JoinHandle .await в самом конце). Тем не менее, он ведет себя так же, как реализация Node для моих простых тестов.

Но я до сих пор не уверен, что это эквивалентно под капотом с точки зрения асинхронности. Должно ли быть tokio::spawn для каждого обратного вызова в приложении Node? В этом случае я должен обернуть tcp_stream.read() и udp_listener.recv() в другой tokio::spawn, чтобы имитировать c обратные вызовы Node для TCP on('data') и UDP on('message') соответственно. Не уверен ...

Какая реализация Tokio будет полностью эквивалентна приложению Node.js с точки зрения асинхронности? В общем, какое хорошее эмпирическое правило знать, когда что-то должно быть заключено в tokio::spawn?

1 Ответ

1 голос
/ 26 апреля 2020

Я вижу, что у вас есть три разных счетчика для ваших задач, и поэтому я думаю, что есть смысл использовать токен вашей структуры состояний и переключать его между задачами. Поэтому каждая задача отвечает за обновление своего счетчика. В качестве предложения я предлагаю использовать tokio :: syn c :: mps c :: channel и реализовать три значения mps c, каждое из которых направлено от одной задачи к следующей.

Конечно, если между задачами есть разница в периоде обновления, есть риск, что некоторые значения обновляются немного позже, но я думаю, что в общем случае это можно игнорировать.

...