После нескольких дней борьбы за то, что кажется простым делом, я прихожу к вам, ребята:)
Идея проста.У меня есть два потока / наблюдаемые, «левый» и «правый».Я хочу, чтобы элементы из «правого» буфера / сбора / агрегирования в «текущий» элемент в «левом».
Таким образом, каждый элемент в «левом» определяет новое «окно», в то время как все «правые» элементы будут связываться сэто окно, пока не появится новый «левый» элемент.Итак, для визуализации:
Задание:
«влево»: | - A - - - - - B - - C - - - - |
«вправо»:| - 1 - 2 - 3 -4 - 5 - 6 - - - |
'результат': | - - - - - - - -x - - -y - - - -z |(Pair<Left, List<Right>>
)
Где: A, 1 ; В, 4 (т. Х); C (поэтому y) испускаются одновременно
Итак: x = пара (A, [1,2,3]), y = пара (B, [4, 5])
И: «вправо» и «результат» завершаются / завершаются, когда «левый» делает
Итак: z = пара (C, [6]) - испускается в результате «левого» завершения
----
РЕДАКТИРОВАТЬ 2 - ОКОНЧАТЕЛЬНОЕ РЕШЕНИЕ!
InЧтобы объединить «правые» элементы со следующим «левым», а не с предыдущим, я изменил код на более короткий / более простой:
fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
return this.share().run {
zipWith(right.buffer(this), BiFunction { left, rightList ->
Pair(left, rightList)
})
}
}
РЕДАКТИРОВАТЬ 1 - исходное решение!
Взятый из ответа @ Mark (принятого) ниже, вот что я придумал.
Он разделен на более мелкие методы, потому что я также делаю multiRightGroupJoin()
, чтобы объединить столько (правильных) потоков, сколько я хочу,
fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
return this.share().let { thisObservable -> //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
thisObservable.flatMapSingle { t -> //treat each 'left' as a Single
bufferRightOnSingleLeft(thisObservable, t, right)
}
}
}
Где:
private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
: Single<Pair<T, MutableList<R>>> {
return right.buffer(left) //buffer 'right' until 'left' onNext() (for each 'left' Single)
.map { Pair(leftSingleItem, it) }
.first(Pair(leftSingleItem, emptyList())) //should be only 1 (list). THINK firstOrError
}
----
Что я получил до сих пор
После долгих чтений и понимания того, что каким-то образом для этого нет готовой реализации, я решил использовать groupJoin
, в основном используя эту ссылку , например: (много проблем и местчтобы улучшить здесь, не используйте этот код)
private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
.share() //avoid weird side-effects of multiple onSubscribe calls
//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(
//bind 'right/other' stream to 'this/left'
right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?
//define when windows start/end ('this/left' onNext opens new window and closes prev)
, Function<T, ObservableSource<T>> { thisObservable }
//define 'right/other' stream to have no windows/intervals/aggregations by itself
// -> immediately bind each emitted item to a 'current' window(T) above
, Function<R, ObservableSource<R>> { Observable.empty() }
//collect the whole 'right' stream in 'current' ('left') window
, BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
rObs.collect({ mutableListOf<R>() }) { acc, value ->
acc.add(value)
}.map { Pair(t, it.toList()) }
}).mergeAllSingles()
}
Я также использовал аналогичное использование для создания timedBuffer()
- так же, как buffer(timeout)
, но с отметкой времени в каждом буфере (List
) чтобы знать, когда это началось.В основном, запустив один и тот же код на Observable.interval(timeout)
(как «левый»)
Проблемы / Вопросы (от самых простых до самых сложных)
- Это лучший способ сделать что-то подобное?Разве это не излишество?
- Есть ли лучший способ (должен быть) для завершения «результата» (и «правого»), когда «левый» завершен?Без этой уродливой логической логики?
Такое использование, похоже, испортило порядок rx.См. Код и распечатайте ниже:
leftObservable
.doOnComplete {
log("doOnComplete - before join")
}
.doOnComplete {
log("doOnComplete 2 - before join")
}
.rightGroupJoin(rightObservable)
.doOnComplete {
log("doOnComplete - after join")
}
Печатает (иногда! Похоже на состояние гонки) следующее:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join
При первом запуске вышеуказанного кода doOnComplete - after join
не вызывается, во второй раз он вызывается дважды .Третий раз как первый, четвертый как второй и т. Д.
Оба 3,4 запускаются с использованием этого кода.Возможно, это как-то связано с использованием подписки {}?Обратите внимание, что я не держу одноразовые.Этот поток заканчивается, потому что я собираю «левую» наблюдаемую
leftObservable.subscribeOn().observeOn()
.doOnComplete{log...}
.rightGroupJoin()
.doOnComplete{log...}
.subscribe {}
Примечание 1: добавление .takeUntil { thisCompleted }
после mergeAllSingles()
, кажется, исправляет # 4.
Примечание 2. После использования этого метода для объединения нескольких потоков и применения «Примечание 1» становится очевидным, что onComplete (до вызова groupJoin () !!!) будет вызываться столько раз, сколько «правильных».Наблюдаемые, вероятно означающие, что причиной является right.takeUntil { thisCompleted }
, действительно ли важно закрыть «правильный» поток?
Примечание 3: в отношении Примечания 1 оно, похоже, очень тесно связано с takeUntil и takeWhile.Использование takeWhile снижает количество вызовов doOnComplete, и это как-то логично.Все еще пытаюсь понять это лучше.
Можете ли вы вспомнить multiGroupJoin или, в нашем случае, multiRightGroupJoin, кроме запуска zip для groupJoin * rightObservablesCount?
Пожалуйста, задавайте все, что вам нравится.Я точно знаю, что мое использование подписки / одноразового использования и руководства onComplete не так, я просто не совсем уверен, что это такое.