Повышение эффективности Spark SQL при повторных вызовах groupBy / count.Сводный итог - PullRequest
0 голосов
/ 09 мая 2019

У меня есть Spark DataFrame, состоящий из столбцов целых чисел. Я хочу табулировать каждый столбец и сводить результаты по именам столбцов.

В следующем примере с игрушкой я начинаю с этого кадра данных df

+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  1|  1|  1|  0|  2|
|  1|  1|  1|  1|  1|
|  2|  2|  2|  3|  3|
|  0|  0|  0|  0|  1|
|  1|  1|  1|  0|  0|
|  3|  3|  3|  2|  2|
|  0|  1|  1|  1|  0|
+---+---+---+---+---+

Каждая ячейка может содержать только одну из {0, 1, 2, 3}. Теперь я хочу подсчитать количество в каждом столбце. В идеале у меня должен быть столбец для каждой метки (0, 1, 2, 3) и строка для каждого столбца. Я делаю:

val output = df.columns.map(cs => df.select(cs).groupBy(cs).count().orderBy(cs).
  withColumnRenamed(cs, "severity").
  withColumnRenamed("count", "counts").withColumn("window", lit(cs))
)

Я получаю массив DataFrames, по одному на каждую строку df. Каждый из этих фреймов данных имеет 4 строки (по одной для каждого результата). Тогда я делаю:

val longOutput = output.reduce(_ union _) // flatten the array to produce one dataframe
longOutput.show()

, чтобы свернуть массив.

+--------+------+------+
|severity|counts|window|
+--------+------+------+
|       0|     2|     a|
|       1|     3|     a|
|       2|     1|     a|
|       3|     1|     a|
|       0|     1|     b|
|       1|     4|     b|
|       2|     1|     b|
|       3|     1|     b|
...

И, наконец, я поворачиваюсь к исходным именам столбцов

longOutput.cache()
val results = longOutput.groupBy("window").pivot("severity").agg(first("counts"))

results.show()

+------+---+---+---+---+
|window|  0|  1|  2|  3|
+------+---+---+---+---+
|     e|  2|  2|  2|  1|
|     d|  3|  2|  1|  1|
|     c|  1|  4|  1|  1|
|     b|  1|  4|  1|  1|
|     a|  2|  3|  1|  1|
+------+---+---+---+---+

Однако, на примере игрушки редукция заняла 8 полных секунд. Он работал более 2 часов на моих реальных данных, которые имели 1000 столбцов и 400 000 строк, прежде чем я завершил их. Я работаю локально на машине с 12 ядрами и 128 ГБ оперативной памяти. Но ясно, что я делаю медленно даже на небольшом количестве данных, поэтому размер машины сам по себе не является проблемой. Столбец groupby / count занял всего 7 минут на полном наборе данных. Но тогда я ничего не могу сделать с этим массивом [DataFrame].

Я пробовал несколько способов избежать union . Я попытался записать свой массив на диск, но это не удалось из-за проблемы с памятью после нескольких часов усилий. Я также пытался настроить припуски памяти на Zeppelin

Так что мне нужен способ создания таблиц, который не дает мне массив данных, а скорее простой фрейм данных.

1 Ответ

2 голосов
/ 09 мая 2019

Проблема с вашим кодом состоит в том, что вы запускаете одно искровое задание на столбец, а затем большой союз. В общем, гораздо быстрее попытаться сохранить все в пределах одного.

В вашем случае, вместо того, чтобы разделить работу, вы можете разбить информационный фрейм, чтобы сделать все за один проход, как это:

df
    .select(array(df.columns.map(c => struct(lit(c) as "name", col(c) as "value") ) : _*) as "a")
    .select(explode('a))
    .select($"col.name" as "name", $"col.value" as "value")
    .groupBy("name")
    .pivot("value")
    .count()
    .show()

Эта первая строка - немного хитрая. Он создает массив кортежей, в котором имя каждого столбца сопоставляется с его значением. Затем мы взрываем его (одна строка на элемент массива) и, наконец, вычисляем базовую опору.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...