У меня есть Spark DataFrame, состоящий из столбцов целых чисел. Я хочу табулировать каждый столбец и сводить результаты по именам столбцов.
В следующем примере с игрушкой я начинаю с этого кадра данных df
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
| 1| 1| 1| 0| 2|
| 1| 1| 1| 1| 1|
| 2| 2| 2| 3| 3|
| 0| 0| 0| 0| 1|
| 1| 1| 1| 0| 0|
| 3| 3| 3| 2| 2|
| 0| 1| 1| 1| 0|
+---+---+---+---+---+
Каждая ячейка может содержать только одну из {0, 1, 2, 3}
. Теперь я хочу подсчитать количество в каждом столбце. В идеале у меня должен быть столбец для каждой метки (0, 1, 2, 3)
и строка для каждого столбца. Я делаю:
val output = df.columns.map(cs => df.select(cs).groupBy(cs).count().orderBy(cs).
withColumnRenamed(cs, "severity").
withColumnRenamed("count", "counts").withColumn("window", lit(cs))
)
Я получаю массив DataFrames, по одному на каждую строку df
. Каждый из этих фреймов данных имеет 4 строки (по одной для каждого результата). Тогда я делаю:
val longOutput = output.reduce(_ union _) // flatten the array to produce one dataframe
longOutput.show()
, чтобы свернуть массив.
+--------+------+------+
|severity|counts|window|
+--------+------+------+
| 0| 2| a|
| 1| 3| a|
| 2| 1| a|
| 3| 1| a|
| 0| 1| b|
| 1| 4| b|
| 2| 1| b|
| 3| 1| b|
...
И, наконец, я поворачиваюсь к исходным именам столбцов
longOutput.cache()
val results = longOutput.groupBy("window").pivot("severity").agg(first("counts"))
results.show()
+------+---+---+---+---+
|window| 0| 1| 2| 3|
+------+---+---+---+---+
| e| 2| 2| 2| 1|
| d| 3| 2| 1| 1|
| c| 1| 4| 1| 1|
| b| 1| 4| 1| 1|
| a| 2| 3| 1| 1|
+------+---+---+---+---+
Однако, на примере игрушки редукция заняла 8 полных секунд. Он работал более 2 часов на моих реальных данных, которые имели 1000 столбцов и 400 000 строк, прежде чем я завершил их. Я работаю локально на машине с 12 ядрами и 128 ГБ оперативной памяти. Но ясно, что я делаю медленно даже на небольшом количестве данных, поэтому размер машины сам по себе не является проблемой. Столбец groupby / count занял всего 7 минут на полном наборе данных. Но тогда я ничего не могу сделать с этим массивом [DataFrame].
Я пробовал несколько способов избежать union . Я попытался записать свой массив на диск, но это не удалось из-за проблемы с памятью после нескольких часов усилий. Я также пытался настроить припуски памяти на Zeppelin
Так что мне нужен способ создания таблиц, который не дает мне массив данных, а скорее простой фрейм данных.