Исходная очередь Akka Stream: как ограничить вставку в очередь с противодавлением (из проходимого интерфейса) - PullRequest
1 голос
/ 20 января 2020

У меня есть очередь с обратным давлением, поток, читающий очередь, и процесс, подающий эту очередь в фоновом режиме. (потребление очереди начинается до окончания подачи очереди).

Подача осуществляется через доступный интерфейс, который читает базу данных. Единственный метод, который у меня есть на этом интерфейсе, это foreach. Я заполняю очередь следующим образом:

 def sourceFromTraversable[T](traversable: Traversable[T]) = {
 val (queue, source) = Source.queue[T](queueSize, OverflowStrategy.backpressure).preMaterialize()
    Future {
      traversable.foreach{
        element =>
          Await.result(queue.offer(element), Duration.Inf)
      }
    }.map(_ => queue.complete())
  source
}

Я использую Await.result, потому что если я этого не сделаю, обратное давление не будет применено к проходимому, то есть оно будет слишком сильно тянуть базу данных, потому что он будет запускать много Futures параллельно вместо ожидания.

Есть ли более чистый способ (без Await.result) для достижения этого с помощью интерфейса типа traversable?

Спасибо

...