Раздвижное окно в Spark Структурированное Потоковое - PullRequest
0 голосов
/ 05 июня 2019

У меня есть поток данных, поступающих от устройств IoT, которые имеют идентификатор (uuid) и количество (т. Е. Температуру).

Я хотел бы вести подсчет событий, полученных за последние 15 минут, со скользящим окном, скажем, 1 или 5 минут.

Я реализовал в Spark следующее, но оно генерирует все окна, но меня интересует только самое последнее (и, возможно, ноль, если устройство за это время не отправило никаких данных):

import org.apache.spark.sql.functions._

val agg15min = stream
  .withWatermark("createdAtTimestamp", "15 minutes")
  .where("device_uuid is not null")
  .groupBy($"device_uuid", window($"createdAtTimestamp", "15 minutes", "5 minutes"))
  .count()

После этого я попытался отфильтровать данные следующим образом:

val query15min =
  agg15min
    .writeStream
    .format("memory")
    .queryName("query15min")
    .outputMode("complete")
    .start()

и затем:

val df15min = spark.sql("""
with cte as (
select
    device_uuid,
    date_format(window.end, "MMM-dd HH:mm") as time,
    rank() over (partition by device_uuid order by window.end desc) as rank,
    count
  from query15min
)
select
  device_uuid,
  count
from cte
where rank = 1""")

Но в документации сказано, что memory не для производственного использования, а также довольно неэффективно.

Есть ли эффективный способ реализовать такую ​​логику в Spark Structured Streaming?

1 Ответ

1 голос
/ 06 июня 2019

Да, опция памяти не должна использоваться, так как она предназначена для режима отладки. Опция памяти также приносит все данные на узел драйвера Spark. Эффективным способом здесь было бы сохранить вывод (Writestream) в путь HDFS в виде файла (паркет и т. Д.). Используйте этот путь для чтения файла паркета в сеансе спарк и регулярного выполнения запроса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...