У меня есть будущее Токио, которое никогда не завершается (rx
- это Receiver
, а sock
- это Токио UdpSocket
).Он в основном считывает пакеты из очереди пакетов и передает их через сокет:
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
Он выполняется до строки poll_send_to
(println!
непосредственно перед выполнением poll_send_to
, println!
сразу посленет), а затем ждет бесконечно, не посылая пакет.
Я заменил вышеприведенное будущее на следующее, чтобы убедиться, что это не проблема с сокетом (у меня были некоторые проблемы с тем, что, как мне кажется, были нестабильными уведомлениями ранее):
poll_fn(move || {
let packet = vec![0;10];
let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e))
Это будущее сработало отлично - оно отправило пакет, как ожидалось, и вышло из программы.
Не думаю, что проблема в каналах сообщений, учитывая, что rx
может poll
успешно и печатает сообщение println
.Я не думаю, что проблема с сокетом либо, учитывая, что второе будущее работает.Я наблюдаю за пакетами напрямую через Wireshark, поэтому я не думаю, что это проблема с моими наблюдениями.
Я довольно новичок в Rust и Tokio, поэтому возможно, что я упускаю из виду какой-то основной факт (например, не может try_ready
дважды в одном и том же будущем, будущее не возобновляется с того места, где оно остановилось ранее, и т. д.).
Можете ли вы помочь мне выяснить проблему с первым будущим?
use futures::future::lazy;
use futures::stream::Stream;
use futures::try_ready;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio;
use tokio::net::UdpSocket;
use tokio::prelude::future::poll_fn;
use tokio::prelude::Future;
fn main() {
let mut sock = UdpSocket::bind(&SocketAddr::from_str("127.0.0.1:8000").expect("Parse error"))
.expect("Bind error");
let (mut tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, SocketAddr)>(2000);
tokio::run(lazy(move || {
//----------------- This future works ----------------//
// tokio::spawn(
// poll_fn(move || {
// let packet = vec![70; 10];
// let to = SocketAddr::from_str("127.0.0.1:8001").expect("Parse error");
// try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// Ok(futures::Async::Ready(()))
// })
// .map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
// );
//----------------- This future doesn't ----------------//
tokio::spawn(
poll_fn(move || {
match try_ready!(rx
.poll()
.map_err(|_e| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Poll error")))
{
Some((packet, to)) => {
// This is printed
println!(
"Rx: Received {} bytes for {}: {:?}",
packet.len(),
to,
packet.as_slice(),
);
try_ready!(sock.poll_send_to(packet.as_slice(), &to));
// This is never printed
println!("Sent");
}
None => println!("Rx end"),
}
Ok(futures::Async::Ready(()))
})
.map_err(|e: tokio::io::Error| println!("Error: {:?}", e)),
);
//----------------- This future queues a packet ----------------//
tokio::spawn(
poll_fn(move || {
try_ready!(tx.poll_ready());
tx.try_send((
vec![70; 10],
SocketAddr::from_str("127.0.0.1:8001").expect("Parse error"),
))
.expect("Send error");
// Wait permanently so message channel doesn't get disconnected
// Achieved differently in production
Ok(futures::Async::NotReady)
})
.map_err(|e: tokio::sync::mpsc::error::SendError| println!("Error: {:?}", e)),
);
Ok(())
}));
}
Репо