искровой потоковый приемник mongoDB - PullRequest
0 голосов
/ 07 января 2020

Я пишу пользовательский приемник потоковой искровой графики go Receiver для чтения данных из коллекции mongoDb с использованием spark Dstream.

Ниже приведен код, который я написал:

class MongoDBReceiver[D: ClassTag](mongoConnector: MongoDefaultConnector, 
                findOptions:    MongoOptions, 
                storageLevel: StorageLevel ) extends Receiver[D](storageLevel) {

  val logger =  LoggerFactory.getLogger(getClass)

  private var subscription: Option[Subscription] = None

  override def onStart(): Unit = {
    new Thread() {
      override def run() {
        logger.info("starting")
        receive()
      }
    }.start()

  }

  def receive(): Unit = {
    mongoConnector.getCollection[D]() match {
      case Success(collection) => {
        getPickObservable(collection, findOptions).snapshot(true).subscribe(
          new Observer[D] {
            override def onSubscribe(sub: Subscription): Unit = {
              subscription = Some(sub)
              sub.request(Long.MaxValue)
            }
            override def onNext(doc: D): Unit = store(doc)

            override def onError(throwable: Throwable): Unit = stop("Observable errored", throwable)

            override def onComplete(): Unit = stop("publisher finished")

          }
        )
      }
      case Failure(ex) => stop("Failed to connect to MongoDB", ex)
    }

  }

  override def onStop(): Unit = {
    logger.info("stopping")
  }
}

Это работает, но я получаю задание, читающее один и тот же документ несколько раз, после журнала приемник запускается и останавливается непрерывно, так что повторяет одну и ту же обработку снова и снова. Ниже приведены журналы, которые я получил:

20/01/07 15:06:21 INFO MongoDBReceiver: начиная с 20.01.07 15:06:21 INFO кластер: кластер создан с настройками {хосты = [sitewhere-mongodb-rd.gfxiq.prv: 27017], mode = SINGLE, requiredClusterType = UNKNOWN, serverSelectionTimeout = '30000 мс', maxWaitQueueSize = 500} 20/01/07 15:06:21 Кластер INFO: сервер не выбран com.mongodb.asyn c .client. ClientSessionHelper$1@72859317 из описания кластера ClusterDescription {type = UNKNOWN, connectionMode = SINGLE, serverDescription = [ServerDescription {address = sitewhere-mongodb-rd.gfxiq.prv: 27017, тип = UNKNOWN, состояние = СОЕДИНЕНИЕ}]}. Ожидание 30000 мсек до истечения времени ожидания 20/01/07 15:06:21 INFO-соединение: Открытое соединение [connectionId {localValue: 65, serverValue: 14408}] к sitewhere-mongodb-rd.gfxiq.prv: 27017 20/01/ 07 15:06:21 Кластер INFO: поток мониторинга успешно подключен к серверу с описанием ServerDescription {address = sitewhere-mongodb-rd.gfxiq.prv: 27017, тип = STANDALONE, состояние = СОЕДИНЕНО, ok = true, версия = ServerVersion {versionList = [3, 4, 9]}, minWireVersion = 0, maxWireVersion = 5, maxDocumentSize = 16777216, logicSessionTimeoutMinutes = null, roundTripTimeNanos = 12612869} 20/01/07 15:06:21 INFO-соединение: открытое соединение [connectionId {localValue: 66, serverValue: 14409}] to sitewhere-mongodb-rd.gfxiq.prv: 27017 20/01/07 15:06:21 INFO ReceiverSupervisorImpl: остановка получателя с сообщением: издатель завершил: 20/01/07 15:06:21 INFO MongoDBReceiver: остановка 20/01/07 15:06:21 INFO ReceiverSupervisorImpl: вызванный получатель onStop 20/01/07 15:06:21 INFO ReceiverSupervisorImpl: отмена регистрации получателя 0 20/01/07 15 : 06: 21 ОШИБКА ReceiverTracker: незарегистрированный получатель для потока 0: издатель завершил 20/01/07 15:06:21 INFO ReceiverSupervisorImpl: остановленный получатель 0

Знаете ли вы, как это исправить, чтобы сделать Разъем считывается один раз и остается включенным, пока соединение с сервером mongoDB установлено.

...