Spark dataFrame слишком долго отображается после обновления его столбцов - PullRequest
1 голос
/ 19 февраля 2020

У меня есть фрейм данных ок. 4 миллиона строк и 35 столбцов в качестве входных данных.

Все, что я делаю с этим фреймом данных, это следующие шаги:

  • Для списка заданных столбцов я вычисляю сумму для данного списка объектов группы и присоединили его в качестве нового столбца к моему входному фрейму данных
  • Я сбрасываю сумму каждого нового столбца сразу после того, как присоединил ее к фрейму данных.

Таким образом, мы получаем то же самое dataFrame, с которого мы начали (теоретически).

Однако я заметил, что если мой список заданных столбцов становится слишком большим (из более чем 6 столбцов), выходной dataFrame становится невозможным для манипулирования. Даже простое отображение занимает 10 минут.

Вот пример моего кода (df - мой входной кадр данных):

  for c in list_columns:
    df = df.join(df.groupby(list_group_features).agg(sum(c).alias('sum_' + c)), list_group_features)
    df = df.drop('sum_' + c)

1 Ответ

2 голосов
/ 19 февраля 2020

Это происходит из-за внутренней работы Spark и ее ленивой оценки.

Что делает Spark, когда вы звоните groupby, join, agg, она присоединяет эти вызовы к плану df объект. Поэтому, несмотря на то, что он ничего не выполняет с данными, вы создаете большой план выполнения, который хранится внутри объекта Spark DataFrame.

Только при вызове действия (show, count, write, et c.), Spark оптимизирует план и выполняет его. Если план слишком велик, выполнение шага оптимизации может занять некоторое время. Также помните, что оптимизация плана происходит на водителе, а не на исполнителях. Поэтому, если ваш драйвер занят или перегружен, он также задерживает шаг оптимизации плана зажигания.

Полезно помнить, что объединения - это дорогостоящие операции в Spark, как для оптимизации, так и для выполнения. Если вы можете, вы всегда должны избегать объединений при работе с одним DataFrame и использовать вместо этого функциональность окна. Объединения следует использовать только в том случае, если вы объединяете разные фреймы данных из разных источников (разных таблиц).

Способ оптимизации кода может быть следующим:

import pyspark
import pyspark.sql.functions as f

w = pyspark.sql.Window.partitionBy(list_group_features)
agg_sum_exprs = [f.sum(f.col(c)).alias("sum_" + c).over(w) for c in list_columns]
res_df = df.select(df.columns + agg_sum_exprs)

Это должно быть масштабируемым и быстрым для большие списки list_group_features и list_columns.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...