Я играю с Tokio и Rust, и, например, я пытаюсь написать простой UDP-прокси, который будет принимать пакеты UDP на одном сокете и отправлять его нескольким другим адресатам.Однако я сталкиваюсь с ситуацией, когда мне нужно отправить полученный пакет на несколько адресов, и я не уверен, как это сделать идиоматическим образом.
Код У меня так далеко:
extern crate bytes;
extern crate futures;
use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
fn main() {
let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
let socket = UdpSocket::bind(&listen_address).unwrap();
let peers = vec![
"192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
"192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
];
UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).for_each(
move |(bytes, _from)| {
// These are the problematic lines
for peer in peers.iter() {
socket.send_dgram(&bytes, &peer);
}
Ok(())
},
)
};
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
}
Проблемные линии пытаются отправить полученный пакет по нескольким другим адресам, используя вновь связанный сокет.
В существующих примерах все пакеты перенаправляются в один пункт назначения или внутренне используют каналы mpsc для связи между внутренними задачами.Я не думаю, что это необходимо и что это можно сделать без необходимости создавать более одной задачи на сокет прослушивания.
Обновление: Благодаря @ Ömer-erden я получилэтот код, который работает.
extern crate bytes;
extern crate futures;
use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let listen_address = "0.0.0.0:4711".parse::<SocketAddr>()?;
let socket = UdpSocket::bind(&listen_address)?;
let peers: Vec<SocketAddr> = vec!["192.168.1.136:8080".parse()?, "192.168.1.136:8081".parse()?];
let (mut writer, reader) = UdpFramed::new(socket, BytesCodec::new()).split();
let forwarder = reader.for_each(move |(bytes, _from)| {
for peer in peers.iter() {
writer.start_send((bytes.clone().into(), peer.clone()))?;
}
writer.poll_complete()?;
Ok(())
});
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
Ok(())
}
Обратите внимание, что:
Нет необходимости вызывать poll_completion
для каждого start_send
: его нужно просто вызвать после всех start_send
было отправлено.
По некоторым причинам содержимое peer
распределяется между вызовами (но нет ошибки компилятора), генерируя ошибку 22 (которая обычно происходит из-за того, чтоневерный адрес присваивается sendto(2)
).
Глядя в отладчик, совершенно ясно, что во второй раз адрес однорангового узла указывает на недопустимую память.Вместо этого я решил клонировать peer
.
Я удалил вызовы на unwrap()
и вместо этого передал Result
.