У меня есть потоковый код pyspark, который считывает данные сердцебиения с сервера kafka.Я могу подтвердить, что я получаю данные, используя следующую строку.
heartBeats = mac_id.groupBy('macId').count()
Я хочу использовать оконные функции для отслеживания устройств, которые не отправляли звуковые сигналы, скажем, в последние несколько минут.
windowedCountsDF = \
mac_id \
.withWatermark("time", "10 seconds") \
.groupBy(
mac_id.macId,
window(mac_id.time, "10 seconds", "5 seconds")) \
.count()
Однако при запуске потоковой передачи это не дает никаких результатов, даже если данные поступают.
query = windowedCountsDF.writeStream.outputMode('complete').format('console').option('truncate',False).start()
Что-то идет не так??Спасибо !