Я читаю данные из eventHub с использованием потоковой передачи искры, и мне нужно сгенерировать предупреждение, если значение не нормализовано в период ожидания. Если rdd пуст, то я хочу запустить только логику ожидания. Поэтому я отфильтрую, используя if(!rdd.isEmpty()){ add event to emptyrdd } else{ run waiting logic }
Когда я запускал этот код, тогда он в первый раз считал все значения, но когда в следующий раз произойдет событие, он не будет запущен, если условие.
var emptyrdd = sc.emptyRDD[String]
streamListner.foreachRDD(rdd =>
{
import spark.implicits._
if (rdd.isEmpty() == false) {
val eventStream = rdd
.map(eventData => (new String(eventData.getBytes), eventData.getSystemProperties.getOffset))
val eventJson = eventStream.map(x => x._1).filter(x => x.contains("SmartObjectGeneralEvent") && x.contains("id") && x.contains("name") && x.contains("descr") && x.contains("assetId") && x.contains("occurredAt"))
emptyrdd = eventJson.union(emptyrdd)
}
emptyrdd.toDF().show(false)
})