Объединить последние значения вложенных наблюдаемых по требованию - PullRequest
0 голосов
/ 22 сентября 2018

У меня есть несколько настраиваемых полей, каждое из которых имеет значение, которое можно получить с помощью BehaviorSubject<Value>.То, что поля показаны, основано на том, что я получаю от API, поэтому в итоге у меня есть n количество BehaviorSubject<Value> с.Я хотел бы сгруппировать эти значения вместе в Observable<List<Value>>, где список содержит последние значения из этих полей (порядок не имеет значения).Проблема, однако, заключается в том, что эти поля не все доступны одновременно, поскольку они создаются во время загрузки пользовательского интерфейса, поэтому я не могу использовать Observable.combineLatest со списком субъектов.

То, что я сейчас сделал, - это ясоздал следующую переменную:

private val values = BehaviorSubject.create<Pair<Int, Value>>()

Я использую этот предмет для подписки на все предметы поля, но сначала сопоставляю предметы с их положением и составляю пару.

fieldSubject.map {
    Pair(position, value)
}.subscribe(values)

Затем я хочу сгруппировать значения, основанные на их позиции в паре, и получить Observable<List<Value>>, где список содержит самые последние значения из каждой позиции.Однако я не знаю, как действовать после группировки их, используя groupBy:

values.groupBy {
    it.first
}

Это приводит к Observable<GroupedObservable<Pair, Value>>>.Наконец, вот как я думаю, я должен добраться до Observable<List<Value>>, но я не знаю, что делать отсюда.

1 Ответ

0 голосов
/ 25 сентября 2018

groupBy здесь не кажется мне полезным.

Накопление значений обычно выполняется с использованием scan, здесь вы можете использовать такое преобразование, как:

values.scanWith({ arrayOfNulls<Value>(N) }) { acc, (index, value) ->
    acc.copyOf().apply {
        set(index, value)
    }
}
    .map { it.filterNotNull() }

Вы можетевозможно, обойдется без copyOf(), но я думаю, что в прошлом я сталкивался с проблемами, когда функция аккумулятора была не чистой.

Кстати, вы можете написать position to value вместо Pair(position, value).

Кроме того, вы можете использовать merge для получения Observable<Pair<Int, Value>> вместо создания BehaviorSubject и подписки его вручную на все поля:

Observable.mergeArray(
    fieldX.map { 0 to it },
    fieldY.map { 1 to it },
    fieldZ.map { 2 to it }
    // ...
)

Таким образом, в целом вы могли бы иметь функциюэто все для вас:

inline fun <reified T : Any> accumulateLatest(vararg sources: Observable<out T>): Observable<List<T>> {
    return Observable.merge(sources.mapIndexed { index, observable ->
        observable.map { index to it }
    })
        .scanWith({ arrayOfNulls<T>(sources.size) }) { acc, (index, value) ->
            acc.copyOf().apply {
                set(index, value)
            }
        }
        .map { it.filterNotNull() }
}

А потом просто позвоните:

accumulateLatest(fieldX, fieldY, fieldZ)
    .subscribe {
        println("Latest list: $it")
    }
...