Обычным шаблоном для Node.js приложений является разделение их на множество «подпрограмм», которые разделяют некоторые состояния. Конечно, все «вспомогательные приложения» должны обрабатываться асинхронно.
Вот простой пример такого приложения Node с тремя «вспомогательными приложениями»:
- Интервал timer => Каждые 10 секунд общий
itv_counter
увеличивается - TCP-сервер => Для каждого полученного TCP-сообщения общий
tcp_counter
увеличивается - UDP-сервер => Для Каждое полученное UDP-сообщение увеличивает общий
udp_counter
Каждый раз, когда увеличивается один из счетчиков, должны быть напечатаны все три счетчика (следовательно, почему «подпрограммам» необходимо совместно использовать состояние) .
Вот реализация в Node. Приятной особенностью Node является то, что вы можете предположить, что почти все операции ввода-вывода по умолчанию обрабатываются асинхронно. Для разработчика нет когнитивных накладных расходов.
const dgram = require('dgram');
const net = require('net');
const tcp_port = 3000;
const udp_port = 3001;
const tcp_listener = net.createServer();
const udp_listener = dgram.createSocket('udp4');
// state shared by the 3 asynchronous applications
const shared_state = {
itv_counter: 0,
tcp_counter: 0,
udp_counter: 0,
};
// itv async app: increment itv_counter every 10 seconds and print shared state
setInterval(() => {
shared_state.itv_counter += 1;
console.log(`itv async app: ${JSON.stringify(shared_state)}`);
}, 10_000);
// tcp async app: increment tcp_counter every time a TCP message is received and print shared state
tcp_listener.on('connection', (client) => {
client.on('data', (_data) => {
shared_state.tcp_counter += 1;
console.log(`tcp async app: ${JSON.stringify(shared_state)}`);
});
});
tcp_listener.listen(tcp_port, () => {
console.log(`TCP listener on port ${tcp_port}`);
});
// udp async app: increment udp_counter every time a UDP message is received and print shared state
udp_listener.on('message', (_message, _client) => {
shared_state.udp_counter += 1;
console.log(`udp async app: ${JSON.stringify(shared_state)}`);
});
udp_listener.on('listening', () => {
console.log(`UDP listener on port ${udp_port}`);
});
udp_listener.bind(udp_port);
Теперь вот реализация в Rust с Tokio в качестве асинхронной среды выполнения.
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::{TcpListener, UdpSocket};
use tokio::prelude::*;
// state shared by the 3 asynchronous applications
#[derive(Clone, Debug)]
struct SharedState {
state: Arc<Mutex<State>>,
}
#[derive(Debug)]
struct State {
itv_counter: usize,
tcp_counter: usize,
udp_counter: usize,
}
impl SharedState {
fn new() -> SharedState {
SharedState {
state: Arc::new(Mutex::new(State {
itv_counter: 0,
tcp_counter: 0,
udp_counter: 0,
})),
}
}
}
#[tokio::main]
async fn main() {
let shared_state = SharedState::new();
// itv async app: increment itv_counter every 10 seconds and print shared state
let itv_shared_state = shared_state.clone();
let itv_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
interval.tick().await;
loop {
interval.tick().await;
let mut state = itv_shared_state.state.lock().unwrap();
state.itv_counter += 1;
println!("itv async app: {:?}", state);
}
});
// tcp async app: increment tcp_counter every time a TCP message is received and print shared state
let tcp_shared_state = shared_state.clone();
let tcp_handle = tokio::spawn(async move {
let mut tcp_listener = TcpListener::bind("127.0.0.1:3000").await.unwrap();
println!("TCP listener on port 3000");
while let Ok((mut tcp_stream, _)) = tcp_listener.accept().await {
let tcp_shared_state = tcp_shared_state.clone();
tokio::spawn(async move {
let mut buffer = [0; 1024];
while let Ok(byte_count) = tcp_stream.read(&mut buffer).await {
if byte_count == 0 {
break;
}
let mut state = tcp_shared_state.state.lock().unwrap();
state.tcp_counter += 1;
println!("tcp async app: {:?}", state);
}
});
}
});
// udp async app: increment udp_counter every time a UDP message is received and print shared state
let udp_shared_state = shared_state.clone();
let udp_handle = tokio::spawn(async move {
let mut udp_listener = UdpSocket::bind("127.0.0.1:3001").await.unwrap();
println!("UDP listener on port 3001");
let mut buffer = [0; 1024];
while let Ok(_byte_count) = udp_listener.recv(&mut buffer).await {
let mut state = udp_shared_state.state.lock().unwrap();
state.udp_counter += 1;
println!("udp async app: {:?}", state);
}
});
itv_handle.await.unwrap();
tcp_handle.await.unwrap();
udp_handle.await.unwrap();
}
Прежде всего, поскольку я не супер комфортно с Tokio и asyn c Rust, однако, в этой реализации могут быть вещи, которые совершенно неверны, или плохая практика. Пожалуйста, дайте мне знать, если это так (например, я понятия не имею, нужны ли три JoinHandle .await
в самом конце). Тем не менее, он ведет себя так же, как реализация Node для моих простых тестов.
Но я до сих пор не уверен, что это эквивалентно под капотом с точки зрения асинхронности. Должно ли быть tokio::spawn
для каждого обратного вызова в приложении Node? В этом случае я должен обернуть tcp_stream.read()
и udp_listener.recv()
в другой tokio::spawn
, чтобы имитировать c обратные вызовы Node для TCP on('data')
и UDP on('message')
соответственно. Не уверен ...
Какая реализация Tokio будет полностью эквивалентна приложению Node.js с точки зрения асинхронности? В общем, какое хорошее эмпирическое правило знать, когда что-то должно быть заключено в tokio::spawn
?