У меня есть поток данных, поступающих от устройств IoT, которые имеют идентификатор (uuid) и количество (т. Е. Температуру).
Я хотел бы вести подсчет событий, полученных за последние 15 минут, со скользящим окном, скажем, 1 или 5 минут.
Я реализовал в Spark следующее, но оно генерирует все окна, но меня интересует только самое последнее (и, возможно, ноль, если устройство за это время не отправило никаких данных):
import org.apache.spark.sql.functions._
val agg15min = stream
.withWatermark("createdAtTimestamp", "15 minutes")
.where("device_uuid is not null")
.groupBy($"device_uuid", window($"createdAtTimestamp", "15 minutes", "5 minutes"))
.count()
После этого я попытался отфильтровать данные следующим образом:
val query15min =
agg15min
.writeStream
.format("memory")
.queryName("query15min")
.outputMode("complete")
.start()
и затем:
val df15min = spark.sql("""
with cte as (
select
device_uuid,
date_format(window.end, "MMM-dd HH:mm") as time,
rank() over (partition by device_uuid order by window.end desc) as rank,
count
from query15min
)
select
device_uuid,
count
from cte
where rank = 1""")
Но в документации сказано, что memory
не для производственного использования, а также довольно неэффективно.
Есть ли эффективный способ реализовать такую логику в Spark Structured Streaming?