Цель этой функции - создать поток, который периодически генерирует значения, пока не встретит поток, соответствующий предикату.
Вот код скелета, который я придумал:
class Watcher<T : Any>(
/**
* Emits the data associated with the provided id
*/
private val callable: (id: String) -> T,
/**
* Checks if the provided value marks the observable as complete
*/
private val predicate: (id: String, value: T) -> Boolean
) {
private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()
fun watch(id: String): Observable<T> {
// reuse obesrvable if exists
val existing = watchPool[id]
if (existing != null)
return existing
val value = callable(id)
if (predicate(id, value)) return Observable.just(value)
// create new observable to fetch until complete,
// then remove from the map once complete
val observable = Observable.fromCallable<T> {
callable(id)
}.repeatWhen { /* What to put here? */ }.doOnComplete {
watchPool.remove(id)
}.distinctUntilChanged()
watchPool[id] = observable
return observable
}
}
Например, если у меня есть следующие перечисления:
enum class Stage {
CREATED, PROCESSING, DELIVERING, FINISHED
}
И какой-нибудь вызываемый, который вернёт нужную стадию, я должен быть в состоянии пройти вызываемое и предикатную проверку, если stage == FINISHED
, и опрашивать, пока не получу событие FINISHED
.
Проблема, с которой я столкнулся, заключается в создании наблюдаемого, когда полученное событие не является окончательным. В этом случае наблюдаемое должно продолжать опрашивать события, пока оно не получит событие, соответствующее предикату, или пока у него больше не будет подписчиков.
Это наблюдаемое должно:
- Не опрашивать, пока не получит хотя бы одного подписчика
- Опрос каждые х секунд
- Пометить себя как завершенное, если
predicate
вернет true
- Заполните себя, если когда-нибудь перейдете от> 0 подписчиков к 0 подписчикам
Использование пулов часов просто для того, чтобы два потока, просматривающие один и тот же идентификатор, не опрашивали вдвое больше раз. Удаление наблюдаемых с карты также просто, чтобы не накапливаться. По той же причине наблюдаемые, которые испускают только одну переменную, не сохраняются для справки.
Как мне добавить функциональность для точек, добавленных выше?
Я сошлюсь на одну существующую проблему RxJava Github , которая мне показалась полезной, но насколько я знаю, она не позволяет предикатам иметь дело со значением, испускаемым вызываемым объектом.