Представьте себе сервер данных, данные которого случайно отбрасываются между 40 узлами, из которых вы хотите рассчитать значение для каждых 200 записей. Поэтому загружайте 200, вычисляйте, загружайте 200, вычисляйте и т. Д. Ваш сервер может обрабатывать 500 записей в секунду, но у вас достаточно пропускной способности для чтения 50 записей в секунду с каждого сервера (для максимальной пропускной способности в 2000 записей).
Вы можете сделать это последовательно, что является самым простым вариантом:
var cache = mutableListOf()
for (serv in servers) {
for(record in serv.loadData()) {
cache += record
if (cache.count() == 500) {
process(cache)
cache.popFront(500)
}
}
}
Это не тратит место в памяти, а только загружает 50 записей / с и не обрабатывает результатыв параллели. Таким образом, другой способ - сначала получить результаты со всех серверов, а затем выполнить итерацию:
var queue = ConcurrentLinkedDeque()
coroutineScope {
for (serv in servers) {
launch(Dispatchers.IO) {
for (record in serv.loadData()) {
queue += record
}
}
}
}
for (batch in queue.chunked(500)) {
process(batch)
}
Это позволит максимально использовать вашу пропускную способность, но тратить пространство в вашей параллельной очереди и как есть. также не допускает параллельной обработки и загрузки.
Так что это хороший шанс использовать Flow
. Мы хотим сохранить возможность загрузки из нескольких источников параллельно, поэтому мы заменили бы queue += record
на emit(record)
, а затем пакетировали и обработали результаты в collect{}
Но Flow.emit
не является многопоточным (иконтекст меняется из-за launch
, но это можно преодолеть, даже если это нежелательно).
Предполагая, что serv.loadData()
загружает данные постепенно, это все равно может быть достигнуто путем приостановки загрузки данных, когда очередь получаетслишком полный. Но это кажется действительно ручным и неуклюжим, если вы пишете это так.
Итак, если вам все равно, в каком порядке загружаются данные - каков идиоматический способ сделать это в текущей версии Kotlin?