Фильтр пустого потока искр RDD - PullRequest
0 голосов
/ 07 апреля 2019

Я читаю данные из eventHub с использованием потоковой передачи искры, и мне нужно сгенерировать предупреждение, если значение не нормализовано в период ожидания. Если rdd пуст, то я хочу запустить только логику ожидания. Поэтому я отфильтрую, используя if(!rdd.isEmpty()){ add event to emptyrdd } else{ run waiting logic }

Когда я запускал этот код, тогда он в первый раз считал все значения, но когда в следующий раз произойдет событие, он не будет запущен, если условие.

 var emptyrdd = sc.emptyRDD[String]
streamListner.foreachRDD(rdd =>
  {
    import spark.implicits._
    if (rdd.isEmpty() == false) {
      val eventStream = rdd
       .map(eventData => (new String(eventData.getBytes), eventData.getSystemProperties.getOffset))
      val eventJson = eventStream.map(x => x._1).filter(x => x.contains("SmartObjectGeneralEvent") && x.contains("id") && x.contains("name") && x.contains("descr") && x.contains("assetId") && x.contains("occurredAt"))
      emptyrdd = eventJson.union(emptyrdd)
    }
    emptyrdd.toDF().show(false)
  })
...