Как использовать UdpSocket от Tokio для обработки сообщений на сервере 1: настройка N клиентов? - PullRequest
0 голосов
/ 13 мая 2018

Что я хочу сделать:

... написать (1) сервер / (N) клиентов (network-game-) архитектура, которая использует UDP-сокеты в качестве базовой базы для связи.

  • Сообщения отправляются как Vec<u8>, закодированные с помощью bincode (ящик)

  • Я также хочу иметь возможность периодически отправлять дейтаграммы, которые могут превышать типичные максимальные MTU из ~ 1500 bytes и быть правильно собранные на стороне получателя, включая отправку сообщений ack и т. Д. (я полагаю, мне придется самому это реализовать, верно?)

Для UdpSocket Я думал об использовании реализации tokio и, возможно, framed. Я не уверен, является ли это хорошим выбором, хотя, по-видимому, это приведет к ненужному этапу сопоставления Vec<u8> (сериализуется bincode) с Vec<u8> (необходимо UdpCodec из tokio) ( ?)

Рассмотрим этот минимальный пример кода:

Cargo.toml (сервер)

bincode = "1.0"
futures = "0.1"
tokio-core = "^0.1"

(Serde и serde-derive используются в shared ящике, где определен протокол!)

(я хочу заменить tokio-core на tokio как можно скорее)

fn main() -> () {
    let addr = format!("127.0.0.1:{port}", port = 8080);
    let addr = addr.parse::<SocketAddr>().expect(&format!("Couldn't create valid SocketAddress out of {}", addr));

    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = UdpSocket::bind(&addr, &handle).expect(&format!("Couldn't bind socket to address {}", addr));


    let udp_future = socket.framed(MyCodec {}).for_each(|(addr, data)| {
        socket.send_to(&data, &addr); // Just echo back the data
        Ok(())
    });

    core.run(udp_future).unwrap();
}

struct MyCodec;

impl UdpCodec for MyCodec {
    type In = (SocketAddr, Vec<u8>);
    type Out = (SocketAddr, Vec<u8>);

    fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
        Ok((*src, buf.to_vec()))
    }

    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr {
        let (addr, mut data) = msg;
        buf.append(&mut data);
        addr
    }
}

Проблема здесь:

let udp_future = socket.framed (MyCodec {}). For_each (| (адрес, данные) | { | ------ значение перемещено сюда ^^^^^^^^^^^^^^ значение, полученное здесь после перемещения | = примечание: перемещение происходит, потому что socket имеет тип tokio_core::net::UdpSocket, который не реализует черту Copy

Ошибка имеет смысл, но я не уверен, как бы я создал такой простой эхо-сервис. На самом деле обработка сообщения включает в себя немного больше логики ofc, но ради минимального примера этого должно быть достаточно, чтобы дать приблизительное представление.

Мой обходной путь - уродливый хак: создание второго сокета.

1 Ответ

0 голосов
/ 14 мая 2018

Вот подпись UdpSocket::framed из документации Токио:

pub fn framed<C: UdpCodec>(self, codec: C) -> UdpFramed<C>

Обратите внимание, что требуется self, а не &self; то есть вызов этой функции потребляет сокет. Оболочка UdpFramed владеет базовым сокетом, когда вы вызываете это. Ваша ошибка компиляции говорит вам, что вы перемещаете socket при вызове этого метода, но вы также пытаетесь заимствовать socket внутри своего замыкания (для вызова send_to).

Это, вероятно, не то, что вы хотите для реального кода. Весь смысл использования framed() состоит в том, чтобы превратить ваш сокет в нечто более высокое, чтобы вы могли отправлять элементы вашего кодека напрямую, вместо того, чтобы собирать дейтаграммы. Использование send или send_to непосредственно на сокете, вероятно, нарушит создание протокола вашего сообщения. В этом коде, где вы пытаетесь реализовать простой эхо-сервер, вам вообще не нужно использовать framed. Но если вы хотите получить свой торт и съесть его и использовать framed и send_to, к счастью UdpFramed по-прежнему позволяет заимствовать базовый UdpSocket, используя get_ref. Вы можете решить вашу проблему следующим образом:

let framed = {
    let socket = UdpSocket::bind(&addr, &handle).expect(&format!("Couldn't bind socket to address {}", addr));
    socket.framed(MyCodec {})
}

let udp_future = framed.for_each(|(addr, data)| {
    info!(self.logger, "Udp packet received from {}: length: {}", addr, data.len());
    framed.get_ref().send_to(&data, &addr); // Just echo back the data

    Ok(())
});

Я не проверял этот код, поскольку (как правильно заметил Шепмастер) у вашего фрагмента кода есть другие проблемы, но он все равно должен дать вам представление. Я повторю свое предупреждение от ранее: если вы сделаете это в реальном коде, это нарушит используемый вами сетевой протокол. В документации get_ref это выглядит так:

Обратите внимание, что следует позаботиться о том, чтобы не вмешиваться в основной поток поступающих данных, так как это может повредить поток кадров, с которыми иначе будет работать.


Чтобы ответить на новую часть вашего вопроса: да, вам нужно самостоятельно выполнить сборку, а это значит, что вашему кодеку действительно нужно выполнить кадрирование байтов, которые вы отправляете. Как правило, это может включать последовательность запуска , которая не может происходить в Vec<u8>. Начальная последовательность позволяет вам распознать начало следующего сообщения после потери пакета (что часто случается с UDP). Если в Vec<u8> нет последовательности байтов, которая не может появиться, вам нужно избегать ее, когда это произойдет. Затем вы можете отправить длину сообщения, а затем сами данные; или просто данные, сопровождаемые конечной последовательностью и контрольной суммой, чтобы вы знали, что ни одна не была потеряна. У этих дизайнов есть свои плюсы и минусы, и это большая тема сама по себе.

Вам также нужно, чтобы ваш UdpCodec содержал данные: карту от SocketAddr до частично пересобранного сообщения, которое в данный момент выполняется. В decode, если вам дано начало сообщения, скопируйте его на карту и верните Ok. Если вам дана середина сообщения, и у вас уже есть начало сообщения на карте (для этого SocketAddr), добавьте буфер в существующий буфер и верните Ok. Когда вы доберетесь до конца сообщения, верните все это и очистите буфер. Методы UdpCodec принимают &mut self, чтобы включить этот вариант использования. ( NB Теоретически вам также следует иметь дело с пакетами, поступающими не по порядку, но на самом деле это довольно редко в реальном мире.)

encode намного проще: вам просто нужно добавить ту же рамку и скопировать сообщение в буфер.

Позвольте мне повторить здесь, что вам не нужно и не следует использовать базовый сокет после вызова framed() для него. UdpFramed является как источником, так и приемником, поэтому этот объект также используется для отправки ответов. Вы даже можете использовать split(), чтобы извлечь из него отдельные реализации Stream и Sink, если это облегчит владение вашим приложением.


В целом, теперь я видел, с какой проблемой вы боретесь, я бы рекомендовал использовать несколько сокетов TCP вместо UDP. Если вам нужен надежный протокол с установлением соединения, TCP уже существует и делает это за вас. Очень просто потратить много времени на создание «надежного» слоя поверх UDP, который одновременно медленнее и менее надежен, чем TCP.

...