Отправить каждому фьючерсу :: sync :: mpsc :: Sender в массиве - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть динамическая коллекция futures::sync::mpsc::Sender, и я хотел бы отправить сообщение каждому из них для каждого входящего соединения.

У меня оно работает с UnboundedSender, потому что я могу просто сделатьэто (см. ниже), но Sender потребляет себя, поэтому мне нужно удалить и заново вставить его в Vec после отправки.Как я могу это сделать?Если Sender блокируется, он не должен отправлять больше сообщений, а вместо этого переключиться на обработку входящих соединений на приемнике.

Ниже описана реализация UnboundedSender, моя неудачная попытка сделать это в противном случае закомментированавстроенный (просто замените предыдущую строку закомментированной).

UnboundedSender (работает)

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;

fn main() {
    let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    let mut senders = Vec::<mpsc::UnboundedSender<i8>>::new();
    let stream = stream::iter_ok::<_, ()>(values)
        .for_each(|v| {
            match v {
                0 => {
                    println!("Adding channel");
                    let (sender, receiver) = mpsc::unbounded();
                    senders.push(sender);
                    current_thread::spawn(receiver.for_each(|v| {
                        println!("Received {}", v);
                        Ok(())
                    }))

                },
                -1 => {
                    println!("Closing channels");
                    senders.clear();
                },
                x => {
                    for sender in senders.iter() {
                        println!("Sending {}", x);
                        sender.unbounded_send(x).unwrap();
                    }
                },
            }
            Ok(())
        });

    current_thread::block_on_all(stream)
        .expect("Failed to run stream");
    println!("Done!");
}

Отправитель (не работает)

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;

fn main() {
    let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    let mut senders = Vec::<mpsc::Sender<i8>>::new();
    let stream = stream::iter_ok::<_, ()>(values)
        .for_each(|v| {
            match v {
                0 => {
                    println!("Adding channel");
                    let (sender, receiver) = mpsc::channel(1);
                    senders.push(sender);
                    current_thread::spawn(receiver.for_each(|v| {
                        println!("Received {}", v);
                        Ok(())
                    }))

                },
                -1 => {
                    println!("Closing channels");
                    senders.clear();
                },
                x => {
                    for sender in senders.iter() {
                        println!("Sending {}", x);
                        sender.send(x);
                        //^error[E0507]: cannot move out of borrowed content
                    }
                },
            }
            Ok(())
        });

    current_thread::block_on_all(stream)
        .expect("Failed to run stream");
    println!("Done!");
}

Ответы [ 2 ]

0 голосов
/ 10 декабря 2018

Я думаю, что я решил это - хитрость в том, чтобы передать senders и продолжать передавать его по цепочке фьючерсов.Это не обрабатывает -1 для очистки отправителей, но расширение является простым.

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink, Future, IntoFuture};
use futures::sync::mpsc;
use futures::future::Either;

fn main() {
    let values = vec![0, 1, 0, 2, 3];
    let stream = stream::iter_ok::<Vec<i8>, mpsc::SendError<i8>>(values)
        .fold(Vec::new(), |mut senders, v| {
            match v {
                0 => {
                    println!("Adding channel");
                    let (sender, receiver) = mpsc::channel(0);
                    senders.push(sender);
                    let idx = senders.len();
                    current_thread::spawn(receiver.for_each(move |v| {
                        println!("Received {} in channel {}", v, idx);
                        Ok(())
                    }));
                    Either::A(Ok(senders).into_future())
                },
                value => {
                    println!("Sending {}...", value);
                    Either::B(stream::iter_ok(senders).and_then(move |tx| {
                        tx.send(value)
                    }).collect().map(move |senders| {
                        println!("Sent {}.", value);
                        senders
                    }))
                },
            }
        }).map(drop);

    current_thread::block_on_all(stream)
        .expect("Failed to run stream");
    println!("Done!");
}

Это выводит:

Adding channel
Sending 1...
Received 1 in channel 1
Sent 1.
Adding channel
Sending 2...
Received 2 in channel 1
Received 2 in channel 2
Sent 2.
Sending 3...
Received 3 in channel 1
Received 3 in channel 2
Sent 3.
Done!
0 голосов
/ 30 ноября 2018

AFAIK, у вас есть две основные проблемы: send() берут на себя владение Sender, поэтому вам нужно куда-то клонировать, если вы хотите использовать его позже, а также оно возвращает будущее, которое вы должны каким-то образом обработать.

Существуют различные способы решения этих проблем, вот один:

extern crate futures;
extern crate tokio;

use futures::sync::mpsc;
use futures::Future;
use futures::{stream, Sink, Stream};

fn main() {
    let values = vec![1i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1]; // remove cast syntax
    let mut senders = vec![]; // remove annotations
    let stream = stream::iter_ok(values).for_each(move |v| { // move senders
        match v {
            0 => {
                println!("Adding channel");
                let (sender, receiver) = mpsc::channel(1);
                senders.push(sender);
                tokio::spawn(receiver.for_each(|v| {
                    println!("Received {}", v);
                    Ok(())
                }));
            }
            -1 => {
                println!("Closing channels");
                senders.clear();
            }
            x => {
                for sender in senders.iter() {
                    let send = sender
                        .clone() // clone sender
                        .send(x)
                        .map(move |_| println!("Sending {}", x))
                        .map_err(|e| eprintln!("error = {:?}", e));
                    tokio::spawn(send); // spawn the task
                }
            }
        }
        Ok(())
    });

    tokio::run(stream);
    println!("Done!");
}
...