Накопить и выбросить поведение в RxJava 2 - PullRequest
0 голосов
/ 21 июня 2019

У меня есть класс Status, представляющий некоторый статус задачи. У меня есть метод со следующей подписью

def update(existing: Status, update: Status): Status

Функция принимает старый статус и новое обновление и возвращает новый объект Status, объединяющий эти два.

Поведение, которое я хочу получить от моего пользовательского предмета, следующее:

  • Поведение записи

    • Он должен содержать экземпляр Status. Давайте назовем это current
    • Я должен иметь возможность подписаться на данный Flowable<Status>
    • При каждом новом обновлении статуса (через вышеуказанное Flowable) он будет вызывать метод update, как указано выше, и сохранять результат в current.
  • Поведение при чтении

    • Периодическое чтение. Должно быть в состоянии периодически отображать текущее состояние. Что-то вроде Flowable.interval(1 second).map(i->current)

Я могу достичь вышеуказанного, используя класс StatusHolder для поведения записи (которое выполняет удержание и обновление) и отдельную подписку на Flowable.interval(1 second).map(i->holder.current).

После реализации этого я натолкнулся на концепцию Subject, которая является одновременно Observable и Observer. Функциональность, которую я хочу, похожа на эту. Мне нужен класс, который может как получать, так и излучать Status объектов, но при этом ему необходимо выполнить некоторые вычисления.

Я смотрел на существующие реализации Subject, и я не думаю, что какие-либо из них естественным образом поддерживают это поведение. Во-вторых, они работают на Observable, а не Flowable, поэтому мне нужно сделать toFlowable и toObservable, чтобы использовать это.

Есть ли лучший способ реализовать это поведение?

1 Ответ

0 голосов
/ 22 июня 2019

Давайте определим source как ваш источник Flowable<Status>.

Flowable<Status> source = ...

Тогда вы можете попробовать это:

Flowable.interval(1, TimeUnit.SECONDS)
        .withLatestFrom(source.scan(this::update), (i, status) -> status)
        .share();
  • .interval() оператор периодически генерирует длинныйзначение.
  • .scan() оператор «накапливает» элементы, излучаемые исходным потоком.В вашем случае аккумулятор .update функция
  • .withLatestFrom() объединяет два потока.Это заменит вашу .map(i->holder.current)
  • .share(), позволяющую всем подписчикам совместно использовать одну подписку.


Небольшое примечание:

Существует Flowable версия Subject, называемая Processor.

...