Выберите из списка сокетов, используя фьючерсы - PullRequest
6 голосов
/ 08 июля 2019

Я пробую все еще нестабильный синтаксис ожидания асинхронности в ночной Rust 1.38 с futures-preview = "0.3.0-alpha.16" и runtime = "0.3.0-alpha.6". Это действительно круто, но документов (пока) мало, и я застрял.

Чтобы выйти за рамки базовых примеров Я хотел бы создать приложение, которое:

  1. Принимает TCP-соединения для данного порта;
  2. Передает все данные, полученные из любого соединения, на все активные соединения.

Существующие документы и примеры позволили мне зайти так далеко:

#![feature(async_await)]
#![feature(async_closure)]

use futures::{
    prelude::*,
    select,
    future::select_all,
    io::{ReadHalf, WriteHalf, Read},
};

use runtime::net::{TcpListener, TcpStream};

use std::io;

async fn read_stream(mut reader: ReadHalf<TcpStream>) -> (ReadHalf<TcpStream>, io::Result<Box<[u8]>>) {
    let mut buffer: Vec<u8> = vec![0; 1024];
    match reader.read(&mut buffer).await {
        Ok(len) => {
            buffer.truncate(len);
            (reader, Ok(buffer.into_boxed_slice()))
        },
        Err(err) => (reader, Err(err)),
    }
}

#[runtime::main]
async fn main() -> std::io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080")?;
    println!("Listening on {}", listener.local_addr()?);

    let mut incoming = listener.incoming().fuse();
    let mut writers: Vec<WriteHalf<TcpStream>> = vec![];
    let mut reads = vec![];

    loop {
        select! {
            maybe_stream = incoming.select_next_some() => {
                let (mut reader, writer) = maybe_stream?.split();
                writers.push(writer);
                reads.push(read_stream(reader).fuse());
            },
            maybe_read = select_all(reads.iter()) => {
                match maybe_read {
                    (reader, Ok(data)) => {
                        for writer in writers {
                            writer.write_all(data).await.ok(); // Ignore errors here
                        }
                        reads.push(read_stream(reader).fuse());
                    },
                    (reader, Err(err)) => {
                        let reader_addr = reader.peer_addr().unwrap();
                        writers.retain(|writer| writer.peer_addr().unwrap() != reader_addr);
                    },
                }
            }
        }
    }
}

Это не с:

error: recursion limit reached while expanding the macro `$crate::dispatch`
  --> src/main.rs:36:9
   |
36 | /         select! {
37 | |             maybe_stream = incoming.select_next_some() => {
38 | |                 let (mut reader, writer) = maybe_stream?.split();
39 | |                 writers.push(writer);
...  |
55 | |             }
56 | |         }
   | |_________^
   |
   = help: consider adding a `#![recursion_limit="128"]` attribute to your crate
   = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)

Это очень запутанно. Может я неправильно использую select_all()? Любая помощь в его создании приветствуется!

Для полноты моей Cargo.toml:

[package]
name = "async-test"
version = "0.1.0"
authors = ["xxx"]
edition = "2018"

[dependencies]
runtime = "0.3.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }

1 Ответ

2 голосов
/ 08 июля 2019

На случай, если кто-то подписался, я наконец-то взломал это.Этот код работает:

#![feature(async_await)]
#![feature(async_closure)]
#![recursion_limit="128"]

use futures::{
    prelude::*,
    select,
    stream,
    io::ReadHalf,
    channel::{
        oneshot,
        mpsc::{unbounded, UnboundedSender},
    }
};

use runtime::net::{TcpListener, TcpStream};

use std::{
    io,
    net::SocketAddr,
    collections::HashMap,
};

async fn read_stream(
    addr: SocketAddr,
    drop: oneshot::Receiver<()>,
    mut reader: ReadHalf<TcpStream>,
    sender: UnboundedSender<(SocketAddr, io::Result<Box<[u8]>>)>
) {
    let mut drop = drop.fuse();
    loop {
        let mut buffer: Vec<u8> = vec![0; 1024];
        select! {
            result = reader.read(&mut buffer).fuse() => {
                match result {
                    Ok(len) => {
                        buffer.truncate(len);
                        sender.unbounded_send((addr, Ok(buffer.into_boxed_slice())))
                            .expect("Channel error");
                        if len == 0 {
                            return;
                        }
                    },
                    Err(err) => {
                        sender.unbounded_send((addr, Err(err))).expect("Channel error");
                        return;
                    }
                }
            },
            _ = drop => {
                return;
            },
        }
    }
}

enum Event {
    Connection(io::Result<TcpStream>),
    Message(SocketAddr, io::Result<Box<[u8]>>),
}

#[runtime::main]
async fn main() -> std::io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080")?;
    eprintln!("Listening on {}", listener.local_addr()?);

    let mut writers = HashMap::new();
    let (sender, receiver) = unbounded();

    let connections = listener.incoming().map(|maybe_stream| Event::Connection(maybe_stream));
    let messages = receiver.map(|(addr, maybe_message)| Event::Message(addr, maybe_message));
    let mut events = stream::select(connections, messages);

    loop {
        match events.next().await {
            Some(Event::Connection(Ok(stream))) => {
                let addr = stream.peer_addr().unwrap();
                eprintln!("New connection from {}", addr);

                let (reader, writer) = stream.split();
                let (drop_sender, drop_receiver) = oneshot::channel();

                writers.insert(addr, (writer, drop_sender));
                runtime::spawn(read_stream(addr, drop_receiver, reader, sender.clone()));
            },
            Some(Event::Message(addr, Ok(message))) => {
                if message.len() == 0 {
                    eprintln!("Connection closed by client: {}", addr);
                    writers.remove(&addr);
                    continue;
                } 
                eprintln!("Received {} bytes from {}", message.len(), addr);
                if &*message == b"quit\n" {
                    eprintln!("Dropping client {}", addr);
                    writers.remove(&addr);
                    continue;
                }
                for (&other_addr, (writer, _)) in &mut writers {
                    if addr != other_addr {
                        writer.write_all(&message).await.ok(); // Ignore errors
                    }
                }
            },
            Some(Event::Message(addr, Err(err))) => {
                eprintln!("Error reading from {}: {}", addr, err);
                writers.remove(&addr);
            },
            _ => panic!("Event error"),
        }
    }
}

Я использую channel и запускаю задачу чтения для каждого клиента.Особое внимание нужно было уделить тому, чтобы читатели были довольны писателями: вот почему используется oneshot будущее.Когда oneshot::Sender отбрасывается, будущее oneshot::Receiver преобразуется в отмененное состояние, что является механизмом уведомления для задачи чтения, которая знает, что пора остановиться.Чтобы продемонстрировать, что это работает, мы отбрасываем клиента, как только получаем сообщение «quit».

К сожалению, есть (на первый взгляд бесполезное) предупреждение о неиспользованном JoinHandle из вызова runtime::spawn, иЯ действительно не знаю, как это устранить.

...