RxObservable, который повторяется, пока не будет найдено ожидаемое значение - PullRequest
0 голосов
/ 18 мая 2018

Цель этой функции - создать поток, который периодически генерирует значения, пока не встретит поток, соответствующий предикату.

Вот код скелета, который я придумал:

class Watcher<T : Any>(
        /**
         * Emits the data associated with the provided id
         */
        private val callable: (id: String) -> T,
        /**
         * Checks if the provided value marks the observable as complete
         */
        private val predicate: (id: String, value: T) -> Boolean
) {

    private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()

    fun watch(id: String): Observable<T> {
        // reuse obesrvable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = callable(id)
        if (predicate(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.fromCallable<T> {
            callable(id)
        }.repeatWhen { /* What to put here? */ }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged()
        watchPool[id] = observable
        return observable
    }

}

Например, если у меня есть следующие перечисления:

enum class Stage {
    CREATED, PROCESSING, DELIVERING, FINISHED
}

И какой-нибудь вызываемый, который вернёт нужную стадию, я должен быть в состоянии пройти вызываемое и предикатную проверку, если stage == FINISHED, и опрашивать, пока не получу событие FINISHED.

Проблема, с которой я столкнулся, заключается в создании наблюдаемого, когда полученное событие не является окончательным. В этом случае наблюдаемое должно продолжать опрашивать события, пока оно не получит событие, соответствующее предикату, или пока у него больше не будет подписчиков.

Это наблюдаемое должно:

  • Не опрашивать, пока не получит хотя бы одного подписчика
  • Опрос каждые х секунд
  • Пометить себя как завершенное, если predicate вернет true
  • Заполните себя, если когда-нибудь перейдете от> 0 подписчиков к 0 подписчикам

Использование пулов часов просто для того, чтобы два потока, просматривающие один и тот же идентификатор, не опрашивали вдвое больше раз. Удаление наблюдаемых с карты также просто, чтобы не накапливаться. По той же причине наблюдаемые, которые испускают только одну переменную, не сохраняются для справки.

Как мне добавить функциональность для точек, добавленных выше? Я сошлюсь на одну существующую проблему RxJava Github , которая мне показалась полезной, но насколько я знаю, она не позволяет предикатам иметь дело со значением, испускаемым вызываемым объектом.

1 Ответ

0 голосов
/ 18 мая 2018

В итоге я просто использовал takeUntil и использовал метод интервала наблюдения для опроса.

abstract class RxWatcher<in T : Any, V : Any> {

    /**
     * Emits the data associated with the provided id
     * At a reasonable point, emissions should return a value that returns true with [isCompleted]
     * This method should be thread safe, and the output should not depend on the number of times this method is called
     */
    abstract fun emit(id: T): V

    /**
     * Checks if the provided value marks the observable as complete
     * Must be thread safe
     */
    abstract fun isCompleted(id: T, value: V): Boolean

    /**
     * Polling interval in ms
     */
    open val pollingInterval: Long = 1000

    /**
     * Duration between events in ms for which the observable should time out
     * If this is less than or equal to [pollingInterval], it will be ignored
     */
    open val timeoutDuration: Long = 5 * 60 * 1000

    private val watchPool: MutableMap<T, Observable<V>> = ConcurrentHashMap()

    /**
     * Returns an observable that will emit items every [pollingInterval] ms until it [isCompleted]
     *
     * The observable will be reused if there is polling, so the frequency remains constant regardless of the number of
     * subscribers
     */
    fun watch(id: T): Observable<V> {
        // reuse observable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = emit(id)
        if (isCompleted(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.interval(pollingInterval, TimeUnit.MILLISECONDS, Schedulers.io()).map {
            emit(id)
        }.takeUntil {
            isCompleted(id, it)
        }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged().run {
            if (timeoutDuration > pollingInterval) timeout(timeoutDuration, TimeUnit.MILLISECONDS)
            else this
        }
        watchPool[id] = observable
        return observable
    }

    /**
     * Clears the observables from the watch pool
     * Note that existing subscribers will not be affected
     */
    fun clear() {
        watchPool.clear()
    }

}
...