Различное поведение сети в Rust и Java при максимально быстрой отправке пакетов - PullRequest
0 голосов
/ 01 февраля 2019

Я провожу некоторые сетевые эксперименты, чтобы измерить, как быстро Rust может отправлять TCP-пакеты по сети.

Для этого я генерирую вектор из 100 случайных пакетов с произвольным размером, а затем зацикливаюсь на этом массиве., отправка пакета и чтение его обратно.

Для нескольких тысяч пакетов это работает нормально, однако программа блокирует, если количество отправляемых пакетов составляет 50 КБ или более.

Сначала я подумалэто может быть некоторое ограничение в ядре / стеке TCP, и я написал эквивалентную (я думаю) Java-программу.Удивительно, но Java работает нормально (он заканчивается независимо от целевого числа пакетов), но Rust застревает.

Я наблюдал это в macOS 10.14.2 с Java 9.0.4 и Linux с OpenJDK 9.

  • Эти программы эквивалентны?Если нет, то в чем принципиальная разница?

  • Если они эквивалентны, чем объясняется поведение Rust?

Rust

extern crate argparse;
extern crate rand;

use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Instant;

use argparse::{ArgumentParser, Store, StoreTrue};
use rand::Rng;

fn start_server(port: u16) {
    let addr = format!("{}:{}", "0.0.0.0", port);
    let listener = TcpListener::bind(addr).expect("Could not bind");
    for stream in listener.incoming() {
        match stream {
            Err(e) => eprintln!("failed: {}", e),
            Ok(stream) => {
                handle_client(stream);
            }
        }
    }
}

fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
    println!("Incoming connection from: {}", stream.peer_addr()?);
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {
            return Ok(());
        }
        stream.write(&buf[..bytes_read])?;
    }
}

fn start_client(port: u16, packets_to_send: u32, packet_size: u32) {
    let addr = format!("{}:{}", "127.0.0.1", port);
    let mut stream = TcpStream::connect(addr).expect("Could not connect to server");

    let nb_packets = 100;
    let start = Instant::now();
    let random_packets = gen_random_packets(nb_packets, packet_size);
    let stop = Instant::now().duration_since(start);
    println!("Packets generated, took {:?}", stop);

    let mut packets_sent = 1;
    let start = Instant::now();
    loop {
        let mut buffer = [0 as u8; 32];

        let index = packets_to_send % nb_packets as u32;
        //println!("sent: {} missing: {} mod:{}", packets_sent, packets_to_send, index);

        let to_send = &random_packets[index as usize];
        stream.write(to_send).expect("Failed to write to server");

        stream
            .read(&mut buffer)
            .expect("Failed to read from server.");

        packets_sent += 1;

        if packets_sent > packets_to_send {
            break;
        }

        if (packets_sent % 1000) == 0 {
            println!("Packets sent {} / total {}", packets_sent, packets_to_send);
        }
    }

    let stop = Instant::now().duration_since(start);

    println!("Wrote {} packets in {:?}", packets_sent, stop)
}

fn gen_random_packets(nb: u16, size: u32) -> Vec<Vec<u8>> {
    let mut random_bytes = Vec::with_capacity(nb as usize);

    for _ in 0..nb {
        random_bytes.push((0..size).map(|_| rand::random::<u8>()).collect());
    }

    random_bytes
}

fn main() {
    // default configuration
    let mut port = 8888;
    let mut server = false;
    let mut packets_to_send = 100;
    let mut packet_size = 256;
    //read config parameters from stdin, if any
    {
        let mut ap = ArgumentParser::new();
        ap.set_description("Rust network.");
        ap.refer(&mut server).add_option(
            &["--server"],
            StoreTrue,
            "Whether to spawn a server or a client, defaults to client.",
        );
        ap.refer(&mut port)
            .add_option(&["--port"], Store, "Listening port");
        ap.refer(&mut packets_to_send).add_option(
            &["--packets"],
            Store,
            "Number of packets to send",
        );
        ap.refer(&mut packet_size)
            .add_option(&["--packet-size"], Store, "Packet size");
        ap.parse_args_or_exit();
    }

    if server {
        start_server(port);
    } else {
        println!(
            "Starting client, connecting to port {}. Sending {} packets.",
            port, packets_to_send
        );
        start_client(port, packets_to_send, packet_size);
    }
}

Суть

Java

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Random;

public class Network {


    public static  void main(String[] args) throws IOException {

        System.out.println(Arrays.toString(args));
        int port = Integer.valueOf(args[0]);
        boolean server =Boolean.valueOf(args[1]);
        int packets_to_send = Integer.valueOf(args[2]);


        if(server){
        start_server(port);
        } else  {

            start_client(port,packets_to_send);

        }
    }

    private static void start_server(int port) throws IOException {

        System.out.println("Starting server");
        ServerSocket ss = new ServerSocket(port);

        while (true) {
            System.out.println("Listening");
            Socket sock = ss.accept();

            try{

            OutputStream os = sock.getOutputStream();
            InputStream is = sock.getInputStream();


            byte[] buffer = new byte[512];
            while (true) {
                is.read(buffer);
                os.write(buffer);
            }
            } catch (Exception e){
                System.out.println("Exception e"+e.getMessage());
            }
        }
    }

    private static void start_client(int port, int packets_to_send) throws IOException {

        System.out.println("Starting client");
        Socket sock = new Socket("localhost",port);

        OutputStream os = sock.getOutputStream();
        InputStream is = sock.getInputStream();

        Random rand = new Random();
        byte[][] packets = new byte[100][256];

        long start = System.currentTimeMillis();
        for(int i = 0; i < packets.length; i++) {
            rand.nextBytes(packets[i]);
        }

        System.out.println(String.format("Packets generated: %d", (System.currentTimeMillis() - start)));

        int packets_sent=1;

        byte[] buff = new byte[256];


        start = System.currentTimeMillis();
        while(packets_sent < packets_to_send) {

            os.write(packets[ packets_to_send % packets.length]);

            is.read(buff);
            packets_sent++;
        }

        System.out.println(String.format("Packets sent: %d Time: %d", packets_sent, System.currentTimeMillis() - start));

        System.out.println("Done!");

    }
}

Суть

1 Ответ

0 голосов
/ 01 февраля 2019

Вы пишете 256 октетов:

let mut packet_size = 256;

, но вы читаете только 32 в вашем клиенте:

let mut buffer = [0 as u8; 32];

Таким образом, буфер записи на сервере заполнен, потому что клиент не делаетпрочитайте все доступные данные, поэтому сервер заблокирует свой вызов записи, ожидая, что клиент прочитает пакет, и клиент также заблокирует вызов записи, потому что сервер прекратил чтение сокета.

Вы можетеисправьте это, увеличив размер буфера чтения:

let mut buffer = [0u8; 512];

Или лучше, если вы всегда читаете сокет, когда есть доступные данные, это должна делать любая нетривиальная программа.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...