Rx (RxKotlin) - rightGroupJoin с использованием groupJoin - объединить / объединить две наблюдаемые разных типов - PullRequest
0 голосов
/ 17 апреля 2019

После нескольких дней борьбы за то, что кажется простым делом, я прихожу к вам, ребята:)

Идея проста.У меня есть два потока / наблюдаемые, «левый» и «правый».Я хочу, чтобы элементы из «правого» буфера / сбора / агрегирования в «текущий» элемент в «левом».
Таким образом, каждый элемент в «левом» определяет новое «окно», в то время как все «правые» элементы будут связываться сэто окно, пока не появится новый «левый» элемент.Итак, для визуализации:

Задание:
«влево»: | - A - - - - - B - - C - - - - |
«вправо»:| - 1 - 2 - 3 -4 - 5 - 6 - - - |
'результат': | - - - - - - - -x - - -y - - - -z |(Pair<Left, List<Right>>)
Где: A, 1 ; В, 4 (т. Х); C (поэтому y) испускаются одновременно
Итак: x = пара (A, [1,2,3]), y = пара (B, [4, 5])
И: «вправо» и «результат» завершаются / завершаются, когда «левый» делает
Итак: z = пара (C, [6]) - испускается в результате «левого» завершения

----
РЕДАКТИРОВАТЬ 2 - ОКОНЧАТЕЛЬНОЕ РЕШЕНИЕ!
InЧтобы объединить «правые» элементы со следующим «левым», а не с предыдущим, я изменил код на более короткий / более простой:

fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
    return this.share().run {
        zipWith(right.buffer(this), BiFunction { left, rightList ->
            Pair(left, rightList)
        })
    }
}  

РЕДАКТИРОВАТЬ 1 - исходное решение!
Взятый из ответа @ Mark (принятого) ниже, вот что я придумал.
Он разделен на более мелкие методы, потому что я также делаю multiRightGroupJoin(), чтобы объединить столько (правильных) потоков, сколько я хочу,

fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
    return this.share().let { thisObservable ->    //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
        thisObservable.flatMapSingle { t ->        //treat each 'left' as a Single
            bufferRightOnSingleLeft(thisObservable, t, right)
        }
    }
}

Где:

private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
    : Single<Pair<T, MutableList<R>>> {

    return right.buffer(left)                              //buffer 'right' until 'left' onNext() (for each 'left' Single) 
        .map { Pair(leftSingleItem, it) }
        .first(Pair(leftSingleItem, emptyList()))   //should be only 1 (list). THINK firstOrError
}  

----

Что я получил до сих пор
После долгих чтений и понимания того, что каким-то образом для этого нет готовой реализации, я решил использовать groupJoin, в основном используя эту ссылку , например: (много проблем и местчтобы улучшить здесь, не используйте этот код)

private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {

var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
        .share() //avoid weird side-effects of multiple onSubscribe calls

//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(

    //bind 'right/other' stream to 'this/left'
    right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?

    //define when windows start/end ('this/left' onNext opens new window and closes prev)
    , Function<T, ObservableSource<T>> { thisObservable }

    //define 'right/other' stream to have no windows/intervals/aggregations by itself
    // -> immediately bind each emitted item to a 'current' window(T) above
    , Function<R, ObservableSource<R>> { Observable.empty() }

    //collect the whole 'right' stream in 'current' ('left') window
    , BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
        rObs.collect({ mutableListOf<R>() }) { acc, value ->
            acc.add(value)
        }.map { Pair(t, it.toList()) }

    }).mergeAllSingles()
}  

Я также использовал аналогичное использование для создания timedBuffer() - так же, как buffer(timeout), но с отметкой времени в каждом буфере (List) чтобы знать, когда это началось.В основном, запустив один и тот же код на Observable.interval(timeout) (как «левый»)

Проблемы / Вопросы (от самых простых до самых сложных)

  1. Это лучший способ сделать что-то подобное?Разве это не излишество?
  2. Есть ли лучший способ (должен быть) для завершения «результата» (и «правого»), когда «левый» завершен?Без этой уродливой логической логики?
  3. Такое использование, похоже, испортило порядок rx.См. Код и распечатайте ниже:

    leftObservable
    .doOnComplete {
        log("doOnComplete - before join")
     }
    .doOnComplete {
        log("doOnComplete 2 - before join")
     }
    .rightGroupJoin(rightObservable)
    .doOnComplete {
        log("doOnComplete - after join")
     }
    

Печатает (иногда! Похоже на состояние гонки) следующее:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join

При первом запуске вышеуказанного кода doOnComplete - after join не вызывается, во второй раз он вызывается дважды .Третий раз как первый, четвертый как второй и т. Д.
Оба 3,4 запускаются с использованием этого кода.Возможно, это как-то связано с использованием подписки {}?Обратите внимание, что я не держу одноразовые.Этот поток заканчивается, потому что я собираю «левую» наблюдаемую

leftObservable.subscribeOn().observeOn()
.doOnComplete{log...}
.rightGroupJoin()
.doOnComplete{log...}
.subscribe {}  

Примечание 1: добавление .takeUntil { thisCompleted } после mergeAllSingles(), кажется, исправляет # 4.

Примечание 2. После использования этого метода для объединения нескольких потоков и применения «Примечание 1» становится очевидным, что onComplete (до вызова groupJoin () !!!) будет вызываться столько раз, сколько «правильных».Наблюдаемые, вероятно означающие, что причиной является right.takeUntil { thisCompleted }, действительно ли важно закрыть «правильный» поток?

Примечание 3: в отношении Примечания 1 оно, похоже, очень тесно связано с takeUntil и takeWhile.Использование takeWhile снижает количество вызовов doOnComplete, и это как-то логично.Все еще пытаюсь понять это лучше.

Можете ли вы вспомнить multiGroupJoin или, в нашем случае, multiRightGroupJoin, кроме запуска zip для groupJoin * rightObservablesCount?

Пожалуйста, задавайте все, что вам нравится.Я точно знаю, что мое использование подписки / одноразового использования и руководства onComplete не так, я просто не совсем уверен, что это такое.

Ответы [ 2 ]

1 голос
/ 18 апреля 2019

Что-то настолько простое, как это должно работать:

@JvmStatic
fun main(string: Array<String>) {
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<Int>()

    left.flatMapSingle { s ->  right.buffer(left).map { Pair(s, it) }.firstOrError() }
            .subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }


    left.onNext("A")
    right.onNext(1)
    right.onNext(2)
    right.onNext(3)
    left.onNext("B")
    right.onNext(4)
    right.onNext(5)
    left.onNext("C")
    right.onNext(6)
    left.onComplete()
}

Выход:

Group : Letter : A, Elements : [1, 2, 3]
Group : Letter : B, Elements : [4, 5]
Group : Letter : C, Elements : [6]

Ваш Observable интерес остается слева, так что подпишитесь на него. Затем просто буферизуйте правую по следующей эмиссии или завершению левой наблюдаемой. Вас интересует только один результат каждого выброса слева вверх по течению, поэтому просто используйте flatMapSingle. Я выбрал firstOrError(), но, очевидно, мог иметь элемент по умолчанию или другую обработку ошибок или даже flatMapMaybe в сочетании с firstElement()

Редактировать

У OP были дополнительные вопросы и ответы, и он обнаружил, что исходный вопрос и решение выше для буферизации правых значений с предыдущим левым излучением, до следующего левого выброса (как указано выше), не является обязательным поведением. Новое обязательное поведение заключается в буферизации правых значений в СЛЕДУЮЩЕМ левом излучении, как показано ниже:

@JvmStatic
    fun main(string: Array<String>) {
        val left = PublishSubject.create<String>()
        val right = PublishSubject.create<Int>()


        left.zipWith (right.buffer(left), 
                BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
        }).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }

        left.onNext("A")
        right.onNext(1)
        right.onNext(2)
        right.onNext(3)
        left.onNext("B")
        right.onNext(4)
        right.onNext(5)
        left.onNext("C")
        right.onNext(6)
        left.onComplete()
    }

, который дает другой конечный результат, поскольку левые значения упакованы с предыдущими правыми значениями, до следующего левого выброса (обратного).

Выход:

Group : Letter : A, Elements : []
Group : Letter : B, Elements : [1, 2, 3]
Group : Letter : C, Elements : [4, 5]
0 голосов
/ 17 апреля 2019

С первого взгляда я бы использовал 2 scan с.Пример:

data class Result(val left: Left?, val rightList: List<Right>) {
    companion object {
        val defaultInstance: Result = Result(null, listOf())
    }
}

leftObservable.switchMap { left -> 
    rightObservable.scan(listOf()) {list, newRight -> list.plus(newRight)}
        .map { rightsList -> Result(left, rightList) }
}
.scan(Pair(Result.defaultInstance, Result.defaultInstance)) { oldPair, newResult -> 
    Pair(oldPair.second, newResult)
}
.filter { it.first != it.second }
.map { it.first }

Единственная проблема здесь заключается в обработке onComplete, не знаю, как это сделать

...