натуральный ака.Интеллектуальная пакетная обработка - это метод обработки потоков, который оптимизирует пропускную способность, не влияя на задержку.На примере параллельной очереди потребитель имеет возможность атомарно истощить все наблюдаемые элементы в какой-то момент и затем обработать их как пакет.В идеале очередь должна быть ограничена, давая верхний предел размеру пакета и одновременно предоставляя отправителю обратное давление.
Это называется «естественным» пакетированием, потому что нет наложенного размера пакета: когда трафикнизкий, он будет обрабатывать каждый элемент, как только он прибывает.В этом случае вам не нужно оптимизировать пропускную способность, объединяя элементы в группы.Когда трафик становится выше, потребитель автоматически начинает обрабатывать большие пакеты, амортизируя фиксированную задержку одной операции, такой как база данных INSERT
.
Я написал некоторый код, который достигает основной цели:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
const val batchLimit = 20
@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
handleItems: (List<T>) -> Unit
) {
val buf = mutableListOf<T>()
while (true) {
receiveOrNull()?.also { buf.add(it) } ?: break
for (x in 2..batchLimit) {
poll()?.also { buf.add(it) } ?: break
}
handleItems(buf)
buf.clear()
}
}
Мы можем проверить это следующим образом:
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
val chan = generateMockTraffic()
runBlocking {
chan.consumeBatched { println("Received items: $it") }
}
}
@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
return GlobalScope.produce(capacity = batchLimit) {
(1..100).forEach {
send(it)
if (it % 10 == 0) {
delay(1)
}
}
}
}
consumeBatched()
опрашивает очередь по одному элементу за раз и, следовательно, должен дополнительно устанавливать ограничение на количество партий.Было бы более оптимальным, если бы они были записаны в параллельную очередь, такую как OneToOneConcurrentArrayQueue проекта Agrona, которая поддерживает операцию drain
.
Есть ли лучший подход с каналами Котлина, с большей поддержкой со стороныбиблиотека?
Если нет, то будет ли это рассматриваться как функция для добавления?