Я хочу создать наблюдаемое, которое будет испускать элементы только тогда, когда подписчики его слушают. Подписчики могут быть добавлены или удалены в любое время, может быть длительная задержка, когда никакие подписчики не подключены, прежде чем новые подключаются снова.
Я думаю, что один из возможных способов работы:
observable = Observable.defer(new Callable<ObservableSource<Long>>() {
@Override
public ObservableSource<Long> call() throws Exception {
final AtomicInteger counter = new AtomicInteger();
return Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
emitter = e;
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
counter.incrementAndGet();
startEmitting(emitter);
}
}).doOnDispose(new Action() {
@Override
public void run() throws Exception {
if (counter.decrementAndGet() == 0) {
stopEmitting(emitter);
}
}
});
}
});
Это решение, вероятно, сработает, однако Наблюдаемое никогда не закончится. Это проблема?
Закончив с функцией stopEmitting, я думаю, мне нужно будет создать нового наблюдателя в следующий раз, когда кто-нибудь захочет подписаться?
Кроме того, способ, которым мне нужно передать эмиттер в функции onSubscribe или onDispose, кажется странным, и мне интересно, является ли он потокобезопасным?
Может ли кто-нибудь порекомендовать лучшее решение?