У меня есть очередь с обратным давлением, поток, читающий очередь, и процесс, подающий эту очередь в фоновом режиме. (потребление очереди начинается до окончания подачи очереди).
Подача осуществляется через доступный интерфейс, который читает базу данных. Единственный метод, который у меня есть на этом интерфейсе, это foreach. Я заполняю очередь следующим образом:
def sourceFromTraversable[T](traversable: Traversable[T]) = {
val (queue, source) = Source.queue[T](queueSize, OverflowStrategy.backpressure).preMaterialize()
Future {
traversable.foreach{
element =>
Await.result(queue.offer(element), Duration.Inf)
}
}.map(_ => queue.complete())
source
}
Я использую Await.result, потому что если я этого не сделаю, обратное давление не будет применено к проходимому, то есть оно будет слишком сильно тянуть базу данных, потому что он будет запускать много Futures параллельно вместо ожидания.
Есть ли более чистый способ (без Await.result) для достижения этого с помощью интерфейса типа traversable?
Спасибо