Оператор GroupBy для Kotlin Flow - PullRequest
       69

Оператор GroupBy для Kotlin Flow

2 голосов
/ 30 октября 2019

Я пытаюсь перейти с RxJava на Kotlin Flow. Поток действительно впечатляет. Но есть ли какой-либо оператор, похожий на RxJava "GroupBy" в потоке kotlin прямо сейчас?

Ответы [ 2 ]

2 голосов
/ 03 ноября 2019

Начиная с Kotlin Coroutines 1.3, стандартная библиотека, похоже, не предоставляет этот оператор. Однако, поскольку конструкция Flow такова, что все операторы являются функциями расширения, нет принципиального различия между стандартной библиотекой, предоставляющей ее, и тем, что вы пишете свою собственную.

Имея это в виду, вот некоторые измои идеи о том, как подойти к нему.

1. Собрать каждую группу в список

Если вам просто нужен список всех элементов для каждого ключа, используйте эту простую реализацию, которая генерирует пары (K, List<T>):

fun <T, K> Flow<T>.groupToList(getKey: (T) -> K): Flow<Pair<K, List<T>>> = flow {
    val storage = mutableMapOf<K, MutableList<T>>()
    collect { t -> storage.getOrPut(getKey(t)) { mutableListOf() } += t }
    storage.forEach { (k, ts) -> emit(k to ts) }
}

Для этого примера:

suspend fun main() {
    val input = 1..10
    input.asFlow()
            .groupToList { it % 2 }
            .collect { println(it) }
}

он печатает

(1, [1, 3, 5, 7, 9])
(0, [2, 4, 6, 8, 10])

2.a Создание потока для каждой группы

Если вам нужна полная семантика RxJava, где вы преобразуете входной поток во многиевыходные потоки (один на отдельный ключ), все становится более сложным.

Всякий раз, когда вы видите новую клавишу на входе, вы должны излучать новый внутренний поток в нисходящий поток, а затем асинхронно вставлять в него больше данных всякий раз, когда вы снова сталкиваетесь с той же клавишей.

Вот реализация, которая делает это:

fun <T, K> Flow<T>.groupBy(getKey: (T) -> K): Flow<Pair<K, Flow<T>>> = flow {
    val storage = mutableMapOf<K, SendChannel<T>>()
    try {
        collect { t ->
            val key = getKey(t)
            storage.getOrPut(key) {
                Channel<T>(32).also { emit(key to it.consumeAsFlow()) }
            }.send(t)
        }
    } finally {
        storage.values.forEach { chan -> chan.close() }
    }
}

Он устанавливает Channel для каждого ключа и выставляет канал в нисходящий поток как поток.

2.b Одновременно собирать иСокращение сгруппированных потоков

Поскольку groupBy продолжает отправлять данные во внутренние потоки после отправки самих потоков в нисходящий поток, вы должны быть очень осторожны с тем, как вы их собираете.

Вы должны собирать все внутренние потоки одновременно, без верхнего предела на уровне параллелизма. В противном случае каналы потоков, поставленных в очередь для последующего сбора, в конечном итоге заблокируют отправителя, и вы получите тупик.

Вот функция, которая делает это правильно:

fun <T, K, R> Flow<Pair<K, Flow<T>>>.reducePerKey(
        reduce: suspend Flow<T>.() -> R
): Flow<Pair<K, R>> = flow {
    coroutineScope {
        this@reducePerKey
                .map { (key, flow) -> key to async { flow.reduce() } }
                .toList()
                .forEach { (key, deferred) -> emit(key to deferred.await()) }
    }
}

Этап map запускает сопрограмму для каждого внутреннего потока, который он получает. Сопрограмма сводит его к конечному результату.

toList() - это терминальная операция, которая собирает весь восходящий поток, запуская все сопрограммы async в процессе. Сопрограммы начинают поглощать внутренние потоки, даже когда мы все еще собираем основной поток. Это необходимо для предотвращения тупика.

Наконец, после запуска всех сопрограмм мы запускаем цикл forEach, который ожидает и выдает окончательные результаты по мере их появления.

Вы можете реализовать почти такое же поведение в терминах flatMapMerge:

fun <T, K, R> Flow<Pair<K, Flow<T>>>.reducePerKey(
        reduce: suspend Flow<T>.() -> R
): Flow<Pair<K, R>> = flatMapMerge(Int.MAX_VALUE) { (key, flow) ->
    flow { emit(key to flow.reduce()) }
}

Разница заключается в упорядочении: тогда как первая реализация учитывает порядок появления клавиш на входе, эта не,Оба работают одинаково.

3. Пример

Этот пример группирует и суммирует 40 миллионов целых чисел:

suspend fun main() {
    val input = 1..40_000_000
    input.asFlow()
            .groupBy { it % 100 }
            .reducePerKey { sum { it.toLong() } }
            .collect { println(it) }
}

suspend fun <T> Flow<T>.sum(toLong: suspend (T) -> Long): Long {
    var sum = 0L
    collect { sum += toLong(it) }
    return sum
}

Я могу успешно запустить это с -Xmx64m. На моем 4-ядерном ноутбуке я получаю около 4 миллионов элементов в секунду.

Очень просто переопределить первое решение в терминах нового, например:

fun <T, K> Flow<T>.groupToList(getKey: (T) -> K): Flow<Pair<K, List<T>>> =
        groupBy(getKey).reducePerKey { toList() }
0 голосов
/ 31 октября 2019

Пока нет, но вы можете взглянуть на эту библиотеку https://github.com/akarnokd/kotlin-flow-extensions.

...