Недавно я начал работать с spark windows и пытался понять, что происходит под капотом искровых исполнителей при применении этих оконных функций. Мой вопрос заключается в том, что, поскольку каждое окно должно быть создано с использованием функции partitionBy
, которая означает перестановку данных по кластеру, нормально ли использовать несколько windows?
Например, у меня есть этот фрейм данных:
cli date item1 item2
--------- ---------- ------- -------
1234567 20191030 A D
1234567 20191030 B D
1234567 20191029 A E
1234567 20191026 A F
7456123 20191026 C D
7456123 20191025 C F
Целью здесь является вычисление частоты каждого элемента для каждого клиента для каждой даты на основе истории.
Например, клиент 1234567 в 20191030 использовал 4 item_1 из 20191030 и наоборот, поэтому частота А будет 3/4, а В 1 / 4.
Я решил рассчитать эти частоты для каждого дня, используя windows, так как он рассчитывает значение для каждой строки, но мне нужно использовать три windows:
// This will give me the number of items used by a client
// in that day and all history.
val lByCliWindow = Window.partitionBy(col("cli"))
// This will give me how many times a client used this exact item_1 A in
// that day and back in history (here my history is 120 days)
val lByCliItem1Window = Window
.partitionBy(col("cli"), col("item_1"))
.orderBy(to_timestamp(col("date"), "yyyyMMdd").cast(LongType))
.rangeBetween(-86400*120,0)
// This will give me how many times a client used this exact item_3 F in
// that day and back in history (here my history is 120 days)
val lByCliItem2Window = Window
.partitionBy(col("cli"), col("item_2"))
.orderBy(to_timestamp(col("date"), "yyyyMMdd").cast(LongType))
.rangeBetween(-86400*120,0)
Ожидаемый результат:
cli date frequency_item1 frequency_item2
--------- ---------- ------------------------- --------------------------------
1234567 20191030 Map(A -> 3/4, B -> 1/4) Map(D -> 2/4, E -> 1/4, F 1/4)
1234567 20191029 Map(A -> 2/2) Map(E -> 1/2, F -> 1/2)
1234567 20191026 Map(A -> 1/1) Map(F -> 1/1)
7456123 20191026 Map(C -> 2/2) Map(D -> 1/2, F -> 1/2)
7456123 20191025 Map(C -> 1/1) Map(F -> 1/1)
Когда я выполняю explain()
на этом подходе, я вижу очень много планов обмена hashpartitioning
et c и это вполне ожидаемо, поскольку мы каждый раз делаем partitionBy
.
Если у меня почти 30 переменных, это означает 30-кратное разделение данных. (Это много перестановок)
Что я хочу понять, это нормальный подход? Будет ли спарк работать на этом разделении параллельно (создать несколько windows в одно и то же время, поэтому разделяет фрейм данных несколькими различными способами одновременно) или последовательно?
Можем ли мы использовать несколько windows? что является более дорогостоящим groupBy
shuffle или partitionBy
windows shuffle?
Спасибо за ваши ответы и не стесняйтесь предлагать другой подход для расчета частот с использованием windows.