У меня есть запрос, который возвращает большой кусок данных, я хотел бы передать эти данные из postgres в scala, используя размер выборки.Мой код выглядит примерно так
override def getFilteredData(filters: Seq[Filter]): StreamReadySQL[myObject] = {
sql""" QUERY
""".stripMargin
.fetchSize(2000)
.map(MyObject(_))
.iterator()
}
И в каком-то другом классе я называю это так
val productPublisher: DatabasePublisher[RuleData] = {
DB readOnlyStream {
repositories.productRepository.getFilteredData(trigger.filters)
}
}
Source.fromPublisher(publisher).mapAsync(Parallelism) { batch =>
//do something
Future.successful(().asRight[ApplicationError])
}.runWith(Sink.head[Either[ApplicationError, Unit]])
Используя этот код, приведенный выше источник будет возвращать только последние n выбранных записейКак получить итератор из потока, где я могу перебрать все данные?