Не собираюсь лгать, я с @shepmaster об этом, ваш вопрос довольно неясен.Тем не менее, чувствует , как будто вы пытаетесь сделать что-то, что mpsc
часть futures
не предназначена для выполнения.
В любом случае.Время объяснения.
Всякий раз, когда вы объединяете / компонуете потоки (или фьючерсы!), Каждый отдельный метод композиции занимает self
, а не &self
или &mut self
, как я думаю, вы могли надеяться.
В тот момент, когда вы добираетесь до этого своего блока кода:
{
let mut maybe_stream = arc.lock().unwrap();
let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"
let rx = stream.for_each(|_| Ok(()));
tokio::spawn(rx);
}
... Поток извлекается из Arc<Option<Receiver<T>>>
, когда вы take()
его, и его содержимое заменяется на None
.Затем вы создаете его на реакторе Токио, который начинает обработку этой части.Это rx
теперь в цикле и больше не доступно для вас.Кроме того, ваш maybe_stream
теперь содержит None
.
После задержки вы затем пытаетесь take()
содержимое Arc<Option<Receiver<T>>>
(строка A).Так как теперь ничего не осталось, вы остались ни с чем, и поэтому не осталось ничего, чтобы закрыть.Ваш код содержит ошибки.
Вместо того, чтобы передавать mpsc::Receiver
и надеяться уничтожить его, используйте механизм для остановки самого потока.Вы можете сделать это самостоятельно или использовать для этого ящик типа stream-cancel
.
Здесь сделана DIY-версия, измененная с вашего кода:
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};
fn main() {
tokio::run(lazy(|| {
let (tx, rx) = futures::sync::mpsc::channel(1000);
let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let c_b_copy = Arc::clone(&circuit_breaker);
tokio::spawn(
Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
.map_err(|e| eprintln!("Some delay err {:?}", e))
.and_then(move |_| {
// We set the CB to true in order to stop processing of the stream
circuit_breaker.store(true, Ordering::Relaxed);
Ok(())
}),
);
{
let rx2 = rx.for_each(|e| {
println!("{:?}", e);
Ok(())
});
tokio::spawn(rx2);
}
tokio::spawn(
Interval::new_interval(std::time::Duration::from_millis(100))
.take(100)
// take_while causes the stream to continue as long as its argument returns a future resolving to true.
// In this case, we're checking every time if the circuit-breaker we've introduced is false
.take_while(move |_| {
future::ok(
c_b_copy.load(Ordering::Relaxed) == false
);
})
.map_err(|e| {
eprintln!("Interval error?! {:?}", e);
})
.fold((tx, 0), |(tx, i), _| {
tx.send(i as u32)
.map_err(|e| eprintln!("Send error?! {:?}", e))
.map(move |tx| (tx, i + 1))
})
.map(|_| ()),
);
Ok(())
}));
}
Playground
Добавленный take_while()
позволяет вам работать либо с содержимым потока, либо с внешним предикатом, чтобы продолжить или остановить поток.Обратите внимание, что, хотя мы используем AtomicBool
, нам все еще нужен Arc
из-за 'static
требований к сроку службы Tokio.
Изменение потока
После некоторого обсуждения вкомментарии, это решение может больше подойти для вашего случая использования.Я эффективно реализовал поток разветвления, охватываемый автоматическим выключателем.Волшебство происходит здесь:
impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
match self.inner.as_mut() {
Some(ref mut r) => {
let mut breaker = self.breaker.write().expect("Poisoned lock");
match breaker.status {
false => {
let item = r.poll();
match &item {
&Ok(Async::Ready(Some(ref i))) => {
breaker.registry.iter_mut().for_each(|sender| {
sender.try_send(i.clone()).expect("Dead channel");
});
item
},
_ => item
}
},
true => Ok(Async::Ready(None))
}
}
_ => {
let mut breaker = self.breaker.write().expect("Poisoned lock");
// Stream is over, drop all the senders
breaker.registry = vec![];
Ok(Async::Ready(None))
}
}
}
}
Если индикатор состояния установлен в false, вышеуказанный поток опрашивается;Затем результат отправляется всем слушателям.Если результат poll
равен Async::Ready(None)
(указывая, что поток завершен), все каналы слушателя закрываются.
Если для индикатора состояния установлено значение true, все каналы слушателя закрываются, иПоток возвращает Async::Ready(None)
(и исключен из исполнения Токио).
Объект FanOut
является клонируемым, но только начальный экземпляр будет делать что-либо.