Я пытаюсь понять, как работает futures::sync::mpsc::Receiver
.В приведенном ниже примере поток получателя спит в течение двух секунд, а отправитель отправляет каждую секунду.
Я ожидаю, что отправитель должен быть заблокирован из-за ожидания и отправки после освобождения буфера.
Вместо этого я вижу, что через некоторое время он заблокирован.Увеличение буфера канала только увеличивает время, пока он не будет заблокирован.
Что я должен сделать, чтобы отправитель отправлял данные, когда буфер был доступен, и оказывал некоторое противодавление на отправителя в таких случаях?futures::sync::mpsc::channel
имеет собственную документацию, но я не понимаю, как правильно ее использовать.
extern crate futures;
extern crate tokio_core;
use std::{thread, time};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats {
pub success: usize,
pub failure: usize,
}
fn main() {
let mut core = Core::new().expect("Failed to create core");
let remote = core.remote();
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || loop {
let tx = tx.clone();
let delay = time::Duration::from_secs(1);
thread::sleep(delay);
let f = ::futures::done::<(), ()>(Ok(()));
remote.spawn(|_| {
f.then(|res| {
println!("Sending");
tx.send(res).wait();
println!("Sent");
Ok(())
})
});
});
let mut stats = Stats {
success: 0,
failure: 0,
};
let f2 = rx.for_each(|res| {
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res {
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
}
println!("stats = {:?}", stats);
Ok(())
});
core.run(f2).expect("Core failed to run");
}