Если вы планируете сохранить while
l oop, я бы сначала предложил вам использовать scala.collection.mutable.ListBuffer
вместо неизменного List
. Это предотвратит создание копий всего списка в памяти на каждой итерации.
Если вы хотите более «функциональный» способ написания приведенного выше кода при сохранении Consumer API (вместо Kafka Streams API), вы можете вручную определить scala Stream
, например, так:
import scala.util.Random
// mock Kafka's "poll", returns a random number of Ints (max 10)
def poll(): List[Int] = {
val size = Random.nextInt(10)
println("fetching messages")
Thread.sleep(1000)
(1 to size).map(_ => Random.nextInt(10)).toList
}
lazy val s: Stream[Int] = Stream.continually(poll()).flatten
// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:
/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/