У меня есть случай, когда мне нужно создать реактивный поток, состоящий из нескольких элементов, который может быть выполнен двумя различными способами.
Моя цепочка выглядит примерно так:
В верхней части цепочки у меня есть несколько Flowable
производителей, каждый из которых регулярно опрашивает разные удаленные ресурсы и отправляет сообщения, когда получаетновые результаты.
Каждый из этих производителей имеет одного промежуточного подписчика, который преобразует сообщения.
В свою очередь, все эти промежуточные подписчики имеют одного корневого подписчика, который объединяет преобразованные события извсех в один Flowable
Я хотел бы иметь возможность отменить любую из подписок корневого подписчика, и чтобы это событие отмены подписки в основном вызывало отмену подписки (это слово?) для завершения.
Другими словами, я хочу, чтобы процесс опроса прекратился, если никто не слушает.Если мне когда-нибудь понадобится перезапустить опрос, я просто перестрою новую цепочку.
Существуют ли какие-либо операторы RxJava, которые позволяют мне создавать такого рода отношения между производителем и подписчиком?А если нет, то каков будет самый простой способ сделать это?
У меня есть какое-то решение прямо сейчас, но оно не кажется очень хорошим решением и требует большого количества шаблонного кода для каждого элемента вцепь.Это выглядит примерно так:
private class AutoUnsubscribing {
private final AtomicReference<Subscription> upstreamSubscription = new AtomicReference<>(null);
private final PublishProcessor<LedgerEvent> publisher = PublishProcessor.create();
public Flowable<LedgerEvent> getFlowable(Flowable<LedgerEvent> upstream) {
upstream.subscribe(new Subscriber<LedgerEvent>() {
@Override
public void onSubscribe(Subscription s) {
upstreamSubscription.set(s);
}
@Override
public void onNext(LedgerEvent ledgerEvent) {
publisher.onNext(ledgerEvent);
}
@Override
public void onError(Throwable t) {
publisher.onError(t);
}
@Override
public void onComplete() {
publisher.onComplete();
}
});
return publisher.doOnCancel(() -> {
if (!publisher.hasSubscribers()) {
// no more subscribers. unsubscribe from upstream and complete this stream
upstreamSubscription.get().cancel();
upstreamSubscription.set(null);
publisher.onComplete();
}
});
}
}
Но должно быть лучшее решение, чем это.