У меня есть простой метод, который читает из монго коллекции людей в виде потока, и для каждого человека, который делает некоторые изменения модели и вставляет в новую коллекцию:
def processPeople()(implicit m: Materializer): Future[Done] = {
val peopleSource: Source[Person, Future[State]] = collection.find(json())
.cursor[Person]()
.documentSource()
.throttle(50, 1.second)
peopleSource.runForeach(person => insertPerson(person))
}
def insertPerson(person: Person): Future[String] = {
peopleCollection.insert(person) map { _ =>
person.id
} recover {
case WriteResult.Code(11000) =>
println(WriteResult.lastError(_))
logger.error(s"fail to insert duplicate person ${person.id}")
throw DuplicateInsertion("duplicate person")
case _ =>
logger.error(s"fail to insert person ${person.id}")
throw new RuntimeException
}
}
собрание составляет около 370 тыс. Документов, и после 200 тыс. Я почему-то получил:
reactivemongo.core.errors.DetailedDatabaseException: DatabaseException['Cursor not found, cursor id: 72642197351' (code = 43)]
и в консоли монго я увидел:
I COMMAND [conn45] killcursors: found 0 of 1
Кто-нибудь понимает, почему это происходит?