Как говорится в сообщении об ошибке:
разрешение несоответствия типов <ForEach<Receiver<Ping>, Result<(), futures::Never>, [closure]>
как <futures::Future>::Item == ()
spawn_local
требует, чтобы переданное ему будущее имело связанный с тип Item
тип единицы / пустой кортеж / ()
. Ваше будущее не вернет этот тип.
должно занять ()
будущее, которое я передаю ему
Я не знаю, почему вы считаете, что это правда. for_each
реализует Future
путем , возвращая поток как Item
.
Это можно исправить с помощью map
для сброса потока:
executor.spawn_local(rx_1.for_each(move |Ping(size)| {
// ...
}).map(|_| ()));
Это не позволяет вашему коду компилироваться, но исправляет вашу ошибку.
Заставить код работать с большими изменениями. Вот одна возможность, представленная без комментариев, поскольку она не имеет ничего общего с вашим вопросом:
extern crate futures;
use futures::{
channel::mpsc::{self, Receiver, Sender},
executor::LocalPool,
prelude::*,
};
struct Ping(usize);
fn pinger(
rx: Receiver<Ping>,
tx: Sender<Ping>,
id: &'static str,
) -> impl Future<Item = (), Error = Never> {
rx.map_err(Never::never_into::<Box<std::error::Error>>)
.fold(tx, move |tx, Ping(size)| {
println!("{}: {}", id, size);
if size >= 10 {
println!("{}: Done", id);
tx.close().err_into().right_future()
} else {
tx.send(Ping(size + 1)).err_into().left_future()
}
})
.map(|_| ())
.map_err(move |e| panic!("Task {} failed: {}", id, e))
}
fn main() {
let mut pool = LocalPool::new();
let mut executor = pool.executor();
let (tx_1, rx_1) = mpsc::channel(1);
let (tx_2, rx_2) = mpsc::channel(1);
let tx_ignite = tx_1.clone();
executor.spawn_local(pinger(rx_1, tx_2, "Rx 1")).unwrap();
executor.spawn_local(pinger(rx_2, tx_1, "Rx 2")).unwrap();
executor
.spawn_local({
tx_ignite
.send(Ping(0))
.map(drop)
.map_err(|e| panic!("{:?}", e))
})
.unwrap();
pool.run(&mut executor);
}