Использование нескольких свечей windows это плохо? - PullRequest
0 голосов
/ 08 января 2020

Недавно я начал работать с spark windows и пытался понять, что происходит под капотом искровых исполнителей при применении этих оконных функций. Мой вопрос заключается в том, что, поскольку каждое окно должно быть создано с использованием функции partitionBy, которая означает перестановку данных по кластеру, нормально ли использовать несколько windows?

Например, у меня есть этот фрейм данных:

    cli       date     item1   item2     
 --------- ---------- ------- -------
  1234567   20191030   A       D         
  1234567   20191030   B       D         
  1234567   20191029   A       E         
  1234567   20191026   A       F         
  7456123   20191026   C       D         
  7456123   20191025   C       F         

Целью здесь является вычисление частоты каждого элемента для каждого клиента для каждой даты на основе истории.

Например, клиент 1234567 в 20191030 использовал 4 item_1 из 20191030 и наоборот, поэтому частота А будет 3/4, а В 1 / 4.

Я решил рассчитать эти частоты для каждого дня, используя windows, так как он рассчитывает значение для каждой строки, но мне нужно использовать три windows:

// This will give me the number of items used by a client
// in that day and all history.
val lByCliWindow = Window.partitionBy(col("cli")) 

// This will give me how many times a client used this exact item_1 A in
// that day and back in history (here my history is 120 days)
val lByCliItem1Window = Window
    .partitionBy(col("cli"), col("item_1"))
    .orderBy(to_timestamp(col("date"), "yyyyMMdd").cast(LongType))
    .rangeBetween(-86400*120,0) 

// This will give me how many times a client used this exact item_3 F in
// that day and back in history (here my history is 120 days)
val lByCliItem2Window = Window
    .partitionBy(col("cli"), col("item_2"))
    .orderBy(to_timestamp(col("date"), "yyyyMMdd").cast(LongType))
    .rangeBetween(-86400*120,0) 

Ожидаемый результат:

  cli       date         frequency_item1              frequency_item2
 --------- ---------- ------------------------- --------------------------------
  1234567   20191030   Map(A -> 3/4, B -> 1/4)   Map(D -> 2/4, E -> 1/4, F 1/4)
  1234567   20191029   Map(A -> 2/2)             Map(E -> 1/2, F -> 1/2)            
  1234567   20191026   Map(A -> 1/1)             Map(F -> 1/1)
  7456123   20191026   Map(C -> 2/2)             Map(D -> 1/2, F -> 1/2)
  7456123   20191025   Map(C -> 1/1)             Map(F -> 1/1)

Когда я выполняю explain() на этом подходе, я вижу очень много планов обмена hashpartitioning et c и это вполне ожидаемо, поскольку мы каждый раз делаем partitionBy.

Если у меня почти 30 переменных, это означает 30-кратное разделение данных. (Это много перестановок)

Что я хочу понять, это нормальный подход? Будет ли спарк работать на этом разделении параллельно (создать несколько windows в одно и то же время, поэтому разделяет фрейм данных несколькими различными способами одновременно) или последовательно?

Можем ли мы использовать несколько windows? что является более дорогостоящим groupBy shuffle или partitionBy windows shuffle?

Спасибо за ваши ответы и не стесняйтесь предлагать другой подход для расчета частот с использованием windows.

1 Ответ

1 голос
/ 08 января 2020

У меня есть решение, которое включает только одно окно. Я объясню с комментариями.

// The columns you are interested in
val items = df.columns.filter(_ startsWith "item")

// collect_list aggregation. It avoid duplicates. We will group by cli and date.
val aggs = items.map(c => collect_list(col(c)) as c)

// A window over "cli" and ordered by date.
val win = Window.partitionBy("cli").orderBy("date")

// A UDF that computes the frequencies you want
// It takes as input a seq of seq because of the first aggregation we do
val compute_freqs = udf((s : Seq[Seq[String]]) => {
    val flat_s = s.flatten
    val total = flat_s.size
    flat_s.groupBy(identity).mapValues(_.size.toDouble / total)
})

// for each item, we collect the values over the window, and compute the frequency
val frequency_columns = items
    .map(item => compute_freqs(collect_list(col(item)) over win)
                       .alias(s"frequency_$item"))

// Then we use everything
val result = df
    .groupBy("cli", "date")
    .agg(aggs.head, aggs.tail : _*)
    .select((Seq("cli", "date").map(col) ++ frequency_columns) :_*)
    .orderBy($"cli", $"date" desc)

А вот и результат:

scala> result.show(false)
+-------+--------+----------------------+--------------------------------+
|cli    |date    |frequency_item1       |frequency_item2                 |
+-------+--------+----------------------+--------------------------------+
|1234567|20191030|[A -> 0.75, B -> 0.25]|[D -> 0.5, F -> 0.25, E -> 0.25]|
|1234567|20191029|[A -> 1.0]            |[F -> 0.5, E -> 0.5]            |
|1234567|20191026|[A -> 1.0]            |[F -> 1.0]                      |
|7456123|20191026|[C -> 1.0]            |[D -> 0.5, F -> 0.5]            |
|7456123|20191025|[C -> 1.0]            |[F -> 1.0]                      |
+-------+--------+----------------------+--------------------------------+
...