используя потоки Акка, чтобы пройти коллекцию Монго - PullRequest
0 голосов
/ 06 января 2019

У меня есть коллекция людей в Монго, и я хочу просмотреть каждого человека в коллекции как поток, и для каждого человека вызвать метод, который выполняет вызов API, изменяет модель и вставляет в новую коллекцию. в монго.

Это выглядит так:

  def processPeople()(implicit m: Materializer): Future[Unit] = {

    val peopleSource: Source[Person, Future[State]] = collection.find(json()).cursor[Person]().documentSource()

    peopleSource.runWith(Sink.seq[Person]).map(people => {
      people.foreach(person => {
        changeModelAndInsertToNewCollection(person)
      }) 
    })
  }

но это не работает ... кажется, что часть изменения модели работает, но вставка в монго не работает.

Похоже, что метод также не запускается сразу, какая-то обработка происходит раньше, чем за минуту до его запуска ... вы видите проблему?

1 Ответ

0 голосов
/ 08 января 2019

Решение 1:

def changeModelAndInsertToNewCollection(person:Person) : Future[Boolean] ={
//Todo : call mongo api to update the person
???
}

def processPeople()(implicit m: Materializer): Future[Done] = {
val numberOfConcurrentUpdate = 10

val peopleSource: Source[Person, Future[State]] =
  collection
    .find(json())
    .cursor[Person]()
    .documentSource()

peopleSource
  .mapAsync(numberOfConcurrentUpdate)(changeModelAndInsertToNewCollection)
  withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.ignore)}

Решение 2: используя Alpakka в качестве соединителя потока akka для mongo

val source: Source[Document, NotUsed] =
MongoSource(collection.find(json()).cursor[Person]().documentSource())

source.runWith(MongoSink.updateOne(2, collection))
...