Kotlin - последовательность фрагментов в зависимости от размера и времени. - PullRequest
0 голосов
/ 25 июня 2018

У меня есть бесконечный поток в виде последовательности.То, к чему я стремлюсь, это взять пакет из последовательности как по времени, так и по размеру.

Я имею в виду, что если моя последовательность имеет 2250 сообщений, я хочу отправить 3 пакета (1000, 1000, 250).

Также, если до следующих 5 минут я все еще не накопил 1000 сообщений, я все равно отправлю его с тем, что накопилось до сих пор.

        sequence
        .chunked(1000)
        .map { chunk ->
            // do something with chunk
        }

То, что я ожидал получитьэто что-то наподобие .chunked (1000, 300), 300 секунд для которого я хочу отправлять каждые 5 минут.

Заранее спасибо

1 Ответ

0 голосов
/ 26 июня 2018

Kotlin Sequence является синхронным понятием и не должен использоваться каким-либо ограниченным по времени способом.Если вы запрашиваете последовательность для следующего элемента, он блокирует поток invoker до тех пор, пока не будет создан следующий элемент, и нет способа отменить его.

Однако библиотека kotlinx.coroutines представляет концепцию Channel, которая является грубым аналогом последовательности для асинхронного мира, где выполнение операции может занять некоторое время, и при этом они не блокируют потоки.,Вы можете прочитать больше в этом руководстве .

. Оно не предоставляет готового к использованию оператора chunked, но упрощает его написание.Вы можете использовать следующий код:

import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

fun <T> ReceiveChannel<T>.chunked(size: Int, time: Long) =
    produce<List<T>>(onCompletion = consumes()) {
        while (true) { // this loop goes over each chunk
            val chunk = mutableListOf<T>() // current chunk
            val ticker = ticker(time) // time-limit for this chunk
            try {
                whileSelect {
                    ticker.onReceive {
                        false  // done with chunk when timer ticks, takes priority over received elements
                    }
                    this@chunked.onReceive {
                        chunk += it
                        chunk.size < size // continue whileSelect if chunk is not full
                    }
                }
            } catch (e: ClosedReceiveChannelException) {
                return@produce // that is normal exception when the source channel is over -- just stop
            } finally {
                ticker.cancel() // release ticker (we don't need it anymore as we wait for the first tick only)
                if (chunk.isNotEmpty()) send(chunk) // send non-empty chunk on exit from whileSelect
            }
        }
    }

Как видно из этого кода, в него встроены некоторые нетривиальные решения о том, что делать в угловых случаях.Что нам делать, если время таймера истекло, но текущий блок все еще пуст?Этот код запускает новый интервал времени и не отправляет предыдущий (пустой) фрагмент.Завершаем ли мы текущий блок по времени ожидания после последнего элемента, измеряем время с первого элемента или измеряем время с начала фрагмента?Этот код делает позже.

Этот код полностью последовательный - его логика легко следовать пошаговым способом (в коде нет параллелизма).Можно настроить его под любые специфические для проекта требования.

...