Наблюдаемый, который будет излучать только при подписке на - PullRequest
0 голосов
/ 26 июня 2018

Я хочу создать наблюдаемое, которое будет испускать элементы только тогда, когда подписчики его слушают. Подписчики могут быть добавлены или удалены в любое время, может быть длительная задержка, когда никакие подписчики не подключены, прежде чем новые подключаются снова. Я думаю, что один из возможных способов работы:

observable = Observable.defer(new Callable<ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> call() throws Exception {
            final AtomicInteger counter = new AtomicInteger();

            return Observable.create(new ObservableOnSubscribe<Long>() {

                @Override
                public void subscribe(ObservableEmitter<Long> e) throws Exception {
                    emitter = e;
                }
            }).doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    counter.incrementAndGet();
                    startEmitting(emitter);
                }
            }).doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    if (counter.decrementAndGet() == 0) {
                        stopEmitting(emitter);
                    }
                }
            });
        }
    });

Это решение, вероятно, сработает, однако Наблюдаемое никогда не закончится. Это проблема? Закончив с функцией stopEmitting, я думаю, мне нужно будет создать нового наблюдателя в следующий раз, когда кто-нибудь захочет подписаться? Кроме того, способ, которым мне нужно передать эмиттер в функции onSubscribe или onDispose, кажется странным, и мне интересно, является ли он потокобезопасным?

Может ли кто-нибудь порекомендовать лучшее решение?

1 Ответ

0 голосов
/ 26 июня 2018

Я не очень разбираюсь в rx-java2, но у меня есть несколько рекомендаций по наблюдаемой схеме.

Вы можете создать объект с именем ObservableEmitter. В этом классе вы можете создать такой метод подписки (подписчик подписчика) и метод emit ().

Абонентский интерфейс затем реализуется вашими подписчиками. Я бы назвал метод получения (сообщение сообщение).

В коде это будет выглядеть так:

public interface Subscriber {
  void receive(Message msg);
}

public class ObservableEmitter {
 private List<Subscriber> subscribers = new ArrayList<Subscriber>();
 public subscribe(Subscriber sub) {
  subscribers.add(sub);
 }
 public void emit(Message msg) {
   for(Subscriber sub : subscribers) {
     sub.receive(msg);
   }
 }
}

Таким образом, вы отправите сообщение, только если есть подписчик. В вашем минимальном примере отсутствует код: startEmitting () и способ вызова observable

...