Как создать Flowable, который завершается, как только его последний подписчик отменяет свою подписку - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть случай, когда мне нужно создать реактивный поток, состоящий из нескольких элементов, который может быть выполнен двумя различными способами.

Моя цепочка выглядит примерно так:

В верхней части цепочки у меня есть несколько Flowable производителей, каждый из которых регулярно опрашивает разные удаленные ресурсы и отправляет сообщения, когда получаетновые результаты.

Каждый из этих производителей имеет одного промежуточного подписчика, который преобразует сообщения.

В свою очередь, все эти промежуточные подписчики имеют одного корневого подписчика, который объединяет преобразованные события извсех в один Flowable

Я хотел бы иметь возможность отменить любую из подписок корневого подписчика, и чтобы это событие отмены подписки в основном вызывало отмену подписки (это слово?) для завершения.

Другими словами, я хочу, чтобы процесс опроса прекратился, если никто не слушает.Если мне когда-нибудь понадобится перезапустить опрос, я просто перестрою новую цепочку.

Существуют ли какие-либо операторы RxJava, которые позволяют мне создавать такого рода отношения между производителем и подписчиком?А если нет, то каков будет самый простой способ сделать это?

У меня есть какое-то решение прямо сейчас, но оно не кажется очень хорошим решением и требует большого количества шаблонного кода для каждого элемента вцепь.Это выглядит примерно так:

private class AutoUnsubscribing {
    private final AtomicReference<Subscription> upstreamSubscription = new AtomicReference<>(null);
    private final PublishProcessor<LedgerEvent> publisher = PublishProcessor.create();

    public Flowable<LedgerEvent> getFlowable(Flowable<LedgerEvent> upstream) {
        upstream.subscribe(new Subscriber<LedgerEvent>() {
            @Override
            public void onSubscribe(Subscription s) {
                upstreamSubscription.set(s);
            }

            @Override
            public void onNext(LedgerEvent ledgerEvent) {
                publisher.onNext(ledgerEvent);
            }

            @Override
            public void onError(Throwable t) {
                publisher.onError(t);
            }

            @Override
            public void onComplete() {
                publisher.onComplete();
            }
        });

        return publisher.doOnCancel(() -> {
            if (!publisher.hasSubscribers()) {
                // no more subscribers. unsubscribe from upstream and complete this stream
                upstreamSubscription.get().cancel();
                upstreamSubscription.set(null);
                publisher.onComplete();
            }
        });
    }
}

Но должно быть лучшее решение, чем это.

1 Ответ

0 голосов
/ 07 марта 2019

Теперь, когда я приобрел немного больше опыта с RxJava, я могу сформулировать ответ на свой вопрос.По сути, наша цепочка следующая:

+----------+      +-------------+      +----------+
| consumer |←-----| transformer |←-----| producer |
+----------+      +-------------+      +----------+

Что было действительно необходимо, так это способ сделать transformer отменяемым таким образом, чтобы при отмене его подписка на producerбыл отменен (так что producer мог выполнить некоторую очистку перед тем, как завершить себя), И transformer завершил себя (так, чтобы consumer узнал об этом).

Лучшее решение, которое я смог найти, было реализоватьоператор detachable, подобный следующему:

public static <T> FlowableTransformer<T, T> detachable(
            Consumer<Disposable> disposableConsumer) {
    PublishProcessor<Object> stopper = PublishProcessor.create();
    Disposable disposable = new Disposable() {
        private AtomicBoolean disposed = new AtomicBoolean(false);

        @Override
        public void dispose() {
            if (disposed.compareAndSet(false, true)) {
                stopper.onNext(new Object());
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }
    };

    return upstream -> upstream.takeUntil(stopper)
            .doOnSubscribe(s -> disposableConsumer.accept(disposable));

}

Который затем можно использовать таким образом:

AtomicReference<Disposable> transformerDisposable = new AtomicReference<>();

producer
    .compose(detachable(transformerDisposable::set))
    .subscriber(consumer)

Тогда transfomer можно отменить по желанию, используя

transformerDisposable.dispose()

Что действительно заставит producer назвать его собственным onUnsubscribe, а consumer получить уведомление onComplete.

...