У меня есть динамическая коллекция futures::sync::mpsc::Sender
, и я хотел бы отправить сообщение каждому из них для каждого входящего соединения.
У меня оно работает с UnboundedSender
, потому что я могу просто сделатьэто (см. ниже), но Sender
потребляет себя, поэтому мне нужно удалить и заново вставить его в Vec
после отправки.Как я могу это сделать?Если Sender
блокируется, он не должен отправлять больше сообщений, а вместо этого переключиться на обработку входящих соединений на приемнике.
Ниже описана реализация UnboundedSender
, моя неудачная попытка сделать это в противном случае закомментированавстроенный (просто замените предыдущую строку закомментированной).
UnboundedSender (работает)
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::UnboundedSender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::unbounded();
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.unbounded_send(x).unwrap();
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
Отправитель (не работает)
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::Sender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.send(x);
//^error[E0507]: cannot move out of borrowed content
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}