Mongodb - поток документов с полем даты, которое прошло сейчас - PullRequest
0 голосов
/ 07 апреля 2020

Я хочу использовать реактивный поток в mongoDB (collection.watch, может быть?) Для потоковой передачи данных. В моих документах есть свойство date, которое называется triggerAfter, и когда эта дата пройдена, я хочу, чтобы mon go поместил документ в мой поток. Можно ли это сделать с Mongodb?

Я использую Scala и Akka для работы с потоками. Текущий код выглядит следующим образом, но он не реагирует.

def streamItemsByTriggerAtDate(): Source[MyDocument, NotUsed] = {
    val query = Json.obj("triggerAt" -> Json.obj("$lte" -> OffsetDatetime.now()))

    MongoSource(mongoDb.collection("myCollection").find(query))
      .map(fromDocument[MyDocument])

Поэтому я хочу, чтобы Mon go дал мне элементы, которые проходят дату triggerAt (в реальном времени), когда мой поток активен.

1 Ответ

1 голос
/ 07 апреля 2020

Да, есть много библиотек, обеспечивающих эту функциональность. Если вы используете библиотеку реактивный-mongodb, вам следует посмотреть, как реализован Cursor. Есть несколько параметров, которые помогут вам шагать с потоком. Даже если вы используете библиотеку alpakka-mongodb, вы должны будете реагировать-mon go lib:

import reactivemongo.api.Cursor
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

def events(coll: BSONCollection): AkkaStreamCursor[Int] =
    collection.find(BSONDocument.empty/* findAll */).
    sort(BSONDocument("id" -> 1)).cursor[Int]()

val src: Source[Int, Future[State]] = cursor.documentSource()

Вы можете найти подробную информацию здесь и здесь

...