Rxjava - выпустить на PublishSubject перед подпиской - PullRequest
1 голос
/ 30 апреля 2019

Рассмотрим сценарий, в котором у нас есть поток, генерирующий строки, и мы хотим сохранить строки в файлах.

Я использую PublishSubject, и это прекрасно работает:

Subject<String> stream = PublishSubject.create();
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string1")
mReverseGeocoderStream.onNext("some-string2")

Однако, это не работает (только some-string2 доставляется)

Subject<String> stream = PublishSubject.create();
mReverseGeocoderStream.onNext("some-string1")
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string2")

Есть ли способ заставить работать второй сценарий?

То есть, можем ли мы изменить PublishSubject, чтобы удостовериться, что он буферизует события до тех пор, пока подписчик их не использует?

Обратите внимание, что BehaviorSubject не вариант, потому что повторная подписка вызывает другое сохранение файла. В нем нет понятия «поглощающие события».

Я обнаружил UnicastSubject, который в значительной степени мне нужен, за исключением того, что происходит сбой с IllegalStateException, когда я отписываюсь, а затем повторно подписываюсь с другим подписчиком.


Вариант использования:

Предположим, у нас есть приложение для Android. Это делает сетевой запрос, в конце сетевого запроса он должен показать диалог. Пока выполняется запрос, пользователь создает фоновое приложение. На этом этапе мы отписываем наших наблюдателей, которые прослушивают сигнал для отображения диалога.

Сетевой запрос возвращается и сигнал, показывающий, что диалог запускается в поток. Никто не слушает в этот момент. Пользователь на первом плане приложения. Новый абонент подключается к диспетчеру сетевых запросов (ViewModel). На этом этапе я бы хотел, чтобы «неиспользованный» сигнал доставлялся абоненту.

Примечание: я не могу использовать тему поведения. Если я это сделаю, то каждый раз, когда пользователь фоном и на переднем плане приложения, появится диалоговое окно. Я хочу, чтобы событие было обработано и завершено после отображения диалогового окна.

1 Ответ

0 голосов
/ 02 мая 2019

Сделал еще несколько исследований и нашел это:

https://gist.github.com/xsveda/8c556516079fde97d04b4b7e14a18463

из: Очередь как Тема в RxJava

Обратите внимание, что он использует реле, но вы можете легко заменить Relay на Subject, если не хотите вносить другую зависимость.

Я открыт для других решений и, возможно, критикую, почему это решение не является хорошим.

/**
 * Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed
 * to any other Observer.
 * <p>
 * This relay holds an unbounded internal buffer.
 * <p>
 * This relay allows only a single Observer at a time to be subscribed to it.
 * <p>
 * If more than one Observer attempts to subscribe to this Relay at the same time, they
 * will receive an IllegalStateException.
 *
 * @param <T> the value type received and emitted by this Relay subclass
 */
public final class CacheRelay<T> extends Relay<T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final PublishRelay<T> relay = PublishRelay.create();

    private CacheRelay() {
    }

    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }

    @Override
    public void accept(T value) {
        if (relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
        } else {
            for (T element; (element = queue.poll()) != null; ) {
                observer.onNext(element);
            }
            relay.subscribeActual(observer);
        }
    }
}
...