Первые две части ваших требований должны быть объединены в одну.Вы просите, чтобы цепочка была запущена при NonCritical
и Critical
событиях, поэтому цепочка должна не быть запущенной для события Good
.Точно так же вам нужно вызвать событие, только если состояние отличается от предыдущего события.Для этого должно быть достаточно двух .filter
событий:
var lastKnownState: State = null
publishSubject
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.filter(this::checkStateDiffers) // Check we have a new state
.filter { state -> state != State.Good } // Check event is good
.subscribe {
AppLogger.printLog("Trigger Success --> ")
}
...
private fun checkStateDiffers(val state: State): Boolean {
val isDifferent = state != lastKnownState
if (isDifferent) lastKnownState = state // Update known state if changed
return isDifferent
}
Требование времени ожидания немного сложнее.Оператор RxJava timeout()
дает возможность выдачи ошибки, когда ничего нового не было получено в течение определенного периода времени.Однако я предполагаю, что вы хотите продолжать слушать события даже после получения тайм-аута.Аналогично, если мы просто отправим еще одно событие Critical
, оно будет отброшено первым filter
.Поэтому в этом случае я бы порекомендовал второй одноразовый прибор, который просто прослушивает этот тайм-аут.
Disposable timeoutDisp = publishSubject
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.timeout(15, TimeUnit.SECONDS)
.onErrorResumeNext(State.Timeout)
.filter { state -> state == State.Timeout }
.filter { state -> lastKnownState == State.Critical }
.subscribe {
AppLogger.printLog("Timeout Success --> ")
}
Также настройте checkStateDiffers()
, чтобы не сохранять это состояние Timeout
в первой цепочке.
private fun checkStateDiffers(val state: State): Boolean {
if (state == State.Timeout) return true
var isDifferent = state != lastKnownState
if (isDifferent) lastKnownState = state // Update known state if changed
return isDifferent
}