Многоадресные UDP-пакеты с использованием фьючерсов Tokio - PullRequest
3 голосов
/ 13 июня 2019

Я играю с Tokio и Rust, и, например, я пытаюсь написать простой UDP-прокси, который будет принимать пакеты UDP на одном сокете и отправлять его нескольким другим адресатам.Однако я сталкиваюсь с ситуацией, когда мне нужно отправить полученный пакет на несколько адресов, и я не уверен, как это сделать идиоматическим образом.

Код У меня так далеко:

extern crate bytes;
extern crate futures;

use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;

fn main() {
    let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
    let forwarder = {
        let socket = UdpSocket::bind(&listen_address).unwrap();
        let peers = vec![
            "192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
            "192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
        ];
        UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).for_each(
            move |(bytes, _from)| {
                // These are the problematic lines
                for peer in peers.iter() {
                    socket.send_dgram(&bytes, &peer);
                }
                Ok(())
            },
        )
    };

    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
}

Проблемные линии пытаются отправить полученный пакет по нескольким другим адресам, используя вновь связанный сокет.

В существующих примерах все пакеты перенаправляются в один пункт назначения или внутренне используют каналы mpsc для связи между внутренними задачами.Я не думаю, что это необходимо и что это можно сделать без необходимости создавать более одной задачи на сокет прослушивания.

Обновление: Благодаря @ Ömer-erden я получилэтот код, который работает.

extern crate bytes;
extern crate futures;

use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listen_address = "0.0.0.0:4711".parse::<SocketAddr>()?;
    let socket = UdpSocket::bind(&listen_address)?;
    let peers: Vec<SocketAddr> = vec!["192.168.1.136:8080".parse()?, "192.168.1.136:8081".parse()?];
    let (mut writer, reader) = UdpFramed::new(socket, BytesCodec::new()).split();
    let forwarder = reader.for_each(move |(bytes, _from)| {
        for peer in peers.iter() {
            writer.start_send((bytes.clone().into(), peer.clone()))?;
        }
        writer.poll_complete()?;
        Ok(())
    });

    tokio::run({
        forwarder
            .map_err(|err| println!("Error: {}", err))
            .map(|_| ())
    });
    Ok(())
}

Обратите внимание, что:

  • Нет необходимости вызывать poll_completion для каждого start_send: его нужно просто вызвать после всех start_sendбыло отправлено.

  • По некоторым причинам содержимое peer распределяется между вызовами (но нет ошибки компилятора), генерируя ошибку 22 (которая обычно происходит из-за того, чтоневерный адрес присваивается sendto(2)).

    Глядя в отладчик, совершенно ясно, что во второй раз адрес однорангового узла указывает на недопустимую память.Вместо этого я решил клонировать peer.

  • Я удалил вызовы на unwrap() и вместо этого передал Result.

1 Ответ

1 голос
/ 13 июня 2019

В настоящее время в вашем коде есть логическая ошибка, вы пытаетесь связать один и тот же адрес дважды, чтобы использовать оба сокета как отправителя и получателя соответственно, вместо этого вы можете использовать Stream a Sinks , UdpFramed имеет функциональные возможности для обеспечения тот. Пожалуйста, смотрите Раковина :

Sink - это значение, в которое другие значения могут отправляться асинхронно.

let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
    let (mut socket_sink, socket_stream) =
        UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).split();
    let peers = vec![
        "192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
        "192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
    ];

    socket_stream.for_each(move |(bytes, _from)| {
        for peer in peers.iter() {
            socket_sink.start_send((bytes.clone().into(), *peer));
            socket_sink.poll_complete();
        }
        Ok(())
    })
};

tokio::run({
    forwarder
        .map_err(|err| println!("Error: {}", err))
        .map(|_| ())
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...