Как я могу достичь этого требования с помощью Rx Java - PullRequest
0 голосов
/ 13 декабря 2018

У меня есть состояние (Enum), которое содержит (хорошие, некритические, критические) значения

Итак, требование:

  1. должно срабатывать, когда состояние переходит в некритическое состояниесостояние.
  2. должно срабатывать, когда состояние переходит в критическое состояние.
  3. должно срабатывать, когда состояние остается в критическом состоянии в течение 15 секунд.

Ввод:

publishSubject.onNext("Good")
publishSubject.onNext("Critcal") 
publishSubject.onNext("Critcal") 
publishSubject.onNext("NonCritical")  
publishSubject.onNext("Critacal") 
publishSubject.onNext("Critical") 
publishSubject.onNext("Good")
and so on...

См. Структуру кода для справки:

    var publishSubject = PublishSubject.create<State>()
    publishSubject.onNext(stateObject)


    publishSubject
            /* Business Logic Required Here ?? */
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                AppLogger.printLog("Trigger Success --> ")
            }

Пожалуйста, помогите, Заранее спасибо,

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Вы можете использовать distinctUntilChanged() для подавления событий, которые не меняют состояние.Отфильтруйте обычные события, используя filter().

. Используйте оператор switchMap(), чтобы создать новую подписку при изменении состояния.Когда состояние «критическое», используйте оператор interval() для ожидания 15 секунд.Если состояние изменится в течение этих 15 секунд, switchMap() отменит подписку и повторно подпишется на новую наблюдаемую.

publishSubject
  .distinctUntilChanged()
  .subscribeOn(Schedulers.computation())
  .observeOn(AndroidSchedulers.mainThread())
  .filter( state -> state != State.Normal )
  .switchMap( state -> {
                   if (state == State.Critical) {
                     return Observable.interval(0, 15, TimeUnit.SECONDS) // Note 1
                        .map(v -> State.Critical); // Note 2
                   }
                   return Observable.just( State.Noncritical );
                 })
  .subscribe( ... );
  1. interval() получает начальное значение 0, вызывая егонемедленно выдать значение.Через 15 секунд будет выдано следующее значение и т. Д.
  2. Оператор map() превращает Long, испускаемый interval(), в
0 голосов
/ 13 декабря 2018

Первые две части ваших требований должны быть объединены в одну.Вы просите, чтобы цепочка была запущена при NonCritical и Critical событиях, поэтому цепочка должна не быть запущенной для события Good.Точно так же вам нужно вызвать событие, только если состояние отличается от предыдущего события.Для этого должно быть достаточно двух .filter событий:

var lastKnownState: State = null

publishSubject
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(this::checkStateDiffers)       // Check we have a new state
        .filter { state -> state != State.Good }        // Check event is good
        .subscribe {
            AppLogger.printLog("Trigger Success --> ")
        }

...

private fun checkStateDiffers(val state: State): Boolean {
     val isDifferent = state != lastKnownState
     if (isDifferent) lastKnownState = state       // Update known state if changed
     return isDifferent
}

Требование времени ожидания немного сложнее.Оператор RxJava timeout() дает возможность выдачи ошибки, когда ничего нового не было получено в течение определенного периода времени.Однако я предполагаю, что вы хотите продолжать слушать события даже после получения тайм-аута.Аналогично, если мы просто отправим еще одно событие Critical, оно будет отброшено первым filter.Поэтому в этом случае я бы порекомендовал второй одноразовый прибор, который просто прослушивает этот тайм-аут.

Disposable timeoutDisp = publishSubject
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(15, TimeUnit.SECONDS)
        .onErrorResumeNext(State.Timeout)
        .filter { state -> state == State.Timeout }
        .filter { state -> lastKnownState == State.Critical }
        .subscribe {
            AppLogger.printLog("Timeout Success --> ")
        }

Также настройте checkStateDiffers(), чтобы не сохранять это состояние Timeout в первой цепочке.

private fun checkStateDiffers(val state: State): Boolean {
     if (state == State.Timeout) return true

     var isDifferent = state != lastKnownState
     if (isDifferent) lastKnownState = state       // Update known state if changed
     return isDifferent
}
...