Каков идиоматический способ сбора данных из нескольких источников? - PullRequest
1 голос
/ 11 ноября 2019

Представьте себе сервер данных, данные которого случайно отбрасываются между 40 узлами, из которых вы хотите рассчитать значение для каждых 200 записей. Поэтому загружайте 200, вычисляйте, загружайте 200, вычисляйте и т. Д. Ваш сервер может обрабатывать 500 записей в секунду, но у вас достаточно пропускной способности для чтения 50 записей в секунду с каждого сервера (для максимальной пропускной способности в 2000 записей).

Вы можете сделать это последовательно, что является самым простым вариантом:

var cache = mutableListOf()
for (serv in servers) {
    for(record in serv.loadData()) {
        cache += record
        if (cache.count() == 500) {
            process(cache)
            cache.popFront(500)
        }

    }
}

Это не тратит место в памяти, а только загружает 50 записей / с и не обрабатывает результатыв параллели. Таким образом, другой способ - сначала получить результаты со всех серверов, а затем выполнить итерацию:

var queue = ConcurrentLinkedDeque()
coroutineScope {
    for (serv in servers) {
        launch(Dispatchers.IO) {
            for (record in serv.loadData()) {
                queue += record
            }
        }
    }
}

for (batch in queue.chunked(500)) {
    process(batch)
}

Это позволит максимально использовать вашу пропускную способность, но тратить пространство в вашей параллельной очереди и как есть. также не допускает параллельной обработки и загрузки.

Так что это хороший шанс использовать Flow. Мы хотим сохранить возможность загрузки из нескольких источников параллельно, поэтому мы заменили бы queue += record на emit(record), а затем пакетировали и обработали результаты в collect{} Но Flow.emit не является многопоточным (иконтекст меняется из-за launch, но это можно преодолеть, даже если это нежелательно).

Предполагая, что serv.loadData() загружает данные постепенно, это все равно может быть достигнуто путем приостановки загрузки данных, когда очередь получаетслишком полный. Но это кажется действительно ручным и неуклюжим, если вы пишете это так.

Итак, если вам все равно, в каком порядке загружаются данные - каков идиоматический способ сделать это в текущей версии Kotlin?

1 Ответ

1 голос
/ 11 ноября 2019

Вот подход с flatMapMerge, который автоматически распараллеливает внутренние потоки, которые вы излучаете:

suspend fun main() {
    servers.asFlow()
            .flatMapMerge(servers.size) { server -> flow {
                for (record in server.loadData()) {
                    emit(record)
                }
            } }
            .chunked(500)
            .flowOn(Dispatchers.IO) // optional
            .collect { batch ->
                process(batch)
            }
}

fun <T> Flow<T>.chunked(size: Int) = flow {
    var chunk = mutableListOf<T>()
    collect {
        chunk.add(it)
        if (chunk.size == size) {
            emit(chunk)
            chunk = mutableListOf()
        }
    }
    chunk.takeIf { it.isNotEmpty() }?.also { emit(it) }
}

Поток по-прежнему не имеет стандартной реализации chunked, поэтому я предоставил быстрый игрязный.

...