Spark SQL Window за интервал между двумя указанными временными границами - от 3 часов до 2 часов назад - PullRequest
1 голос
/ 21 мая 2019

Как правильно указать интервал окна в Spark SQL, используя две предопределенные границы?

Я пытаюсь суммировать значения из моей таблицы за окном от 3 часов назад до 2 часов назад".

Когда я запускаю этот запрос:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;

Это работает.Я получаю ожидаемые результаты, то есть суммы значений, которые попадают в скользящее окно за 2 часа.

Теперь мне нужно, чтобы скользящее окно не привязывалось к текущей строке, а учитывало строки между3 часа назад и 2 часа назад.Я пытался с:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;

Но я получаю extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'} ошибку.

Я также пытался с:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;

но затем я получаю другую ошибку scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)

Третий вариант, который я пробовал:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;

, и он не работает так, как мы ожидаем: cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch

У меня возникают трудности с поиском документов за интервалвведите эту ссылку недостаточно, а другая информация наполовину испечена.По крайней мере, что я нашел.

1 Ответ

0 голосов
/ 30 мая 2019

Поскольку интервалы интервалов не сработали, мне пришлось обратиться к альтернативному подходу. Это выглядит примерно так:

  • подготовить список интервалов, для которых необходимо выполнить вычисления
  • для каждого из интервалов, запустите вычисление
    • каждая из этих итераций создает фрейм данных
  • после итераций у нас есть список фреймов данных
  • объединяет фреймы данных из списка в один больший фрейм данных
  • выпиши результаты

В моем случае мне приходилось выполнять вычисления для каждого часа дня и объединять эти «ежечасные» результаты, то есть список из 24 фреймов данных, в один «ежедневный» фрейм данных.

Код с точки зрения очень высокого уровня выглядит следующим образом:

val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield {
    val data = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
    // do stuff
    // return a data frame
}
hourlyDFs.toSeq().reduce(_.union(_))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...