ограничение вызовов API во время чтения из потока (playframework / akka) - PullRequest
0 голосов
/ 09 января 2019

Моя задача - прочитать коллекцию из mongo db (используя потоки akka) и для каждого элемента (документа) вызвать некоторый API-интерфейс google и обогатить данные элемента результатом google.

Google Limit Api звонки до 50 в секунду, поэтому я использую газ, как это:

  def processSuppliers()(implicit m: Materializer): Future[Done] = {
    val suppliersSource: Source[Supplier, Future[State]] =
      suppliersCollection.find(json())
        .noCursorTimeout
        .cursor[Supplier]()
        .documentSource()
        .throttle(50, 1.second)

    suppliersSource
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runForeach(supplier => insertEnrichedSupplier(supplier))
  }

проблема в том, что для каждого элемента я звоню в Google 0 или более раз, и есть некоторые элементы, которые будут вызывать 50 - 100 звонков в Google, даже больше.

так что мой предел газа на самом деле не 50 в секунду ... у вас есть какие-либо предположения о том, что может решить эту проблему?

1 Ответ

0 голосов
/ 09 января 2019

Я думаю, что это будет работать:

def processSuppliers()(implicit m: Materializer): Future[Done] = {
val suppliersSource: Source[Supplier, Future[State]] =
  suppliersCollection.find(json())
    .noCursorTimeout
    .cursor[Supplier]()
    .documentSource()
    .mapAsync(50)(supplier => insertEnrichedSupplier(supplier))
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .run()
}
...