Как реализовать естественное (ака. Умное) дозирование по каналам Kotlin? - PullRequest
0 голосов
/ 14 декабря 2018

натуральный ака.Интеллектуальная пакетная обработка - это метод обработки потоков, который оптимизирует пропускную способность, не влияя на задержку.На примере параллельной очереди потребитель имеет возможность атомарно истощить все наблюдаемые элементы в какой-то момент и затем обработать их как пакет.В идеале очередь должна быть ограничена, давая верхний предел размеру пакета и одновременно предоставляя отправителю обратное давление.

Это называется «естественным» пакетированием, потому что нет наложенного размера пакета: когда трафикнизкий, он будет обрабатывать каждый элемент, как только он прибывает.В этом случае вам не нужно оптимизировать пропускную способность, объединяя элементы в группы.Когда трафик становится выше, потребитель автоматически начинает обрабатывать большие пакеты, амортизируя фиксированную задержку одной операции, такой как база данных INSERT.

Я написал некоторый код, который достигает основной цели:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

const val batchLimit = 20

@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
        handleItems: (List<T>) -> Unit
) {
    val buf = mutableListOf<T>()
    while (true) {
        receiveOrNull()?.also { buf.add(it) } ?: break
        for (x in 2..batchLimit) {
            poll()?.also { buf.add(it) } ?: break
        }
        handleItems(buf)
        buf.clear()
    }
}

Мы можем проверить это следующим образом:

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val chan = generateMockTraffic()
    runBlocking {
        chan.consumeBatched { println("Received items: $it") }
    }
}

@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
    return GlobalScope.produce(capacity = batchLimit) {
        (1..100).forEach {
            send(it)
            if (it % 10 == 0) {
                delay(1)
            }
        }
    }
}

consumeBatched() опрашивает очередь по одному элементу за раз и, следовательно, должен дополнительно устанавливать ограничение на количество партий.Было бы более оптимальным, если бы они были записаны в параллельную очередь, такую ​​как OneToOneConcurrentArrayQueue проекта Agrona, которая поддерживает операцию drain.

Есть ли лучший подход с каналами Котлина, с большей поддержкой со стороныбиблиотека?

Если нет, то будет ли это рассматриваться как функция для добавления?

1 Ответ

0 голосов
/ 18 декабря 2018

Есть ли лучший подход к каналам Kotlin с большей поддержкой из библиотеки?

В библиотеке нет поддержки этой функции.

Если нет, то будет ли это рассматриваться как функция для добавления?

Это зависит от желаемой поверхности API.Элемент drain вряд ли подходит для семантики канала: он ограничивает реализацию, он должен каким-то образом выставлять ограничение стока и дает каналу более "подобный коллекции" API.Например, как drain должен вести себя с неограниченным каналом?Можно ли реализовать drain эффективным способом (с буфером предварительного размера, но избегая OOM и неограниченными коллекциями) один раз и использовать его с любой реализацией канала?

Что может бытьулучшены дополнительные подсказки от канала, такие как ожидаемая емкость и количество помещенных в очередь элементов.Они могут иметь упрощенную семантику с реализацией по умолчанию и действовать как подсказки к расширению drain с некоторыми разумно настраиваемыми верхними границами.Такой API можно добавить в будущем, не стесняйтесь создать проблему

...