Kotlin Sequence
является синхронным понятием и не должен использоваться каким-либо ограниченным по времени способом.Если вы запрашиваете последовательность для следующего элемента, он блокирует поток invoker до тех пор, пока не будет создан следующий элемент, и нет способа отменить его.
Однако библиотека kotlinx.coroutines
представляет концепцию Channel
, которая является грубым аналогом последовательности для асинхронного мира, где выполнение операции может занять некоторое время, и при этом они не блокируют потоки.,Вы можете прочитать больше в этом руководстве .
. Оно не предоставляет готового к использованию оператора chunked
, но упрощает его написание.Вы можете использовать следующий код:
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
fun <T> ReceiveChannel<T>.chunked(size: Int, time: Long) =
produce<List<T>>(onCompletion = consumes()) {
while (true) { // this loop goes over each chunk
val chunk = mutableListOf<T>() // current chunk
val ticker = ticker(time) // time-limit for this chunk
try {
whileSelect {
ticker.onReceive {
false // done with chunk when timer ticks, takes priority over received elements
}
this@chunked.onReceive {
chunk += it
chunk.size < size // continue whileSelect if chunk is not full
}
}
} catch (e: ClosedReceiveChannelException) {
return@produce // that is normal exception when the source channel is over -- just stop
} finally {
ticker.cancel() // release ticker (we don't need it anymore as we wait for the first tick only)
if (chunk.isNotEmpty()) send(chunk) // send non-empty chunk on exit from whileSelect
}
}
}
Как видно из этого кода, в него встроены некоторые нетривиальные решения о том, что делать в угловых случаях.Что нам делать, если время таймера истекло, но текущий блок все еще пуст?Этот код запускает новый интервал времени и не отправляет предыдущий (пустой) фрагмент.Завершаем ли мы текущий блок по времени ожидания после последнего элемента, измеряем время с первого элемента или измеряем время с начала фрагмента?Этот код делает позже.
Этот код полностью последовательный - его логика легко следовать пошаговым способом (в коде нет параллелизма).Можно настроить его под любые специфические для проекта требования.