Я провожу некоторые сетевые эксперименты, чтобы измерить, как быстро Rust может отправлять TCP-пакеты по сети.
Для этого я генерирую вектор из 100 случайных пакетов с произвольным размером, а затем зацикливаюсь на этом массиве., отправка пакета и чтение его обратно.
Для нескольких тысяч пакетов это работает нормально, однако программа блокирует, если количество отправляемых пакетов составляет 50 КБ или более.
Сначала я подумалэто может быть некоторое ограничение в ядре / стеке TCP, и я написал эквивалентную (я думаю) Java-программу.Удивительно, но Java работает нормально (он заканчивается независимо от целевого числа пакетов), но Rust застревает.
Я наблюдал это в macOS 10.14.2 с Java 9.0.4 и Linux с OpenJDK 9.
Эти программы эквивалентны?Если нет, то в чем принципиальная разница?
Если они эквивалентны, чем объясняется поведение Rust?
Rust
extern crate argparse;
extern crate rand;
use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Instant;
use argparse::{ArgumentParser, Store, StoreTrue};
use rand::Rng;
fn start_server(port: u16) {
let addr = format!("{}:{}", "0.0.0.0", port);
let listener = TcpListener::bind(addr).expect("Could not bind");
for stream in listener.incoming() {
match stream {
Err(e) => eprintln!("failed: {}", e),
Ok(stream) => {
handle_client(stream);
}
}
}
}
fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
println!("Incoming connection from: {}", stream.peer_addr()?);
let mut buf = [0; 512];
loop {
let bytes_read = stream.read(&mut buf)?;
if bytes_read == 0 {
return Ok(());
}
stream.write(&buf[..bytes_read])?;
}
}
fn start_client(port: u16, packets_to_send: u32, packet_size: u32) {
let addr = format!("{}:{}", "127.0.0.1", port);
let mut stream = TcpStream::connect(addr).expect("Could not connect to server");
let nb_packets = 100;
let start = Instant::now();
let random_packets = gen_random_packets(nb_packets, packet_size);
let stop = Instant::now().duration_since(start);
println!("Packets generated, took {:?}", stop);
let mut packets_sent = 1;
let start = Instant::now();
loop {
let mut buffer = [0 as u8; 32];
let index = packets_to_send % nb_packets as u32;
//println!("sent: {} missing: {} mod:{}", packets_sent, packets_to_send, index);
let to_send = &random_packets[index as usize];
stream.write(to_send).expect("Failed to write to server");
stream
.read(&mut buffer)
.expect("Failed to read from server.");
packets_sent += 1;
if packets_sent > packets_to_send {
break;
}
if (packets_sent % 1000) == 0 {
println!("Packets sent {} / total {}", packets_sent, packets_to_send);
}
}
let stop = Instant::now().duration_since(start);
println!("Wrote {} packets in {:?}", packets_sent, stop)
}
fn gen_random_packets(nb: u16, size: u32) -> Vec<Vec<u8>> {
let mut random_bytes = Vec::with_capacity(nb as usize);
for _ in 0..nb {
random_bytes.push((0..size).map(|_| rand::random::<u8>()).collect());
}
random_bytes
}
fn main() {
// default configuration
let mut port = 8888;
let mut server = false;
let mut packets_to_send = 100;
let mut packet_size = 256;
//read config parameters from stdin, if any
{
let mut ap = ArgumentParser::new();
ap.set_description("Rust network.");
ap.refer(&mut server).add_option(
&["--server"],
StoreTrue,
"Whether to spawn a server or a client, defaults to client.",
);
ap.refer(&mut port)
.add_option(&["--port"], Store, "Listening port");
ap.refer(&mut packets_to_send).add_option(
&["--packets"],
Store,
"Number of packets to send",
);
ap.refer(&mut packet_size)
.add_option(&["--packet-size"], Store, "Packet size");
ap.parse_args_or_exit();
}
if server {
start_server(port);
} else {
println!(
"Starting client, connecting to port {}. Sending {} packets.",
port, packets_to_send
);
start_client(port, packets_to_send, packet_size);
}
}
Суть
Java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Random;
public class Network {
public static void main(String[] args) throws IOException {
System.out.println(Arrays.toString(args));
int port = Integer.valueOf(args[0]);
boolean server =Boolean.valueOf(args[1]);
int packets_to_send = Integer.valueOf(args[2]);
if(server){
start_server(port);
} else {
start_client(port,packets_to_send);
}
}
private static void start_server(int port) throws IOException {
System.out.println("Starting server");
ServerSocket ss = new ServerSocket(port);
while (true) {
System.out.println("Listening");
Socket sock = ss.accept();
try{
OutputStream os = sock.getOutputStream();
InputStream is = sock.getInputStream();
byte[] buffer = new byte[512];
while (true) {
is.read(buffer);
os.write(buffer);
}
} catch (Exception e){
System.out.println("Exception e"+e.getMessage());
}
}
}
private static void start_client(int port, int packets_to_send) throws IOException {
System.out.println("Starting client");
Socket sock = new Socket("localhost",port);
OutputStream os = sock.getOutputStream();
InputStream is = sock.getInputStream();
Random rand = new Random();
byte[][] packets = new byte[100][256];
long start = System.currentTimeMillis();
for(int i = 0; i < packets.length; i++) {
rand.nextBytes(packets[i]);
}
System.out.println(String.format("Packets generated: %d", (System.currentTimeMillis() - start)));
int packets_sent=1;
byte[] buff = new byte[256];
start = System.currentTimeMillis();
while(packets_sent < packets_to_send) {
os.write(packets[ packets_to_send % packets.length]);
is.read(buff);
packets_sent++;
}
System.out.println(String.format("Packets sent: %d Time: %d", packets_sent, System.currentTimeMillis() - start));
System.out.println("Done!");
}
}
Суть