Как потреблять бесконечный поток в Котлине? - PullRequest
2 голосов
/ 28 октября 2019

Я хочу написать программу, которая будет использовать поток бесконечный из Интернета. Поток поступит как JSON через веб-сокеты. Я ищу структурированные данные, которые я должен использовать.

Требования:

  1. Поток будет бесконечным. Я никогда не хочу прекращать прослушивание новых данных.
  2. Время между новыми событиями в потоке неизвестно. Они могут приходить быстро один за другим, но также возможны большие паузы в несколько часов. Я хочу потреблять, когда что-то есть, и ждать между тем .
  3. Я хочу использовать события последовательно, одно за другим.

Мой компонент должентолько трансформировать потребляемые события и пересылать их дальше. Я пробовал что-то вроде этого:

fun consume(stream: Stream<WebEvent>): Sequence<TransformedEvent> {
    return try {
        stream.asSequence().let { seq ->
            var currentEvent = generator.firstEvent(seq.first())

            seq.map {
                currentEvent = generator.nextEvent(currentEvent, it)
                return@map currentEvent
            }
        }
    } catch (e: NoSuchElementException) {
        throw EmptyStreamException(e)
    }
}

Генератору в этом примере «нужно» «предыдущее» событие для генерации нового, но это часть логики преобразования. Я заинтересован в использовании потока.

Это сработало. Но мне интересно, есть ли лучший способ сделать это в Котлине. Может с блокирующей очередью или что-то в этом роде.

...