пн go дБ подписчик наблюдаемый - PullRequest
0 голосов
/ 09 января 2020

Я написал коннектор spark mongoDB для искривления, используя API приемника от spark:

 mongoConnector.getCollection[D]() match {
    case Success(collection) => {
      collection.find().subscribe(new Observer[D]() {

        var subscription: Option[Subscription] = None

        override def onSubscribe(subscription: Subscription): Unit = {

          subscription.request(Long.MaxValue)
        }
        override def onNext(result: D): Unit = {
          store(result)
        }

        override def onError(e: Throwable): Unit = println(s"Error: $e")

        override def onComplete(): Unit ={stop("ed")}
      })
    }case Failure(ex) => stop("Failed to connect to MongoDB", ex)}

}

Я заметил, что метод onComplete меняет все. На самом деле моя цель - читать данные потоковым способом из коллекции mongodb. когда я выбираю метод остановки искрового приемника, данные будут считываться снова и снова, поэтому я получаю много дубликатов, но когда я использую print ("msg") для метода onComplete, данные будут считаны один, но новые данные вставлены в коллекция после запуска задания никогда не будет прочитана. Не могли бы вы сказать мне, как заставить его читать данные, как это происходит ровно один раз

...