Как закрыть измененный и исполняющий поток `futures :: sync :: mpsc :: Receiver`? - PullRequest
0 голосов
/ 23 декабря 2018

Я хотел бы иметь возможность что-то сделать в этом направлении, чтобы асинхронно закрывать поток Receiver:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::stream::AndThen;
use futures::sync::mpsc::Receiver;
use futures::{Future, Sink, Stream};
use std::sync::{Arc, Mutex};

use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let arc = Arc::new(Mutex::<Option<AndThen<Receiver<u32>, _, _>>>::new(None));

        {
            let mut and_then = arc.lock().unwrap();
            *and_then = Some(rx.and_then(|num| {
                println!("{}", num);
                Ok(())
            }));
        }

        let arc_clone = arc.clone();
        // This is the part I'd like to be able to do
        // After one second, close the `Receiver` so that future
        // calls to the `Sender` don't call the callback above in the
        // closure passed to `rx.and_then`
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    let mut maybe_stream = arc_clone.lock().unwrap();
                    match maybe_stream.take() {
                        Some(stream) => stream.into_inner().close(),
                        None => eprintln!("Can't close non-existent stream"), // line "A"
                    }
                    Ok(())
                }),
        );

        {
            let mut maybe_stream = arc.lock().unwrap();
            let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

            let rx = stream.for_each(|_| Ok(()));
            tokio::spawn(rx);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(10))
                .take(10)
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Детская площадка

Однакострока A запускается, потому что мне нужно переместить поток в строку B, чтобы вызвать на нем .for_each.Если я не позвоню .for_each (или как-то так), я не смогу выполнить AndThen, насколько я знаю.Я не могу вызвать .for_each без фактического перемещения объекта, потому что for_each - это метод перемещения.

Могу ли я сделать то, что пытаюсь сделать?Кажется, что это определенно должно быть возможно, но, возможно, я упускаю что-то очевидное.

Я использую фьючерсы на 0,1 и Токио на 0,1.

Ответы [ 2 ]

0 голосов
/ 24 декабря 2018

Не собираюсь лгать, я с @shepmaster об этом, ваш вопрос довольно неясен.Тем не менее, чувствует , как будто вы пытаетесь сделать что-то, что mpsc часть futures не предназначена для выполнения.

В любом случае.Время объяснения.

Всякий раз, когда вы объединяете / компонуете потоки (или фьючерсы!), Каждый отдельный метод композиции занимает self, а не &self или &mut self, как я думаю, вы могли надеяться.

В тот момент, когда вы добираетесь до этого своего блока кода:

    {
        let mut maybe_stream = arc.lock().unwrap();
        let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

        let rx = stream.for_each(|_| Ok(()));
        tokio::spawn(rx);
    }

... Поток извлекается из Arc<Option<Receiver<T>>>, когда вы take() его, и его содержимое заменяется на None.Затем вы создаете его на реакторе Токио, который начинает обработку этой части.Это rx теперь в цикле и больше не доступно для вас.Кроме того, ваш maybe_stream теперь содержит None.

После задержки вы затем пытаетесь take() содержимое Arc<Option<Receiver<T>>> (строка A).Так как теперь ничего не осталось, вы остались ни с чем, и поэтому не осталось ничего, чтобы закрыть.Ваш код содержит ошибки.

Вместо того, чтобы передавать mpsc::Receiver и надеяться уничтожить его, используйте механизм для остановки самого потока.Вы можете сделать это самостоятельно или использовать для этого ящик типа stream-cancel.

Здесь сделана DIY-версия, измененная с вашего кода:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
        let c_b_copy = Arc::clone(&circuit_breaker);
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    // We set the CB to true in order to stop processing of the stream
                    circuit_breaker.store(true, Ordering::Relaxed);
                    Ok(())
                }),
        );

        {
            let rx2 = rx.for_each(|e| {
                println!("{:?}", e);
                Ok(())
            });
            tokio::spawn(rx2);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(100))
                .take(100)
                // take_while causes the stream to continue as long as its argument returns a future resolving to true.
                // In this case, we're checking every time if the circuit-breaker we've introduced is false
                .take_while(move |_| {
                    future::ok(
                        c_b_copy.load(Ordering::Relaxed) == false
                    );
                })
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Playground

Добавленный take_while() позволяет вам работать либо с содержимым потока, либо с внешним предикатом, чтобы продолжить или остановить поток.Обратите внимание, что, хотя мы используем AtomicBool, нам все еще нужен Arc из-за 'static требований к сроку службы Tokio.

Изменение потока

После некоторого обсуждения вкомментарии, это решение может больше подойти для вашего случая использования.Я эффективно реализовал поток разветвления, охватываемый автоматическим выключателем.Волшебство происходит здесь:

impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {

    type Item = S::Item;

    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
        match self.inner.as_mut() {
            Some(ref mut r) => {
                let mut breaker = self.breaker.write().expect("Poisoned lock");
                match breaker.status {
                    false => {
                        let item = r.poll();
                        match &item {
                            &Ok(Async::Ready(Some(ref i))) => {
                                breaker.registry.iter_mut().for_each(|sender| {
                                    sender.try_send(i.clone()).expect("Dead channel");
                                });
                                item
                            },
                            _ => item
                        }
                    },
                    true => Ok(Async::Ready(None))
                }
            }
            _ => {

                let mut breaker = self.breaker.write().expect("Poisoned lock");
                // Stream is over, drop all the senders

                breaker.registry = vec![];
                Ok(Async::Ready(None))
            }
        }
    }
}

Если индикатор состояния установлен в false, вышеуказанный поток опрашивается;Затем результат отправляется всем слушателям.Если результат poll равен Async::Ready(None) (указывая, что поток завершен), все каналы слушателя закрываются.

Если для индикатора состояния установлено значение true, все каналы слушателя закрываются, иПоток возвращает Async::Ready(None) (и исключен из исполнения Токио).

Объект FanOut является клонируемым, но только начальный экземпляр будет делать что-либо.

0 голосов
/ 24 декабря 2018

Вы можете использовать ящик типа stream-cancel для достижения этой цели.Здесь я использовал потоковую обертку Valved, которая берет существующий поток и возвращает значение, которое вы можете использовать для отмены потока позже:

use futures::{
    future::lazy,
    {Future, Sink, Stream},
}; // 0.1.25
use stream_cancel::Valved; // 0.4.4
use tokio::timer::{Delay, Interval}; // 0.1.13

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);
        let (trigger, rx) = Valved::new(rx);

        tokio::spawn({
            rx.for_each(|num| {
                println!("{}", num);
                Ok(())
            })
        });

        tokio::spawn({
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .map(move |_| trigger.cancel()),
        });

        tokio::spawn({
            Interval::new_interval(std::time::Duration::from_millis(10))
                .take(10)
                .map_err(|e| eprintln!("Interval error?! {:?}", e))
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        });

        Ok(())
    }));
}

В корзине есть другиетипы, подходящие для немного разных вариантов использования, обязательно ознакомьтесь с документацией.

См. Ответ Себастьена Рено , чтобы узнать, как реализовать это самостоятельно.

...