Объединение нескольких текучих - PullRequest
0 голосов
/ 26 января 2019

Я перенес свой проект из Spring в Ktor и решил заменить реализацию реактивных потоков, которая изначально была Reactor, на RxJava 2. Хотя я столкнулся с некоторой проблемой при попытке объединить несколько потоков в один в конце реактивного трубопровод. Вот как это выглядит:

internal interface Aggregator {
    fun acquireSomethingFromSomewhere(keyword: String): Flowable<Some>
}

fun acquireSomething(keyword: String) = Flowable
    .fromIterable(aggregators)
    .map { it.acquireSomethingFromSomewhere(keyword) }
    .flatMap { ??? }

Дело в том, что каждый вызов acquireSomethingFromSomewhere возвращает Flowable<Some>, есть ли оператор, который мог бы помочь мне объединить их в один поток в конце? В Reactor я просто использовал:

fun acquireSomething(keyword: String) = Flux
    .fromIterable(aggregators)
    .map { it.acquireSomethingFromSomewhere(keyword) }
    .flatMap { Flux.concat(it) }

Но в RxJava я не могу найти ни одного оператора, который мог бы решить мою проблему, поскольку каждый из них принимает Publisher в качестве аргумента, а Flowable не реализует его.

1 Ответ

0 голосов
/ 27 января 2019

Прежде всего, если функция, которую вы предоставляете в map, возвращает Flowable, вы получите вложенные Flowables (aka Flowable<Flowable<T>>), а это не то, что вы, вероятно, хотите.Это связано с тем, что функция map преобразует только элемент внутри контейнера (T) -> R (в данном случае контейнер Flowable).В вашем случае вы хотите преобразовать элемент внутри первого контейнера, возвращая новый контейнер (T) -> Flowable<R>, эта функция называется flatMap.В случае Rx у вас есть больше функций (операторов) в зависимости от их поведения, например concatMap и switchMap, но сигнатуры одинаковы,

Пример

fun acquireSomething(keyword: String) = Flowable
    .fromIterable(aggregators)
    .flatMap { it.acquireSomethingFromSomewhere(keyword) }

PS

Если вы хотите узнать больше о теории, вы можете следовать Arrow-kt документация Функтор и Монада

...