Как запустить фьючерс, содержащий заимствованный TcpStream одновременно? - PullRequest
1 голос
/ 02 февраля 2020

Я пытаюсь заставить этот фрагмент кода выполняться одновременно, а не последовательно, поскольку число пиров может быть большим значением. Я использую async_std 1.4 и rust 1.41

pub struct Peer {
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],
}

async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) -> Result<()> {
    for peer in peers.values() {
        let mut stream = &*peer.tcp_stream;
        stream.write_all(&bincode::serialize(&message)?).await?;
    }
    Ok(())
}

Я безуспешно пытался использовать метод futures::future::join_all, поскольку для переноса будущего, которое я создал и использовал в async_std::task::spawn, требуется состояние c продолжительность жизни. Вот что я попробовал:

async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) {
    let handles = peers.values().into_iter().map(|peer| {
        task::spawn(
            async {
                let mut stream = &*peer.tcp_stream;
                if let Err(err) = stream
                    .write_all(&bincode::serialize(&message).unwrap())
                    .await
                {
                    error!("Error when writing to tcp_stream: {}", err);
                }
            }
        )
    });
    futures::future::join_all(handles).await;
}

Я уверен, что есть какой-то метод, который мне не хватает, спасибо за любую помощь!

Ответы [ 2 ]

1 голос
/ 02 февраля 2020

Поскольку вы пытаетесь отправить сообщение одновременно, каждая задача должна иметь свою собственную копию сообщения:

use async_std::{task, net::TcpStream};
use futures::{future, io::AsyncWriteExt};
use serde::Serialize;
use std::{
    collections::HashMap,
    error::Error,
    sync::Arc,
};

pub struct Peer {
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],
}

#[derive(Serialize)]
struct Protocol;

async fn send_to_all_peers(
    message: Protocol,
    peers: &HashMap<String, Peer>)
    -> Result<(), Box<dyn Error>>
{
    let msg = bincode::serialize(&message)?;
    let handles = peers.values()
        .map(|peer| {
            let msg = msg.clone();
            let socket = peer.tcp_stream.clone();
            task::spawn(async move {
                let mut socket = &*socket;
                socket.write_all(&msg).await
            })
        });

    future::try_join_all(handles).await?;
    Ok(())
}
0 голосов
/ 02 февраля 2020

Вы пробовали что-то вроде

let handles = peers.values().into_iter().map(|peer| {
   let mut stream = &*peer.tcp_stream;
   stream.write_all(&bincode::serialize(&message).unwrap())
}
let results = futures::future::join_all(handles).await

?

Обратите внимание, что замыкание .map не ожидает, а прямо вверх возвращает будущее, которое затем передается в join_all, и тогда ждал.

...