Я написал коннектор spark mongoDB для искривления, используя API приемника от spark:
mongoConnector.getCollection[D]() match {
case Success(collection) => {
collection.find().subscribe(new Observer[D]() {
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
subscription.request(Long.MaxValue)
}
override def onNext(result: D): Unit = {
store(result)
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit ={stop("ed")}
})
}case Failure(ex) => stop("Failed to connect to MongoDB", ex)}
}
Я заметил, что метод onComplete
меняет все. На самом деле моя цель - читать данные потоковым способом из коллекции mongodb. когда я выбираю метод остановки искрового приемника, данные будут считываться снова и снова, поэтому я получаю много дубликатов, но когда я использую print ("msg") для метода onComplete, данные будут считаны один, но новые данные вставлены в коллекция после запуска задания никогда не будет прочитана. Не могли бы вы сказать мне, как заставить его читать данные, как это происходит ровно один раз