Заполнение списка результатами из очереди в течение некоторого времени l oop in scala - PullRequest
1 голос
/ 23 января 2020

Я пытаюсь какое-то время написать l oop в scala. Что я хочу сделать, это заполнить список сообщениями из очереди (Кафка в данном случае, но на самом деле не имеет значения).

Я делаю это для интеграционного теста, и поскольку Kafka запускается удаленно, когда тесты выполняются в CI, тест несколько раз завершается неудачей, потому что Kafka не возвращает никаких сообщений. Поэтому я написал oop, который будет запрашивать Кафку до тех пор, пока я не верну все ожидаемые результаты (в противном случае проверка через некоторое время прекратится и завершится неудачей). У меня есть это прямо сейчас:

var result = List[Int]()
while (result.size < expectedNumberOfMessages) {
    result = result ++ kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator().toList.map(_.value.getPayload)
}

Это прекрасно работает, но для меня это выглядит ужасно. Плюс, если бы это был производственный код, он также был бы неэффективным. Кто-нибудь может предложить лучший способ сделать это функционально?

Ответы [ 2 ]

2 голосов
/ 23 января 2020

Возможно, что-то вроде этого?

def pollKafka = kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator.map(_.value.getPayload)

Iterator
  .continually(pollKafka)
  .flatten
  .take(expectedNumberOfMessages)
  .toList

Iterator является внутренне изменяемым, но если вы используете его высокоуровневый функциональный интерфейс и не используете Iterator, это прекрасно, ИМХО.

Если вы хотите go функциональных потоков до конца, вы можете рассмотреть такую ​​библиотеку, как fs2.

2 голосов
/ 23 января 2020

Если вы планируете сохранить while l oop, я бы сначала предложил вам использовать scala.collection.mutable.ListBuffer вместо неизменного List. Это предотвратит создание копий всего списка в памяти на каждой итерации.

Если вы хотите более «функциональный» способ написания приведенного выше кода при сохранении Consumer API (вместо Kafka Streams API), вы можете вручную определить scala Stream, например, так:

import scala.util.Random

// mock Kafka's "poll", returns a random number of Ints (max 10)
def poll(): List[Int] = {
    val size = Random.nextInt(10)
    println("fetching messages")
    Thread.sleep(1000)
    (1 to size).map(_ => Random.nextInt(10)).toList
}

lazy val s: Stream[Int] = Stream.continually(poll()).flatten

// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:

/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...